Lightweight DAG Framework

2023-12-31

Introduction

python-dirk is a proof-of-concept library demonstrating a new API for declaring Directed Acyclic Graph (DAG) workflows. Here is a snippet that wires task A -> task B:

import asyncio

from python_dirk import Pipeline, Depends

async def handle_request():
    dag = Pipeline() # 1 

    @dag.mark_node(timeout_in_ms=100, failsafe=2) # 2
    async def coro_a():
        await asyncio.sleep(0.5)
        print("FUNCTION {} after sleeping for {} s".format("coro_a", 0.5))

    @dag.mark_node(timeout_in_ms=100, failsafe=2)
    async def coro_b(x=Depends(coro_a)): # 3
        await asyncio.sleep(0.5)
        print("FUNCTION {} after sleeping for {} s".format("coro_b", 0.5))

    result = await dag.execute_graph_v2() # 4
    return result

Notes:

  1. A Pipeline is a DAG in which each node represents a unit of work and each directed edge represents a dependency.
  2. Each unit of work must finish within timeout_in_ms; if it does not, the pipeline passes the failsafe value to its downstream work instead.
  3. Dependencies are declared in the style of FastAPI's Depends. The caveat is that the dependency MUST already be registered with the pipeline FIRST.
  4. execute_graph_v2() kicks off the pipeline and runs it to completion.

Why another workflow engine?

There are DOZENS of workflow engines out there. python-dirk is lightweight:

Because it is lightweight, you can run it inside a REST endpoint.

Why this engine over other engines?

You don't choose this over others. I have not run this in production, and neither should you.

What is good about this?

Use this if you like the FastAPI style of dependency injection.

python-dirk builds on that syntax and adds timeouts and failsafes to make it suitable for high-availability requirements.