This document builds on our getting started guide and takes you through the process to build, deploy and run a pipeline via the SDK.
Install the Kubeflow SDK
pip install kfp
Create the Pipeline
If you already have a pipeline you would like to deploy, you can use it. Alternatively, you can download one of the samples using the command below:
wget https://raw.githubusercontent.com/kubeflow/pipelines/master/samples/core/condition/condition.py
Compile the Pipeline
The Kubeflow SDK includes the dsl-compile command, which turns your Python file into a pipeline package (here a .tar.gz archive) that can be uploaded to Kubeflow. You can do this using the following command:
dsl-compile --py condition.py --output condition.tar.gz
Alternatively, the pipeline file already contains the following section:
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(flipcoin_pipeline, __file__ + '.zip')
This compiles the pipeline to condition.py.zip when you execute the file:
python condition.py
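Whichever way you compile, it can be worth checking what ended up in the package before uploading it. The helper below is a minimal sketch using only the standard library; list_package_members is a hypothetical name, not part of the Kubeflow SDK. With the v1 SDK the package should contain a single workflow file (I would expect pipeline.yaml, but treat that file name as an assumption).

```python
import tarfile
import zipfile


def list_package_members(path):
    """Return the file names stored inside a compiled pipeline package.

    Hypothetical helper, not part of kfp. Handles both the .zip and the
    .tar.gz output formats mentioned above.
    """
    if zipfile.is_zipfile(path):
        with zipfile.ZipFile(path) as archive:
            return archive.namelist()
    if tarfile.is_tarfile(path):
        # The default read mode detects gzip compression automatically.
        with tarfile.open(path) as archive:
            return archive.getnames()
    raise ValueError(f"{path} is neither a zip nor a tar archive")
```

For example, print(list_package_members("condition.tar.gz")) shows the contents of the package built by the dsl-compile command above.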
Create a Python Uploader
import getopt
import sys

import kfp


def upload(path, name):
    # Connect to the default Kubeflow Pipelines endpoint and upload the package.
    client = kfp.Client()
    client.upload_pipeline(path, name)


def main():
    # Both options take a value, so each short flag needs a trailing colon.
    opts, args = getopt.getopt(sys.argv[1:], "p:n:", ["pipeline=", "name="])
    pipeline_path = None
    pipeline_name = None
    for opt, arg in opts:
        if opt in ("-p", "--pipeline"):
            pipeline_path = arg
        elif opt in ("-n", "--name"):
            pipeline_name = arg
    upload(pipeline_path, pipeline_name)


if __name__ == '__main__':
    main()
Save this as upload.py. It can then be used by calling:
python upload.py --pipeline condition.tar.gz --name condition-cli-upload
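If you would rather get usage help and an error when an option is missing, the same uploader can be written with argparse. This is a sketch of one possible variant rather than the original script: parse_args is a hypothetical helper name, and kfp is imported inside upload only so that argument parsing can be tried without the SDK installed.

```python
import argparse


def parse_args(argv=None):
    """Parse the uploader's command-line options (hypothetical helper)."""
    parser = argparse.ArgumentParser(
        description="Upload a compiled pipeline package to Kubeflow Pipelines."
    )
    parser.add_argument("-p", "--pipeline", required=True,
                        help="path to the compiled package, e.g. condition.tar.gz")
    parser.add_argument("-n", "--name", required=True,
                        help="name the pipeline should have in Kubeflow")
    return parser.parse_args(argv)


def upload(path, name):
    """Upload the package using the default client configuration."""
    import kfp  # imported lazily; requires `pip install kfp`

    client = kfp.Client()
    return client.upload_pipeline(path, name)
```

To use it as a script, call args = parse_args() followed by upload(args.pipeline, args.name) at the bottom of the file. The command line stays the same as above.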
Automatically deploying / running Pipelines
The approach above requires separate steps for compiling and uploading. You then have to use the Kubeflow Pipelines UI to navigate to the pipeline, create a run and launch it, which is a bit cumbersome (especially since you cannot have two pipelines with the same name).
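If you do stick with separate uploads, one simple way around the name clash is to append a timestamp to the pipeline name on each upload. The function below is only an illustration: unique_pipeline_name is a hypothetical helper, and the name format is an arbitrary choice.

```python
from datetime import datetime


def unique_pipeline_name(base_name, now=None):
    """Append a second-resolution timestamp so repeated uploads do not collide.

    Hypothetical helper; `now` can be passed in to make the result predictable.
    """
    now = now or datetime.now()
    return f"{base_name}-{now.strftime('%Y%m%d-%H%M%S')}"
```

You could then pass the result as the --name value of the uploader, for example unique_pipeline_name("condition-cli-upload").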
To avoid these manual steps, you can replace the following section within your pipeline:
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(flipcoin_pipeline, __file__ + '.zip')
with:
from kfp import Client
from datetime import datetime

...

def test_deployed(pipeline):
    # No pipeline arguments are passed in this example.
    deploy_args = dict()
    pipeline_name = pipeline.__name__
    experiment_name = f"{pipeline_name}_tests"
    run_name = pipeline_name + ' ' + datetime.now().strftime('%Y-%m-%d %H-%M-%S')
    print(f"hm> pipeline: {pipeline_name}")
    print(f"hm> experiment: {experiment_name}")
    print(f"hm> run: {run_name}")
    # Use the default host and client ID.
    client = Client(None, None)
    # Compile the pipeline function and start a run in one call.
    client.create_run_from_pipeline_func(pipeline, deploy_args, run_name=run_name, experiment_name=experiment_name)
    print("hm> Deployed and running!")


if __name__ == '__main__':
    test_deployed(flipcoin_pipeline)
Now when you run your pipeline file from the command line, it will automatically create a new run for it in an experiment named after the pipeline with a _tests suffix.
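If you also want the script to block until the run finishes, the object returned by create_run_from_pipeline_func can be used to wait for completion. The sketch below assumes the v1 SDK, where (as far as I know) the result exposes wait_for_run_completion and the returned detail carries run.status; run_and_wait and timeout_seconds are hypothetical names. The client is passed in as a parameter so the function can be tried with a stand-in object.

```python
from datetime import datetime


def run_and_wait(client, pipeline, arguments=None, timeout_seconds=600):
    """Start a run for `pipeline`, wait for it to finish and return its status.

    Hypothetical helper. Assumes the kfp v1 client API: the run result has
    wait_for_run_completion(timeout) and the detail has run.status.
    """
    pipeline_name = pipeline.__name__
    run_name = pipeline_name + ' ' + datetime.now().strftime('%Y-%m-%d %H-%M-%S')
    result = client.create_run_from_pipeline_func(
        pipeline,
        arguments or {},
        run_name=run_name,
        experiment_name=f"{pipeline_name}_tests",
    )
    # Blocks until the run ends or the timeout (in seconds) expires.
    detail = result.wait_for_run_completion(timeout=timeout_seconds)
    return detail.run.status
```

With a real cluster you would call run_and_wait(Client(None, None), flipcoin_pipeline) and check the returned status string.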