Unified Interface for Constructing and Managing Workflows on different workflow engines, such as Argo Workflows, Tekton Pipelines, and Apache Airflow.


What is Couler?

Couler aims to provide a unified interface for constructing and managing workflows on different workflow engines, such as Argo Workflows, Tekton Pipelines, and Apache Airflow.

Couler is included in CNCF Cloud Native Landscape and LF AI Landscape.

Who uses Couler?

You can find a list of organizations who are using Couler in ADOPTERS.md. If you'd like to add your organization to the list, please send us a pull request.

Why use Couler?

Many workflow engines exist nowadays, e.g. Argo Workflows, Tekton Pipelines, and Apache Airflow. However, their programming experience varies and they have different level of abstractions that are often obscure and complex. The code snippets below are some examples for constructing workflows using Apache Airflow and Kubeflow Pipelines.

Apache Airflow Kubeflow Pipelines

def create_dag(dag_id,
    def hello_world_py(*args):
        print('Hello World')

    dag = DAG(dag_id,
    with dag:
        t1 = PythonOperator(
    return dag

for n in range(1, 10):
    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)
    globals()[dag_id] = create_dag(

class FlipCoinOp(dsl.ContainerOp):
    """Flip a coin and output heads or tails randomly."""
    def __init__(self):
        super(FlipCoinOp, self).__init__(
            command=['sh', '-c'],
            arguments=['python -c "import random; result = \'heads\' if random.randint(0,1) == 0 '
                       'else \'tails\'; print(result)" | tee /tmp/output'],
            file_outputs={'output': '/tmp/output'})

class PrintOp(dsl.ContainerOp):
    """Print a message."""
    def __init__(self, msg):
        super(PrintOp, self).__init__(
            command=['echo', msg],

# define the recursive operation
def flip_component(flip_result):
    print_flip = PrintOp(flip_result)
    flipA = FlipCoinOp().after(print_flip)
    with dsl.Condition(flipA.output == 'heads'):

    name='pipeline flip coin',
    description='shows how to use graph_component.'
def recursive():
    flipA = FlipCoinOp()
    flipB = FlipCoinOp()
    flip_loop = flip_component(flipA.output)
    PrintOp('cool, it is over. %s' % flipA.output).after(flip_loop)

Couler provides a unified interface for constructing and managing workflows that provides the following:

  • Simplicity: Unified interface and imperative programming style for defining workflows with automatic construction of directed acyclic graph (DAG).
  • Extensibility: Extensible to support various workflow engines.
  • Reusability: Reusable steps for tasks such as distributed training of machine learning models.
  • Efficiency: Automatic workflow and resource optimizations under the hood.

Please see the following sections for installation guide and examples.


  • Couler currently only supports Argo Workflows. Please see instructions here to install Argo Workflows on your Kubernetes cluster.
  • Install Python 3.6+
  • Install Couler Python SDK via the following pip command:
pip install git+https://github.com/couler-proj/couler

Alternatively, you can clone this repository and then run the following to install:

python setup.py install


Coin Flip

This example combines the use of a Python function result, along with conditionals, to take a dynamic path in the workflow. In this example, depending on the result of the first step defined in flip_coin(), the template will either run the heads() step or the tails() step.

Steps can be defined via either couler.run_script() for Python functions or couler.run_container() for containers. In addition, the conditional logic to decide whether to flip the coin in this example is defined via the combined use of couler.when() and couler.equal().

import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter

def random_code():
    import random

    res = "heads" if random.randint(0, 1) == 0 else "tails"

def flip_coin():
    return couler.run_script(image="python:alpine3.6", source=random_code)

def heads():
    return couler.run_container(
        image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"']

def tails():
    return couler.run_container(
        image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"']

result = flip_coin()
couler.when(couler.equal(result, "heads"), lambda: heads())
couler.when(couler.equal(result, "tails"), lambda: tails())

submitter = ArgoSubmitter()


This example demonstrates different ways to define the workflow as a directed-acyclic graph (DAG) by specifying the dependencies of each task via couler.set_dependencies() and couler.dag(). Please see the code comments for the specific shape of DAG that we've defined in linear() and diamond().

import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter

def job_a(message):

def job_b(message):

def job_c(message):

def job_d(message):

#     A
#    / \
#   B   C
#  /
# D
def linear():
    couler.set_dependencies(lambda: job_a(message="A"), dependencies=None)
    couler.set_dependencies(lambda: job_b(message="B"), dependencies=["A"])
    couler.set_dependencies(lambda: job_c(message="C"), dependencies=["A"])
    couler.set_dependencies(lambda: job_d(message="D"), dependencies=["B"])

#   A
#  / \
# B   C
#  \ /
#   D
def diamond():
            [lambda: job_a(message="A")],
            [lambda: job_a(message="A"), lambda: job_b(message="B")],  # A -> B
            [lambda: job_a(message="A"), lambda: job_c(message="C")],  # A -> C
            [lambda: job_b(message="B"), lambda: job_d(message="D")],  # B -> D
            [lambda: job_c(message="C"), lambda: job_d(message="D")],  # C -> D

submitter = ArgoSubmitter()

Note that the current version only works with Argo Workflows but we are actively working on the design of the unified interface that is extensible to additional workflow engines. Please stay tuned for more updates and we welcome any feedback and contributions from the community.

Community Blogs and Presentations

  • v0.1.1rc8-stable(Apr 12, 2021)

    This release includes the compatibility fixes with different protobuf versions as well as fix for unnecessarily raised exception when using /tmp as mount path. This release also introduces the support for workflow memoization caches.

    List of changes since the last release can be found here.

    Source code(tar.gz)
    Source code(zip)
  • v0.1.1rc8(Mar 23, 2021)

  • v0.1.1rc7(Dec 2, 2020)

  • v0.1.1rc6(Sep 25, 2020)

    This release includes several bug fixes and enhancements. Below are some of the notable changes:

    • Bump the dependency of Argo Python client to v3.5.1 and re-enable Argo Workflow spec validation.
    • Fix incorrect ApiException import path for Kubernetes Python client with version 11.0.0 and above.
    • Support callable for Couler core APIs in stead of previously only types.FunctionType.
    • Switch to use Argo Workflows v2.10.2 for integration tests.
    Source code(tar.gz)
    Source code(zip)
  • v0.1.1rc5(Sep 15, 2020)

