diff --git a/docs/next/src/pages/overview/execution/index.mdx b/docs/next/src/pages/overview/execution/index.mdx new file mode 100644 --- /dev/null +++ b/docs/next/src/pages/overview/execution/index.mdx @@ -0,0 +1,57 @@ +import { DynamicMetaTags } from 'components/MetaTags'; +import PyObject from 'components/PyObject'; + + + +# Executing Pipelines + +We provide several different ways to do one-off execution of pipelines and solids. For recurring execution, consult the [schedules](/overview/scheduling-partitions/schedules) section. + +## Dagit + +The Dagit [playground](/overview/dagit#playground) offers a way to interactively build up the configuration for a run of a pipeline. Drop-downs are available for selecting any defined presets or [partitions](/overview/scheduling-partitions/partitions) + +## CLI + +The `dagster` [cli](/_apidocs/cli) includes both `dagster pipeline execute` for direct execution as well as `dagster pipeline launch` for async launching. + +``` +dagster pipeline execute -f my_pipeline.py +``` + +## Python APIs + +We also provide Python APIs for execution that are useful when writing tests or scripts: `execute_pipeline` for full pipeline execution. + +```python literalinclude caption=test_pipeline.py +file:/docs_snippets/docs_snippets_tests/overview_tests/execution_tests/test_execute.py +startAfter:start_exec_pipeline +endBefore:end_exec_pipeline +``` + +We also provide `execute_solid` for executing an individual solid. + +```python literalinclude caption=test_solid.py +file:/docs_snippets/docs_snippets_tests/overview_tests/execution_tests/test_execute.py +startAfter:start_exec_solid +endBefore:end_exec_solid +``` + +By default the Python APIs will use an ephemeral instance to avoid reporting test runs to the instance. When using the Python API for production runs, set the instance using `instance=DagsterInstance.get()` to use the default loading behavior for the instance. 
+ +```python literalinclude caption=execute.py +file:/docs_snippets/docs_snippets_tests/overview_tests/execution_tests/test_execute.py +startAfter:start_script +endBefore:end_script +``` + +If pipeline execution will involve reconstructing the pipeline in another process, such as when using the multi-process executor or dagstermill, the Python APIs will need a `reconstructable` instance of the pipeline. This work is handled for you when using dagit or the cli. You will also need to configure the intermediate values between solids to be stored in a way that they can be accessed across processes. + +```python literalinclude caption=execute.py +file:/docs_snippets/docs_snippets_tests/overview_tests/execution_tests/test_execute.py +startAfter:start_multi_proc +endBefore:end_multi_proc +``` diff --git a/docs/next/src/treeOfContents.json b/docs/next/src/treeOfContents.json --- a/docs/next/src/treeOfContents.json +++ b/docs/next/src/treeOfContents.json @@ -95,6 +95,10 @@ } ] }, + { + "name": "Execution", + "path": "/overview/execution/" + }, { "name": "Configuration", "path": "/overview/configuration/" @@ -499,4 +503,4 @@ } ] } -] \ No newline at end of file +] diff --git a/examples/docs_snippets/docs_snippets_tests/overview_tests/execution_tests/test_execute.py b/examples/docs_snippets/docs_snippets_tests/overview_tests/execution_tests/test_execute.py new file mode 100644 --- /dev/null +++ b/examples/docs_snippets/docs_snippets_tests/overview_tests/execution_tests/test_execute.py @@ -0,0 +1,72 @@ +from dagster import ( + DagsterInstance, + execute_pipeline, + execute_solid, + pipeline, + reconstructable, + solid, +) +from dagster.core.test_utils import instance_for_test + + +@solid +def always_blue(_): + return "blue" + + +@pipeline +def predict_color(): + always_blue() + + +# start_exec_pipeline +def test_execute_pipeline(): + result = execute_pipeline(predict_color) + assert result.success + assert result.output_for_solid("always_blue") == "blue" + + +# end_exec_pipeline + +# 
start_exec_solid +def test_execute_solid(): + result = execute_solid(always_blue) + assert result.success + assert result.output_value() == "blue" + + +# end_exec_solid + + +def script_example(): + with instance_for_test(): + # start_script + execute_pipeline(predict_color, instance=DagsterInstance.get()) + # end_script + + +# start_multi_proc +def test_multiprocess_executor(): + result = execute_pipeline( + run_config={ + # This section controls how the run will be executed. + # The multiprocess executor runs each step in its own sub process. + "execution": {"multiprocess": {}}, + # This section controls how values will be passed from one solid to the next. + # The default is in memory, so here we set it to filesystem to allow the + # separate subprocess to get the values + "intermediate_storage": {"filesystem": {}}, + }, + # The default instance for this API is an in memory ephemeral one. + # To allow the multiple processes to coordinate we use one here + # backed by a temporary directory. + instance=DagsterInstance.local_temp(), + # A ReconstructablePipeline is necessary to load the pipeline in child processes. + # reconstructable() is a utility function that captures where the + # PipelineDefinition came from. + pipeline=reconstructable(predict_color), + ) + assert result.success + + +# end_multi_proc