2023-12-31
python-dirk is a proof-of-concept library, demonstrating a new API for declaring Direct Acyclic Graph (DAG) workflow. Here is a snippet, of writing task A -> task B:
from python_dirk import Pipeline, Depends
async def handle_request:
dag = Pipeline() # 1
@dag.mark_node(timeout_in_ms=100, failsafe=2) # 2
async def coro_a():
await asyncio.sleep(0.5)
print("FUNCTION {} after sleeping for {} s".format("coro_a", 0.5))
@dag.mark_node(timeout_in_ms=100, failsafe=2)
async def coro_b(x=Depends(coro_a)): # 3
await asyncio.sleep(0.5)
print("FUNCTION {} after sleeping for {} s".format("coro_c", 0.5))
result = await dag.execute_graph_v2() # 4
return result
Notes:
timeout_in_ms
; if it does not, pipeline will pass failsafe
value to downstream works.Depends
. The caveat here is that the variable MUST BE part of pipeline FIRST.execute_graph_v2()
kick-starts and runs to completion.There are DOZENS of workflow engine out there. python-dirk
is lightweight:
loguru
and networkx
. Plan to remove them in the future.Because it is lightweight, you can run it inside a REST endpoint.
You don't choose this over others. I have not run this in production, and neither should you.
You use this if you like the FastAPI style of dependency injection.
This builds on that syntax, and add timeout and failsafe to make it suitable for high availability requirement.