
MLOps: CI/CD for Machine Learning Pipelines & Model Deployment with Kubeflow

MLOps (Machine Learning Operations) is a practice for collaboration between data scientists, software engineers and operations to automate the deployment and governance of machine learning services. A critical aspect of MLOps is the deployment of Machine Learning Pipelines and Models using automated tools. This article describes how this can be achieved using Kubeflow Pipelines, GitLab CI and an open source Python library called HyperModel.

While many organisations have begun their machine learning journey with feasibility studies and proofs of concept (PoCs), there is a real challenge in taking them to production. These challenges include dealing with the fact that ML Models represent both code and data, that the failure modes of Machine Learning are still poorly understood, and that governance structures are still being defined.

This series of articles shows you how to implement a deployment process for machine learning pipelines & model deployment with Kubeflow, using standard patterns borrowed from software engineering, DevOps and Site Reliability Engineering (SRE).

This series is made up of three key themes related to building infrastructure for Production Machine Learning.

  1. Deployment of Machine Learning Pipelines and Models via CI/CD
  2. Automated testing for Machine Learning
  3. Monitoring, Auditing and Logging for Machine Learning

Introduction to Machine Learning Workflows

Machine Learning systems generally have two key phases: Training & Inference (Prediction).

The Training Phase seeks to manipulate source data into features which are used to train a model and to evaluate the trained model.

The Inference Phase generally loads the pre-trained model and uses it to serve up predictions. In many cases the Inference Phase will serve the model via a REST API, or via a service for batch predictions.

Machine Learning Training Phase

The majority of the complexity in machine learning projects is in the training phase. To mitigate this complexity, the training phase of ML projects is often engineered as a Pipeline of dependent steps (known as a Directed Acyclic Graph, or DAG). Typically, a Machine Learning Pipeline will contain operations for (a minimal sketch follows the list below):

  • Data preparation & integration
  • Feature engineering
  • Model training / fitting
  • Hyper Parameter Tuning
  • Model Evaluation
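
The following is a minimal, framework-agnostic sketch of how these stages can hang together as a DAG of plain Python functions. The file name, column names and model choice are illustrative assumptions only, not part of any particular library – the Kubeflow version appears later in this article.

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Illustrative pipeline steps only -- the file and column names are hypothetical.
def prepare_data(path: str) -> pd.DataFrame:
    # Prepare & integrate data from one or more sources
    return pd.read_csv(path).dropna()

def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    # Feature engineering: one-hot encode string columns, keep numeric ones
    # (assumes the target column is already numeric, e.g. 0/1)
    return pd.get_dummies(df)

def train_and_evaluate(features: pd.DataFrame, target: str):
    # Model training / fitting and evaluation (hyper parameter tuning would slot in here)
    X = features.drop(columns=[target])
    y = features[target]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    model = LogisticRegression(max_iter=1000).fit(X_train, y_train)
    print("Test accuracy:", model.score(X_test, y_test))
    return model

# Each step depends on the previous one's output, forming a simple dependency graph.
model = train_and_evaluate(engineer_features(prepare_data("crashes.csv")), target="alcohol_related")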

The development of Machine Learning Pipelines is highly complex, and many house-keeping activities can be abstracted away by leveraging existing workflow engines such as Kubeflow Pipelines, Airflow (Airbnb) or Luigi (Spotify). The remainder of this article will focus on Kubeflow Pipelines, because we believe its approach better captures the requirements we see from our clients.

Moreover, Kubeflow Pipelines are designed to be composed of re-usable Operations, whereby multiple Pipelines can use the same Operation with different parameters.

Machine Learning Inference Phase

The input data provided to the model for predictions must be the same shape as the data used to train the model. However, the data available at inference time often requires significant pre-processing (e.g. normalisation, encoding, feature engineering) before it is ready to run through the model. Ideally, the code used to pre-process data in the Training Phase can be re-used in the Inference Phase.

With the data pre-processed, it is ready to be fed into the model and predictions can be returned to the end user.
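
A simple way to achieve this re-use is to keep the pre-processing in a single shared function that both phases import. The sketch below is a hypothetical illustration of the pattern (the module and function names are assumptions); we will see HyperModel's equivalent, build_feature_matrix, later in this article.

import pandas as pd

# shared.py (hypothetical) -- imported by both the training and inference code
def preprocess(df: pd.DataFrame, categorical: list, numeric: list) -> pd.DataFrame:
    # Encode categorical columns and pass numeric columns through unchanged
    encoded = pd.get_dummies(df[categorical].astype(str))
    return pd.concat([encoded, df[numeric]], axis=1)

# Training phase:  features = preprocess(training_df, CATEGORICAL, NUMERIC)
# Inference phase: features = preprocess(request_df, CATEGORICAL, NUMERIC)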

About CI/CD

CI/CD stands for “Continuous Integration / Continuous Delivery” and is a key concept of DevOps. At its most basic level, CI/CD Pipelines are just scripts which are triggered whenever code is changed (e.g. as the result of a push or merge), automatically building software assets, running tests and deploying code.

There is an important distinction between CI/CD Pipelines and Training Pipelines. CI/CD Pipelines run in response to code changes, whereas Training Pipelines are often run on demand or on a schedule unrelated to code changes.

CI/CD Pipelines and Workflows are highly mature in the Software Engineering world, but are under-utilised in the world of Machine Learning and Data Science.

About Merge / Pull Requests & CI/CD

Pull/Merge requests are requests for newly developed code to be integrated into the “trunk” of the code base. A Merge / Pull Request is generated by the engineer (or data scientist), requesting that the team accept the new changes to the code base, which will often require that the new code pass a series of gates, including:

  • The change relates back to a change request / ticket (e.g. Jira Task)
  • Unit tests pass
  • Automated integration tests pass
  • User acceptance testing
  • Code review

Once the code has passed these gates (many of which will be automated), the code is ready to be merged into the code base for deployment.

We will use the concept of Merge Requests as the gates for human intervention in our deployment process, ensuring that each deployment is signed off by a real person. We have found that manual “sign-off” on deployments is much more palatable for senior management from a governance perspective than purely automated solutions.

CI/CD for Machine Learning using Kubeflow

Within a Machine Learning workflow, there are two kinds of deployments:

  1. Deploying code (Pipeline code, Inference code)
  2. Deploying models (e.g. .joblib / encoding information)

Traditionally, only code is stored within a Git repository, with data being stored in a Database, Data Lake or Data Warehouse. Machine Learning models more closely resemble “data” than “code”, and are not really suitable for storage in a Git repository for the following reasons:

  1. Model files are machine generated, not the work of humans
  2. Models are often very large (potentially gigabytes in size)
  3. Model files such as .joblib have a binary format, which means they cannot be meaningfully diffed and changes are difficult to reason about

However, it does make sense to store a “link” to where we can find a model, so that we can roll back to previous versions, or link the current version of code with the current version of a model. This is especially true since changes in code may mean an older model is no longer compatible. The link can be managed via git-lfs, or by a simple json file pointing to where the file exists in shared storage (e.g. S3, Hadoop, Google Cloud Storage or Azure Blob Storage).

Storing a reference to the current version of the model (along with a checksum for verification) means we can use the same Merge / Pull Request pattern to deploy models as we would to deploy code. It also means that we can have a “Model Review” phase in our deployment workflow, where a human can decide on (& take responsibility for) the deployment of a model to production.
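
As a sketch of what this might look like, the helper below (a hypothetical example, not part of any particular tool) checksums a trained model artifact and writes a small reference file that can be committed to the repository:

import hashlib
import json

def write_model_reference(model_path: str, storage_url: str, reference_path: str = "model-reference.json"):
    # Checksum the model artifact so the reference can be verified at deployment time
    with open(model_path, "rb") as f:
        checksum = hashlib.sha256(f.read()).hexdigest()

    reference = {
        "artifact_url": storage_url,  # e.g. a path in S3 / Google Cloud Storage / Azure Blob Storage
        "sha256": checksum,
    }
    with open(reference_path, "w") as f:
        json.dump(reference, f, indent=2)
    return reference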

ML Deployment Process via Git

Central to the Git based approach for ML deployment is the concept of a “Two Phased” deployment:

Phase 1 is the deployment of a new Training Pipeline and involves deploying new code (via docker images) which is used to train new models in production – often on a schedule (e.g. daily, weekly).

Phase 2 is the deployment of a new model, which does not involve deploying new source-code, but rather deploying an updated reference to the current version of the model.

Process details

Many of the lines in the above diagram are labelled with a number; each numbered step is described in more detail below:

  1. The Data Scientist develops source code for the training and inference phases of the model, deployable as a single package. The source code is then pushed to a branch within a repository (which should be associated with a ticket / change request), which triggers a build phase in CI/CD along with automated tests.
  2. The Data Scientist submits a Pull/Merge Request detailing the change and requesting that it be merged into the trunk of the repository. This will trigger further automated builds and tests against the merge request.
  3. A “Nominated Approver” (e.g. a Senior Engineer / Data Scientist) will review the change (along with build and automated testing output), comment on it and eventually approve it for merging.
  4. Following the successful merge (or when the branch is tagged with a release), the CI/CD platform will then deploy the new training code to Kubeflow, where it will be executed as a Pipeline, probably on a schedule.
  5. The final steps of the Pipeline will generate new model artifacts (e.g. a .joblib file, evaluation metrics, information about distributions, etc), copying this data to a Data Lake / Cloud / Blob Storage.
  6. The last step will be to automatically update the model-reference.json file in the repository, commit the change and create a merge request so that the model-reference.json change is reflected in a release / trunk version of the codebase.
  7. A “Nominated Approver” (e.g. a senior Data Scientist) will be tasked with evaluating the model, to ensure that the model meets all the requirements for deployment (e.g. performance, testing, etc).
  8. The approval of the merge triggers another CI/CD Deployment, this time re-deploying the “Inference” application with the updated link. The re-deployed Inference application will point to the latest version of the trained Model.
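
To make step 6 more concrete, here is a hedged sketch of how a training pipeline could commit the updated model-reference.json and open a Merge Request via the GitLab REST API. It re-uses the GITLAB_URL, GITLAB_PROJECT and GITLAB_TOKEN variables that our pipeline configuration passes into the container later in this article, and assumes GITLAB_PROJECT holds the numeric project id; this is an illustration of the idea, not necessarily how HyperModel implements it.

import json
import os

import requests

def open_model_merge_request(reference: dict, branch: str = "model-update"):
    # Talk to the GitLab REST API using the variables passed in from CI/CD
    base = f"{os.environ['GITLAB_URL']}/api/v4/projects/{os.environ['GITLAB_PROJECT']}"
    headers = {"PRIVATE-TOKEN": os.environ["GITLAB_TOKEN"]}

    # Create a branch from master and commit the updated reference file to it
    requests.post(f"{base}/repository/branches", headers=headers,
                  json={"branch": branch, "ref": "master"}).raise_for_status()
    requests.post(f"{base}/repository/commits", headers=headers, json={
        "branch": branch,
        "commit_message": "Update model-reference.json with newly trained model",
        "actions": [{
            "action": "update",
            "file_path": "model-reference.json",
            "content": json.dumps(reference, indent=2),
        }],
    }).raise_for_status()

    # Open the Merge Request that the "Nominated Approver" will review (step 7)
    requests.post(f"{base}/merge_requests", headers=headers, json={
        "source_branch": branch,
        "target_branch": "master",
        "title": "Deploy newly trained model",
    }).raise_for_status()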

Show me the code

All the code described in this article is available on GitHub, here.

“Crashed” is a simple Machine Learning task that analyses car accidents in the state of Victoria, Australia to predict whether or not alcohol was a contributing factor. Crash investigators could potentially use this service to identify when to require alcohol testing post accident.

We make extensive use of our own open source (Apache licensed) Python library called HyperModel, which provides a simplified API over Kubeflow, Click (for CLI integration) and Kubernetes. Documentation for HyperModel is available here.

Defining a package

To improve the development and deployment experience of building a production ready ML App, we want to be able to package both the Inference and Pipeline phases of the project in a single package. The primary advantage of this approach is that it enables us to easily share code between training and inference phases.

To create a package we use the standard Python package pattern, defining a setup.py that looks like this:

from setuptools import setup, find_packages

NAME = "crashed"
VERSION = "0.0.74"
REQUIRES = [
    "click",
    "kfp",
    "xgboost",
    "pandas",
    "sklearn",
    "google-cloud",
    "hypermodel",
]

setup(
    name=NAME,
    version=VERSION,
    install_requires=REQUIRES,
    packages=find_packages(),
    python_requires=">=3.5.3",
    include_package_data=True,
    entry_points={"console_scripts": ["crashed = crashed.start:main"]},
)

The most important aspect of this is that we define a console script entry point, crashed (mapped to crashed.start:main), which serves as the entry point into our application; once the package is installed, we can start our app simply by running crashed from the terminal.

The HyperModel package referenced here will help us out by defining a number of command groups that we can use to run pipeline steps locally, run the whole pipeline, deploy the pipeline to Kubeflow and deploy the Inference App to Kubernetes.

Defining a HyperModel App Entry-point

In order to co-ordinate the Pipeline Phase, Inference Phase, Deployment and local run time behaviour, we need to define an HmlApp, which is as simple as:

import os

from hypermodel import hml  # assumed import path for the HyperModel helper module


def main():
    # Create a reference to our "App" object which maintains state
    # about both the Inference and Pipeline phases of the model
    image_url = os.environ["DOCKERHUB_IMAGE"] + ":" + os.environ["CI_COMMIT_SHA"]

    app = hml.HmlApp(
        name="car-crashes",
        platform="GCP",
        image_url=image_url,
        package_entrypoint="crashed",
        inference_port=8000,
        k8s_namespace="kubeflow")

    # Other code segments

    app.start()

Kubeflow training pipeline

A Kubeflow Pipeline is a collection of “Operations” which are executed within a Container in Kubernetes, as a ContainerOp. Operations are designed to be re-usable and are thus loosely coupled with Pipelines. Thanks to some wizardry in the @hml.op function decorator, you can turn a simple function into a ContainerOp.

Defining a ContainerOp

The following code outlines a basic ContainerOp which builds a data set to be used as the test set for the training Pipeline by executing some SQL against the Data Warehouse.

@hml.op()
@hml.pass_context
def create_test(ctx):
    services: GooglePlatformServices = ctx.obj["services"]
    model_container = get_model_container(ctx)

    column_string = ",".join(model_container.features_all)

    query = f"""
        SELECT {column_string}, {model_container.target}
        FROM crashed.crashes_raw 
        WHERE accident_date > '2018-01-01'
    """
    services.warehouse.select_into(
        query, services.config.warehouse_dataset, BQ_TABLE_TEST
    )

    logging.info(f"Wrote test set to {BQ_TABLE_TEST}.  Success!")

Defining a Pipeline

You will recall that a Pipeline is just a collection of operations executed as a graph (and ordered according to dependencies). We can define our Pipeline using the following code:

    @hml.pipeline(app.pipelines, cron="0 0 * * *", experiment="demos")
    def crashed_pipeline():
        """
        This is where we define the workflow for this pipeline purely
        with method invocations.
        """
        create_training_op = pipeline.create_training()
        create_test_op = pipeline.create_test()
        train_model_op = pipeline.train_model()

        # Set up the dependencies for this model
        (
            train_model_op
            .after(create_training_op)
            .after(create_test_op)
        )

There are a few interesting things to note here:

  • In the decorator, we can define a cron expression to schedule the execution of this pipeline
  • In the decorator, we can also define the “experiment” in which jobs associated with this pipeline should be collected
  • The actual pipeline steps are just method invocations
  • The order in which operations are executed is defined via calls to .after() or .before()

Pipeline deployment – configuration

We expect our Pipeline to run in Kubernetes, via Kubeflow, which means we need to do some additional work to ensure that the Container responsible for executing each Operation is configured properly. This means having the ability to set environment variables, mount volumes, load secrets, etc.

The good news here is that this can all be done in Python, using yet another decorator.

    @hml.deploy_op(app.pipelines)
    def op_configurator(op: hml.HmlContainerOp):
        """
        Configure our Pipeline Operation Pods with the right secrets and 
        environment variables so that it can work with our cloud
        provider's services
        """

        (op
            # Service account for authentication / authorisation
            .with_gcp_auth("svcacc-tez-kf")
            .with_env("GCP_PROJECT", "grwdt-dev")
            .with_env("GCP_ZONE", "australia-southeast1-a")
            .with_env("K8S_NAMESPACE", "kubeflow")
            .with_env("K8S_CLUSTER", "kf-crashed")
            # Data Lake Config
            .with_env("LAKE_BUCKET", "grwdt-dev-lake")
            .with_env("LAKE_PATH", "crashed")
            # Data Warehouse Config
            .with_env("WAREHOUSE_DATASET", "crashed")
            .with_env("WAREHOUSE_LOCATION", "australia-southeast1")
            # Track where we are going to write our artifacts
            .with_empty_dir("artifacts", "/artifacts")

            # Pass through environment variables from my CI/CD Environment
            # into my container
            .with_env("GITLAB_TOKEN", os.environ["GITLAB_TOKEN"])
            .with_env("GITLAB_PROJECT", os.environ["GITLAB_PROJECT"])
            .with_env("GITLAB_URL", os.environ["GITLAB_URL"])
         )
        return op

Functions decorated with @hml.deploy_op are executed just prior to the Pipeline being deployed to Kubernetes, and will therefore run within the context of our CI/CD platform (e.g. we can set the $GITLAB_TOKEN environment variable as a GitLab CI/CD Secret and it will be passed into the execution environment for the ContainerOp) – which is pretty neat!

Pipeline deployment – deploy

With everything configured in Python, the actual deployment step is a simple one-liner:

crashed pipelines crashed_pipeline deploy-prod --host $KFP_HOST --client-id $KFP_CLIENT_ID --namespace $K8S_NAMESPACE

Deployed Pipeline

Model Container

Earlier we talked about the need to package up the Inference and Training phases of the model so that we could re-use shared functionality. Central to this is a unified view of what the Model actually is. In this example, we define our model as a ModelContainer within the shared.py file:

def crashed_model_container(app:  hml.HmlApp):
    """
        This is where we define what our model container looks like which helps
        us to track features / targets in the one place
    """
    numeric_features: List[str] = [
        "inj_or_fatal",
        "fatality",
        "males",
        "females",
        "driver",
        "pedestrian",
        "old_driver",
        "young_driver",
        "unlicencsed",
        "heavyvehicle",
        "passengervehicle",
        "motorcycle",
    ]

    categorical_features: List[str] = [
        "accident_time",
        "accident_type",
        "day_of_week",
        "dca_code",
        "hit_run_flag",
        "light_condition",
        "road_geometry",
        "speed_zone",
    ]

    model_container = hml.ModelContainer(
        name=MODEL_NAME,
        project_name="demo-crashed",
        features_numeric=numeric_features,
        features_categorical=categorical_features,
        target="alcohol_related",
        services=app.services
    )
    return model_container

This unremarkable code defines the ModelContainer and tells us about the different features we are interested in (and whether they are categorical or numeric).

The ModelContainer object is, however, quite useful when we need to pre-process the data (in both the Training and Inference phases), which we do with the following function:

def build_feature_matrix(model_container, data_frame: pd.DataFrame, throw_on_missing=False):
    """
        Given an input dataframe, encode the categorical features (one-hot)
        and use the numeric features without change.  If we see a value in our
        dataframe that we have not seen before, and "throw_on_missing" == True,
        then we will throw an exception, as the mapping back to the original
        matrix won't make sense.
    """
    logging.info(f"build_feature_matrix: {model_container.name}")

    # Now lets do the encoding thing...
    encoded_df = one_hot_encode(
        data_frame, model_container.feature_uniques, throw_on_missing=throw_on_missing
    )

    for nf in model_container.features_numeric:
        encoded_df[nf] = data_frame[nf]

    matrix = encoded_df.values
    return matrix
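
The one_hot_encode helper used above comes from the demo's shared code. A minimal sketch of what such a helper might look like is shown below, assuming feature_uniques maps each categorical column to the set of values observed during training; this is an illustration of the idea, not HyperModel's actual implementation.

import pandas as pd

def one_hot_encode(data_frame: pd.DataFrame, feature_uniques: dict, throw_on_missing: bool = False) -> pd.DataFrame:
    # Encode each categorical column against the values seen at training time,
    # so the resulting matrix always has the same columns in the same order
    encoded = pd.DataFrame(index=data_frame.index)
    for column, known_values in feature_uniques.items():
        if throw_on_missing:
            unseen = set(data_frame[column].astype(str)) - {str(v) for v in known_values}
            if unseen:
                raise ValueError(f"Unseen values for '{column}': {unseen}")
        for value in known_values:
            encoded[f"{column}:{value}"] = (data_frame[column].astype(str) == str(value)).astype(int)
    return encoded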

The ModelContainer object also provides functionality to Analyse features (e.g. calculate distributions, find unique values) which is useful for normalisation and encoding. It also provides functionality for saving the .joblib file containing the trained model.

The results of both the Training and Analyse phases can be serialized and deserialized either locally or to Cloud Storage, so that we can save them at the end of the Training Pipeline and load them as part of the Inference App’s initialisation.
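
The exact serialization mechanics are handled by HyperModel, but the underlying pattern can be sketched roughly as follows, assuming a Google Cloud Storage bucket (the bucket, path and file names here are illustrative):

import joblib
from google.cloud import storage

def save_model(model, bucket_name: str, blob_path: str, local_path: str = "/artifacts/model.joblib"):
    # At the end of the Training Pipeline: serialize locally, then upload to Cloud Storage
    joblib.dump(model, local_path)
    storage.Client().bucket(bucket_name).blob(blob_path).upload_from_filename(local_path)

def load_model(bucket_name: str, blob_path: str, local_path: str = "/artifacts/model.joblib"):
    # During the Inference App's initialisation: download and deserialize the model
    storage.Client().bucket(bucket_name).blob(blob_path).download_to_filename(local_path)
    return joblib.load(local_path)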

Defining an Inference App

The Inference App functionality is also defined using Python decorators:

    @hml.inference(app.inference)
    def crashed_inference(inference_app: hml.HmlInferenceApp):
        # Get a reference to the current version of my model
        model_container = inference_app.get_model(shared.MODEL_NAME)
        model_container.load()

        # Define our routes here, which can then call other functions with more
        # context
        @inference_app.flask.route("/predict", methods=["GET"])
        def predict():
            logging.info("api: /predict")

            feature_params = request.args.to_dict()
            return inference.predict_alcohol(inference_app, model_container, feature_params)

The @hml.inference decorator wraps a method which is run when the container starts up in Kubernetes (or locally). This method is responsible for loading the serialized ModelContainer, and defining API routes via Flask. With that all set up, we can call into the method which actually makes the prediction:

def predict_alcohol(
    inference_app: hml.HmlInferenceApp, 
    model_container: hml.ModelContainer,
    params: Dict[str,str]):

    logging.info("predict_alcohol")
    # Lets not make the user specify every possible feature
    # so we will replace any missing ones with our defaults
    for k in shared.default_features:
        if k not in params:
            params[k] = shared.default_features[k]

    # Translate features to a dataframe
    features_df = pd.DataFrame([params])

    try:
        # Now we turn it into a matrix, ready for some XTREME boosting
        feature_matrix = shared.build_feature_matrix(
            model_container, features_df, throw_on_missing=True
        )
        # Ask the model to do the predictions
        predictions = [v for v in model_container.model.predict(feature_matrix)]
        return jsonify(
            {
                "success": True,
                "features": params,
                "prediction": f"{predictions[0]}",
            }
        )
    except Exception as ex:
        return jsonify({"success": False, "error": ex.args[0]})
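
With the Inference App running (locally or in Kubernetes), the endpoint can be exercised with a simple client such as the sketch below. The host and feature values are assumptions for illustration; the port matches the inference_port configured on the HmlApp earlier.

import requests

# Features are passed as query-string parameters; anything we omit is filled in
# from shared.default_features on the server side.
response = requests.get(
    "http://localhost:8000/predict",
    params={"speed_zone": "100", "day_of_week": "Saturday"},
)
print(response.json())
# e.g. {"success": true, "features": {...}, "prediction": "0"}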

Inference App Deployment

Just like with the ContainerOp deployment, we need an opportunity to configure our deployment before sending it to Kubernetes. This configuration phase is done with, you guessed it, a Python Decorator!

    @hml.deploy_inference(app.inference)
    def deploy_inference(deployment: hml.HmlInferenceDeployment):
        print(f"Preparing deploying: {deployment.deployment_name} ({deployment.k8s_container.image} -> {deployment.k8s_container.args} )")

        (
            deployment
            .with_gcp_auth("svcacc-tez-kf")
            .with_empty_dir("tmp", "/temp")
            .with_empty_dir("artifacts", "/artifacts")
        )
        pass

With this set up, we can deploy our application using the following command:

crashed inference deploy

CI/CD Setup

A CI/CD system is a critical piece of infrastructure for this pattern. In this example we are using GitLab, but you could just as easily use any other container-based CI/CD system.

Build Stage

As everything in Kubeflow / Kubernetes is container based, we need to build a container with all our dependencies, which we can do as a task in our .gitlab-ci.yml:

build-app-image:
  stage: build
  script:
    - docker login --username $DOCKERHUB_USERNAME --password $DOCKERHUB_PASSWORD docker.io
    - echo "Building..."
    - docker build -t $DOCKERHUB_IMAGE:$CI_COMMIT_SHA -f ./deploy/crashed.Dockerfile .
    - docker push $DOCKERHUB_IMAGE:$CI_COMMIT_SHA

Here, $DOCKERHUB_IMAGE is the URL of the image repository we want to push our image to.

The actual Dockerfile used to build the image is:

FROM growingdata/hypermodel:xgboost-1.3.74


RUN apk --no-cache add \
    libffi-dev openssl-dev python-dev py-pip build-base

# Hyper model requirements
RUN pip install hypermodel

ADD ./src /pkg_src/demo-car-crashes

# Install our package from source code
WORKDIR /pkg_src/demo-car-crashes
RUN pip install -e .

WORKDIR /crashed

Deploy Pipeline Stage

When our container is successfully built, we may also want to deploy the latest version of our Pipeline to Kubeflow. This can be done with the following snippet from .gitlab-ci.yml:

deploy-pipeline:
  stage: deploy
  only:
    - master # Only deploy on a merge to master
  dependencies:
    - build-app-image
  script:
    - gcloud config set project $GCP_PROJECT
    - gcloud auth activate-service-account --key-file $GOOGLE_APPLICATION_CREDENTIALS
    - gcloud container clusters get-credentials $KFAPP --zone $GCP_ZONE --project $GCP_PROJECT

    # Install this package
    - pip install -e src/

    # Deploy the pipeline using the newly deployed source
    - crashed pipelines crashed_pipeline deploy-prod --host $KFP_HOST --client-id $KFP_CLIENT_ID --namespace $K8S_NAMESPACE

Deploy a new version of the Model

The deployment of a new version of the model is as simple as re-deploying our Inference App with a newer version of the reference json file (which in this case will be saved in the repo to ./crashed-xgb-reference.json). We can manage this in GitLab CI by running the InferenceApp deployment only when this file changes:

deploy-model:
  stage: deploy
  dependencies:
    - build-app-image
  only:
    changes:
      - crashed-xgb-reference.json
    refs:
      - master
  script:
    - gcloud config set project $GCP_PROJECT
    - gcloud auth activate-service-account --key-file $GOOGLE_APPLICATION_CREDENTIALS
    - gcloud container clusters get-credentials $KFAPP --zone $GCP_ZONE --project $GCP_PROJECT

    # Install this package
    - pip install -e src/

    # Deploy it using our new package
    - crashed inference deploy

Source Code

Please have a look at the full project here:

https://github.com/GrowingData/hyper-model/tree/master/demo/car-crashes
