Kubeflow Pipelines – Running and Deploying Pipelines via the SDK

This document builds on our getting started guide and walks you through building, deploying and running a pipeline via the SDK.

Install the Kubeflow SDK

pip install kfp

Create the Pipeline

If you already have a pipeline you would like to deploy, you can use it. Alternatively, you can download one of the samples using the command below:

wget https://raw.githubusercontent.com/kubeflow/pipelines/master/samples/core/condition/condition.py

Compile the Pipeline

The Kubeflow SDK includes the dsl-compile command, which turns your Python file into a package file that can be uploaded to Kubeflow. The format follows the extension of the output path (here a .tar.gz archive; .zip also works). You can do this using the following command:

dsl-compile --py condition.py --output condition.tar.gz

Alternatively, the sample pipeline file already ends with the following section:

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(flipcoin_pipeline, __file__ + '.zip')

This compiles the pipeline to condition.py.zip, next to the script, when you execute:

python condition.py

Create a Python Uploader

import kfp
import sys
import getopt

def upload(path, name):
    client = kfp.Client()
    client.upload_pipeline(path, name)


def main():
    # Both short options take a value, so each one needs a trailing colon.
    opts, args = getopt.getopt(sys.argv[1:], "p:n:", ["pipeline=", "name="])
    pipeline_path = None
    pipeline_name = None
    for opt, arg in opts:
        if opt in ("-p", "--pipeline"):
            pipeline_path = arg
        elif opt in ("-n", "--name"):
            pipeline_name = arg

    upload(pipeline_path, pipeline_name)


if __name__ == '__main__':
    main()

Save this script as upload.py. It can then be used by calling:

python upload.py --pipeline condition.tar.gz --name condition-cli-upload
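The getopt-based script passes None to the client if either option is left out. As a hedged alternative, the sketch below uses argparse from the standard library so that both options are required and a usage message is printed when one is missing. The function names parse_args and main are illustrative choices, and kfp is imported lazily (an assumption made here so that argument errors surface before any connection to Kubeflow is attempted).

```python
import argparse


def parse_args(argv=None):
    """Parse the uploader's command-line options; both are required."""
    parser = argparse.ArgumentParser(
        description="Upload a compiled pipeline package.")
    parser.add_argument("-p", "--pipeline", required=True,
                        help="path to the compiled package, e.g. condition.tar.gz")
    parser.add_argument("-n", "--name", required=True,
                        help="name to give the pipeline in Kubeflow")
    return parser.parse_args(argv)


def upload(path, name):
    # Imported here so that --help and argument errors work without kfp set up.
    import kfp

    client = kfp.Client()
    client.upload_pipeline(path, name)


def main(argv=None):
    args = parse_args(argv)
    upload(args.pipeline, args.name)
```

Calling main() under the usual if __name__ == '__main__': guard should make this usable with the same command line as upload.py.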

Automatically deploying / running Pipelines

The approach above requires separate steps for compiling and uploading, and you then have to use the Kubeflow Pipelines UI to navigate to the pipeline, create a run and launch it. This is a bit cumbersome, especially since you cannot have two pipelines with the same name.

An alternative is to replace this section of your pipeline file:

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(flipcoin_pipeline, __file__ + '.zip')

with the following:

from kfp import Client
from datetime import datetime

...

def test_deployed(pipeline):
    # No pipeline parameters are passed to the run.
    deploy_args = dict()
    pipeline_name = pipeline.__name__

    experiment_name = f"{pipeline_name}_tests"
    # The timestamp keeps each run name unique.
    run_name = pipeline_name + ' ' + datetime.now().strftime('%Y-%m-%d %H-%M-%S')

    print(f"hm> pipeline: {pipeline_name}")
    print(f"hm> experiment: {experiment_name}")
    print(f"hm> run: {run_name}")
    # Client() uses the default connection settings.
    client = Client()
    client.create_run_from_pipeline_func(pipeline, deploy_args, run_name=run_name, experiment_name=experiment_name)

    print("hm> Deployed and running!")

if __name__ == '__main__':
    test_deployed(flipcoin_pipeline)

Now when you run your pipeline file from the command line, it automatically creates a new run in an experiment named after the pipeline function with a _tests suffix.
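The function above returns as soon as the run has been submitted. If you would rather block until the run finishes, a variation along the following lines may work. This is a sketch under assumptions: deploy_and_wait and timeout_seconds are hypothetical names, the client is passed in as a parameter, and the object returned by create_run_from_pipeline_func is assumed to offer a wait_for_run_completion(timeout=...) method, which is worth checking against the kfp version you have installed.

```python
from datetime import datetime


def deploy_and_wait(client, pipeline_func, arguments=None, timeout_seconds=600):
    """Submit a run for pipeline_func and wait for it to finish.

    `client` is expected to be a kfp.Client instance (or a compatible object).
    """
    pipeline_name = pipeline_func.__name__
    experiment_name = f"{pipeline_name}_tests"
    run_name = pipeline_name + " " + datetime.now().strftime("%Y-%m-%d %H-%M-%S")

    result = client.create_run_from_pipeline_func(
        pipeline_func,
        arguments or {},
        run_name=run_name,
        experiment_name=experiment_name,
    )
    # Assumption: the returned run result exposes wait_for_run_completion().
    return result.wait_for_run_completion(timeout=timeout_seconds)
```

In the pipeline file this would be called as deploy_and_wait(Client(), flipcoin_pipeline) in place of test_deployed(flipcoin_pipeline).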

 
