Results from a task can be provided to other tasks (or subflows) as upstream dependencies. Prefect uses upstream dependencies in two ways:
- To populate dependency arrows in the flow run graph
- To determine execution order for concurrently submitted units of work that depend on each other
Tasks vs. other functions
Only results from tasks inform Prefect's ability to determine dependencies. Return values from functions without task decorators, including subflows, do not carry the same information about their origin as task results.
When using non-sequential task runners such as the ConcurrentTaskRunner or DaskTaskRunner, the order of execution for submitted tasks is not guaranteed unless their dependencies are specified.
For example, compare how tasks submitted to the ConcurrentTaskRunner behave without and with upstream dependencies in the two examples below, each followed by its log output.

    from prefect import flow, task


    @flow(log_prints=True)  # Default task runner is ConcurrentTaskRunner
    def flow_of_tasks():
        # no dependencies, execution order is not guaranteed
        first.submit()
        second.submit()
        third.submit()


    @task
    def first():
        print("I'm first!")


    @task
    def second():
        print("I'm second!")


    @task
    def third():
        print("I'm third!")

Flow run 'pumpkin-puffin' - Created task run 'first-0' for task 'first'
Flow run 'pumpkin-puffin' - Submitted task run 'first-0' for execution.
Flow run 'pumpkin-puffin' - Created task run 'second-0' for task 'second'
Flow run 'pumpkin-puffin' - Submitted task run 'second-0' for execution.
Flow run 'pumpkin-puffin' - Created task run 'third-0' for task 'third'
Flow run 'pumpkin-puffin' - Submitted task run 'third-0' for execution.
Task run 'third-0' - I'm third!
Task run 'first-0' - I'm first!
Task run 'second-0' - I'm second!
Task run 'second-0' - Finished in state Completed()
Task run 'third-0' - Finished in state Completed()
Task run 'first-0' - Finished in state Completed()
Flow run 'pumpkin-puffin' - Finished in state Completed('All states completed.')

    from prefect import flow, task


    @flow(log_prints=True)  # Default task runner is ConcurrentTaskRunner
    def flow_of_tasks():
        # with dependencies, tasks execute in order
        first_result = first.submit()
        second_result = second.submit(first_result)
        third.submit(second_result)


    @task
    def first():
        print("I'm first!")


    @task
    def second(input):
        print("I'm second!")


    @task
    def third(input):
        print("I'm third!")

Flow run 'statuesque-waxbill' - Created task run 'first-0' for task 'first'
Flow run 'statuesque-waxbill' - Submitted task run 'first-0' for execution.
Flow run 'statuesque-waxbill' - Created task run 'second-0' for task 'second'
Flow run 'statuesque-waxbill' - Submitted task run 'second-0' for execution.
Flow run 'statuesque-waxbill' - Created task run 'third-0' for task 'third'
Flow run 'statuesque-waxbill' - Submitted task run 'third-0' for execution.
Task run 'first-0' - I'm first!
Task run 'first-0' - Finished in state Completed()
Task run 'second-0' - I'm second!
Task run 'second-0' - Finished in state Completed()
Task run 'third-0' - I'm third!
Task run 'third-0' - Finished in state Completed()
Flow run 'statuesque-waxbill' - Finished in state Completed('All states completed.')
When a result from a task is used as input for another task, Prefect automatically recognizes the task that result originated from as an upstream dependency.
This applies to every way you can run tasks with Prefect, whether you're calling the task function directly, calling .submit(), or calling .map(). Subflows similarly recognize task results as upstream dependencies.

    from prefect import flow, task


    @flow(log_prints=True)
    def flow_of_tasks():
        upstream_result = upstream.submit()
        downstream_1_result = downstream_1.submit(upstream_result)
        downstream_2_result = downstream_2.submit(upstream_result)
        mapped_task_results = mapped_task.map([downstream_1_result, downstream_2_result])
        final_task(mapped_task_results)


    @task
    def upstream():
        return "Hello from upstream!"


    @task
    def downstream_1(input):
        return input


    @task
    def downstream_2(input):
        return input


    @task
    def mapped_task(input):
        return input


    @task
    def final_task(input):
        print(input)

Flow run graph displaying inferred dependencies with the "Dependency grid" layout selected
Tasks that do not share data can be informed of their upstream dependencies through the wait_for parameter. Just as with automatic dependencies, this applies to direct task function calls, .submit(), .map(), and subflows.
Differences with .map()
Manually defined upstream dependencies apply to all tasks submitted by .map(), so each mapped task must wait for all upstream dependencies passed into wait_for to finish. This is distinct from automatic dependencies for mapped tasks, where each mapped task must only wait for the upstream tasks whose results it depends on.
For more complex workflows, parts of your logic may require additional resources, different infrastructure, or independent parallel execution. A typical approach for addressing these needs is to execute that logic as separate deployment runs from within a flow.
Composing deployment runs into a flow so that they can be treated as upstream dependencies is as simple as calling run_deployment from within a task.
Given a deployment process-user of flow parallel-work, a flow of deployments might look like this:
By default, deployments started from run_deployment will also appear as subflows for tracking purposes. This behavior can be disabled by setting the as_subflow parameter for run_deployment to False.
Flow run graph displaying deployments as dependencies with the "Dependency grid" layout selected