diff --git a/docs/next/public/assets/images/tutorial/config_figure_one.png b/docs/next/public/assets/images/tutorial/config_figure_one.png index 87934944c..bdcdb48dc 100644 Binary files a/docs/next/public/assets/images/tutorial/config_figure_one.png and b/docs/next/public/assets/images/tutorial/config_figure_one.png differ diff --git a/docs/next/public/assets/images/tutorial/config_figure_two.png b/docs/next/public/assets/images/tutorial/config_figure_two.png deleted file mode 100644 index 142bb4c7a..000000000 Binary files a/docs/next/public/assets/images/tutorial/config_figure_two.png and /dev/null differ diff --git a/docs/next/src/pages/tutorial/basics_solids.mdx b/docs/next/src/pages/tutorial/basics_solids.mdx index 49ca4e692..257a9bb55 100644 --- a/docs/next/src/pages/tutorial/basics_solids.mdx +++ b/docs/next/src/pages/tutorial/basics_solids.mdx @@ -1,312 +1,311 @@ import { DynamicMetaTags } from 'components/MetaTags'; import { CodeReferenceLink } from 'components/CodeReference'; import AnchorHeading from 'components/AnchorHeading'; import PyObject from 'components/PyObject'; # Basics of Solids ## Parametrizing Solids with Inputs So far, we've only seen solids whose behavior is the same every time they're run: ```python literalinclude showLines startLine=7 emphasize-lines=4 caption=hello_cereal.py file:/docs_snippets/docs_snippets/intro_tutorial/basics/e01_first_pipeline/hello_cereal.py lines:7-20 ``` In general, though, rather than relying on hardcoded values like `dataset_path`, we'd like to be able to parametrize our solid logic. Appropriately parameterized solids are more testable and reusable. Consider the following more generic solid: ```python literalinclude showLines startLine=7 caption=inputs.py file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/inputs.py lines:8-15 ``` Here, rather than hard-coding the value of `dataset_path`, we use an input, `csv_path`. It's easy to see why this is better. 
We can reuse the same solid in all the different places we might need to read in a CSV from a filepath. We can test the solid by pointing it at some known test CSV file. And we can use the output of another upstream solid to determine which file to load. Let's rebuild a pipeline we've seen before, but this time using our newly parameterized solid. ```python literalinclude showLines startLine=2 emphasize-lines=38 caption=inputs.py file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/inputs.py startAfter:start_inputs_marker_0 endBefore:end_inputs_marker_0 ```
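The testability claim is easy to see in plain Python. The sketch below uses no Dagster APIs. `read_csv_rows` and `make_test_csv` are hypothetical helpers: `read_csv_rows` stands in for the body of the `read_csv` solid, and the usage points it at a small temporary CSV instead of `cereal.csv`.

```python
import csv
import os
import tempfile


def read_csv_rows(csv_path):
    # Hypothetical stand-in for the body of the `read_csv` solid:
    # the file path arrives as a parameter instead of being hardcoded.
    with open(csv_path, "r", newline="") as fd:
        return [row for row in csv.DictReader(fd)]


def make_test_csv(rows, fieldnames):
    # Write a small fixture CSV to a temporary file and return its path.
    fd, path = tempfile.mkstemp(suffix=".csv")
    with os.fdopen(fd, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    return path


# Usage: point the function at a known fixture rather than the real dataset.
fixture = make_test_csv(
    [{"name": "All-Bran with Extra Fiber", "calories": "50"}],
    ["name", "calories"],
)
try:
    rows = read_csv_rows(fixture)
finally:
    os.remove(fixture)
```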
## Specifying Config for Pipeline Execution As you can see above, what's missing from this setup is a way to specify the `csv_path` input to our new `read_csv` solid in the absence of any upstream solids whose outputs we can rely on. Dagster provides the ability to stub inputs to solids that aren't satisfied by the pipeline topology as part of its flexible configuration facility. We can specify config for a pipeline execution regardless of which modality we use to execute the pipeline — the Python API, the Dagit GUI, or the command line:
Specifying config in the Python API
We previously encountered the function. Pipeline run config is specified by the second argument to this function, which must be a dict. This dict contains all of the user-provided configuration with which to execute a pipeline. As such, it can have [a lot of sections](/_apidocs/execution), but we'll only use one of them here: per-solid configuration, which is specified under the key `solids`: ```python literalinclude showLines startLine=46 caption=inputs.py file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/inputs.py startAfter:start_inputs_marker_1 endBefore:end_inputs_marker_1 dedent:4 ``` The `solids` dict is keyed by solid name, and each solid is configured by a dict that may itself have several sections. In this case we are only interested in the `inputs` section, so that we can specify the value of the input `csv_path`. Now you can pass this run config to : ```python literalinclude showLines startLine=53 caption=inputs.py file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/inputs.py startAfter:start_inputs_marker_2 endBefore:end_inputs_marker_2 dedent:4 ```
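The nesting of this dict is easy to get wrong by hand. As a minimal sketch, a hypothetical helper (not part of Dagster) could assemble the `solids` → solid name → `inputs` → input name → `value` structure shown above:

```python
def stub_inputs(solid_name, **input_values):
    """Hypothetical helper: build a run config that stubs one solid's inputs."""
    return {
        "solids": {
            solid_name: {
                "inputs": {name: {"value": value} for name, value in input_values.items()}
            }
        }
    }


run_config = stub_inputs("read_csv", csv_path="cereal.csv")
# e.g. execute_pipeline(inputs_pipeline, run_config=run_config)
```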
Specifying config using YAML fragments and the Dagster CLI
When executing pipelines with the Dagster CLI, we'll need to provide the run config in a file. We use YAML for the file-based representation of configuration, but the values are the same as before: ```YAML literalinclude showLines caption=inputs_env.yaml file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/inputs_env.yaml ``` We can pass config files in this format to the Dagster CLI tool with the `-c` flag. ```bash dagster pipeline execute -f inputs.py -c inputs_env.yaml ``` In practice, you might split your run config across several YAML files, for instance if some sections change more often (e.g., between test and prod) while others are more static. In this case, you can pass the `-c` flag multiple times in a single CLI invocation, and the CLI tools will assemble the YAML fragments into a single run config.
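Dagster assembles the `-c` fragments itself; the sketch below only illustrates the general idea as a recursive dictionary merge. It works on already-parsed dicts (so it needs no YAML library), and it assumes, without claiming these are Dagster's exact rules, that nested dicts merge key by key and that a later fragment's non-dict value wins.

```python
import copy


def deep_merge(base, override):
    """Recursively merge `override` into a copy of `base`.

    Illustrative rules (an assumption, not Dagster's documented behavior):
    nested dicts are merged key by key; any other value in `override`
    replaces the corresponding value in `base`.
    """
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# A static fragment and a test-only override, as if parsed from two YAML files.
# "test_cereal.csv" is a hypothetical fixture path.
static_fragment = {"solids": {"read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}}}
test_fragment = {"solids": {"read_csv": {"inputs": {"csv_path": {"value": "test_cereal.csv"}}}}}

run_config = deep_merge(static_fragment, test_fragment)
```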
Using the Dagit config editor
Dagit provides a powerful, schema-aware, typeahead-enabled config editor to enable rapid experimentation with and debugging of parameterized pipeline executions. As always, run: ```bash dagit -f inputs.py ``` Notice that no execution plan appears in the bottom pane of the Playground. ![inputs_figure_one.png](/assets/images/tutorial/inputs_figure_one.png) Because Dagit is schema-aware, it knows that this pipeline now requires configuration in order to run without errors. In this case, since the pipeline is relatively trivial, it wouldn't be especially costly to run the pipeline and watch it fail. But when pipelines are complex and slow, it's invaluable to get this kind of feedback up front rather than have an unexpected failure deep inside a pipeline. Recall that the execution plan, which you will ordinarily see above the log viewer in the **Execute** tab, is the concrete pipeline that Dagster will actually execute. Without a valid config, Dagster can't construct a parametrization of the logical pipeline—so no execution plan is available for us to preview. Press Ctrl + Space in order to bring up the typeahead assistant. ![inputs_figure_two.png](/assets/images/tutorial/inputs_figure_two.png) Here you can see all of the sections available in the run config. Don't worry, we'll get to them all later. Let's enter the config we need in order to execute our pipeline. ![inputs_figure_three.png](/assets/images/tutorial/inputs_figure_three.png) Note that as you type and edit the config, the config minimap hovering on the right side of the editor pane changes to provide context—you always know where in the nested config schema you are while making changes.
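The value of this up-front feedback comes from checking config against what the pipeline needs before anything runs. A toy version of that check follows. `missing_inputs` and the hand-written `required` mapping are hypothetical; Dagster derives its real config schema from the pipeline definition rather than from a list like this.

```python
def missing_inputs(run_config, required_inputs):
    """Return (solid, input) pairs that are required but absent from run_config.

    `required_inputs` maps a solid name to input names that no upstream solid
    satisfies; it's a hand-written stand-in for a real schema.
    """
    solids_config = run_config.get("solids", {})
    missing = []
    for solid_name, input_names in required_inputs.items():
        provided = solids_config.get(solid_name, {}).get("inputs", {})
        for input_name in input_names:
            if input_name not in provided:
                missing.append((solid_name, input_name))
    return missing


required = {"read_csv": ["csv_path"]}

# An empty config is caught before anything runs.
print(missing_inputs({}, required))
```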

## Parametrizing Solids with Config Solids often depend in predictable ways on features of the external world or the pipeline in which -they're invoked. For example, consider an extended version of our CSV-reading solid that implements -more of the options available in the underlying Python API: - -```python literalinclude showLines startLine=8 emphasize-lines=9-15 caption=config_bad_1.py -file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_bad_1.py -startAfter:start_config_bad_1_marker_0 -endBefore:end_config_bad_1_marker_0 -``` - -We obviously don't want to have to write a separate solid for each permutation of these parameters -that we use in our pipelines—especially because, in more realistic cases like configuring a -Spark job or even parametrizing the `read_csv` function from a popular package like -[Pandas](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html#pandas.read_csv), -we might have dozens or hundreds of parameters like these. +they're invoked. We don't want to write a separate solid for every permutation of parameters +that we use in our pipelines, especially because, in more realistic cases like configuring a +Spark job, we might have dozens or hundreds of parameters to configure. But hoisting all of these parameters into the signature of the solid function as inputs isn't the -right answer either: - -```python literalinclude showLines startLine=8 emphasize-lines=5-11 caption=config_bad_2.py -file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_bad_2.py -startAfter:start_config_bad_2_marker_0 -endBefore:end_config_bad_2_marker_0 -``` +right answer either. -Defaults are often sufficient for configuration values like these, and sets of parameters are often +Defaults are often sufficient for configuration values, and sets of parameters are often reusable.
Moreover, it's unlikely that values like this will be provided dynamically by the outputs of other solids in a pipeline. Inputs, on the other hand, will usually be provided by the outputs of other solids in a pipeline, even though we might sometimes want to stub them using the config facility. For all these reasons, it's bad practice to mix configuration values like these with true input values. The solution is to define a config schema for our solid: -```python literalinclude showLines startLine=2 emphasize-lines=17,35-43,90 caption=config.py +```python literalinclude showLines startLine=18 emphasize-lines=1,6,9 caption=config.py file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config.py startAfter:start_config_marker_0 endBefore:end_config_marker_0 ``` -First, we pass the `config` argument to the decorator. This tells Dagster to give our solid a config field structured as -a dictionary, whose keys are the keys of this argument, and the types of whose values are defined by -the values of this argument (instances of ). +Here, we want to control the order in which our solid sorts the cereals. +First, we pass the `config_schema` argument to the decorator. +This tells Dagster to give our solid a config field structured as a dictionary whose keys and value types are given by `config_schema`. +Then, we declare `bool` as the expected type of the `reverse` key. +Finally, inside the body of the solid function `sort_by_calories`, we access the config value set by the user using the +`solid_config` field on the familiar object. -Then, we define one of these fields, `escapechar`, to be a string, setting a default value, making -it optional, and setting a human-readable description. +To execute `config_pipeline`, we can specify the pipeline run config in the Python API: -Finally, inside the body of the solid function, we access the config value set by the user using the -`solid_config` field on the familiar object.
When Dagster executes our -pipeline, the framework will make validated config for each solid available on this object. +```python literalinclude showLines startLine=60 emphasize-lines=4 caption=config.py +file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config.py +startAfter:start_config_marker_1 +endBefore:end_config_marker_1 +dedent:4 +``` -Let's see how all of this looks in Dagit. As usual, run: +You can see that we've added a new section to the solid config. In addition to the `inputs` section for `read_csv`, +we now have a `config` section for `sort_by_calories`, where we can set values defined in the `config_schema` argument +to . + +We can also pass the run config to the Dagster CLI tool in a YAML file. Since we want to sort the cereals by calories in reverse order +and find the most caloric cereal, we set `reverse` to `True` in the `config` section of `config_env.yaml`. ```bash -dagit -f config.py +dagster pipeline execute -f config.py -c config_env.yaml ``` -![config_figure_one.png](/assets/images/tutorial/config_figure_one.png) +```YAML literalinclude showLines caption=config_env.yaml +file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env.yaml +``` +In the console, we'll see that the most caloric cereal is Mueslix Crispy Blend: + +```bash +dagster - INFO - system - 0039ddb3-4b13-4d0c-8c01-4eb6fb1b90c6 - sort_by_calories.compute - Most caloric cereal: Mueslix Crispy Blend +``` -As you may by now expect, Dagit provides a fully type-aware and schema-aware config editing -environment with a typeahead. The human-readable descriptions we provided on our config fields -appear in the config context minimap, as well as in typeahead tooltips and in the Explore pane when -clicking into the individual solid definition. +Our schema defines only one key, `reverse`, but a config schema can define as many key-value pairs as a solid needs.
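Stripped of the Dagster machinery, the core of `sort_by_calories` is an ordinary `sorted` call. One detail worth noticing is the `int(...)` in the sort key: `csv.DictReader` yields every field as a string, and compared as strings `"160"` sorts before `"50"`. The plain-Python sketch below reuses the solid's sort logic on two rows taken from `cereal.csv`:

```python
def sort_by_calories(cereals, reverse):
    # Same sort as the solid body. Values read through csv.DictReader are
    # strings, so the key casts calories to int for a numeric comparison.
    return sorted(cereals, key=lambda cereal: int(cereal["calories"]), reverse=reverse)


cereals = [
    {"name": "All-Bran with Extra Fiber", "calories": "50"},
    {"name": "Mueslix Crispy Blend", "calories": "160"},
]

most_caloric_first = sort_by_calories(cereals, reverse=True)
print(most_caloric_first[0]["name"])  # Mueslix Crispy Blend

# Without the cast, string comparison puts "160" before "50".
by_string = sorted(cereals, key=lambda cereal: cereal["calories"])
```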
+What's more, we can define the value as an object, which lets us give it a default value, make it optional, and attach a description: -![config_figure_two.png](/assets/images/tutorial/config_figure_two.png) +```python literalinclude showLines startLine=18 caption=config_more_details.py +file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_more_details.py +startAfter:start_details_marker_0 +endBefore:end_details_marker_0 +``` + +Here, the Dagster config type is equivalent to Python's ordinary `bool` type. +Both `default_value` and `is_required` are set to `False`, and a human-readable description of the field is provided. -You can see that we've added a new section to the solid config. In addition to the `inputs` section, -which we'll still use to set the `csv_path` input, we now have a `config` section, where we can set -values defined in the `config` argument to . +Let's see how the config section looks in Dagit. As usual, run: -```YAML literalinclude caption=config_env_bad.yaml -file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env_bad.yaml +```bash +dagit -f config_more_details.py ``` -Of course, this config won't give us the results we're expecting. The values in `cereal.csv` are -comma-separated, not semicolon-separated, as they might be if this were a CSV from Europe, where -commas are frequently used in place of the decimal point. +![config_figure_one.png](/assets/images/tutorial/config_figure_one.png) -We'll see later how we can use Dagster's facilities for automatic data quality checks to guard -against semantic issues like this, which won't be caught by the type system. +Dagit provides a fully type-aware and schema-aware config editing environment with a typeahead. +The human-readable description we provided for `reverse` +appears in the config section on the right when we click into the `sort_by_calories` solid definition.
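One way to picture what `default_value` and `is_required=False` do is as a resolution step that fills in missing keys before the solid reads `solid_config`. The sketch below models that with a hypothetical `FieldSpec` dataclass and `resolve_config` function; it illustrates the idea and is not how Dagster implements its config system.

```python
from dataclasses import dataclass
from typing import Any, Optional

_MISSING = object()  # sentinel meaning "no default was given"


@dataclass
class FieldSpec:
    # Hypothetical stand-in for a config field definition.
    type_: type
    default_value: Any = _MISSING
    is_required: bool = True
    description: Optional[str] = None


def resolve_config(schema, user_config):
    """Fill defaults for optional fields, reject missing required ones, type-check."""
    resolved = {}
    for key, spec in schema.items():
        if key in user_config:
            value = user_config[key]
        elif spec.default_value is not _MISSING:
            value = spec.default_value
        elif spec.is_required:
            raise ValueError("missing required config key '{}'".format(key))
        else:
            continue  # optional with no default: leave it out
        if not isinstance(value, spec.type_):
            raise TypeError("'{}' must be {}".format(key, spec.type_.__name__))
        resolved[key] = value
    return resolved


schema = {
    "reverse": FieldSpec(
        bool,
        default_value=False,
        is_required=False,
        description="If `True`, cereals will be sorted in reverse order.",
    )
}
```

With this schema, an empty user config resolves to `{"reverse": False}`, which is why the field no longer has to appear in the run config.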
## Multiple and Conditional Outputs Solids can have arbitrarily many outputs, and downstream solids can depend on any number of these. What's more, solids can have outputs that are optional and don't have to be yielded, which lets us write pipelines where some solids conditionally execute based on the presence of an upstream output. Suppose we're interested in splitting hot and cold cereals into separate datasets and processing them separately, based on config. ```python literalinclude showLines caption=multiple_outputs.py file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/multiple_outputs.py startAfter:start_split_cereals endBefore:end_split_cereals ``` Solids that yield multiple outputs must declare, and name, their outputs (passing `output_defs` to the decorator). Output names must be unique, and each yielded by a solid's compute function must have a name that corresponds to one of these declared outputs. We'll define two downstream solids and hook them up to the multiple outputs from `split_cereals`. ```python literalinclude showLines caption=multiple_outputs.py file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/multiple_outputs.py startAfter:# start-sort-and-pipeline endBefore:# end-sort-and-pipeline ``` As usual, we can visualize this in Dagit: ![multiple_outputs.png](/assets/images/tutorial/multiple_outputs.png) Notice that the logical DAG corresponding to the pipeline definition includes both dependencies; we won't know about the conditionality in the pipeline until runtime, when `split_cereals` might not yield both of the outputs. ![multiple_outputs_zoom.png](/assets/images/tutorial/multiple_outputs_zoom.png) Zooming in, Dagit shows us the details of the multiple outputs from `split_cereals` and their downstream dependencies, `hot_cereals` and `cold_cereals`.
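As a rough mental model in plain Python (not Dagster's output machinery), a solid with optional outputs behaves like a generator that yields named values, some of which may never be produced. The sketch below assumes each row has a `type` field of `"H"` (hot) or `"C"` (cold), as in `cereal.csv`, and uses a hypothetical `process` parameter and hypothetical output names; `collect_outputs` enforces the rule that every yielded name must match a declared output.

```python
DECLARED_OUTPUTS = ("hot_cereals", "cold_cereals")  # hypothetical output names


def split_cereals(cereals, process):
    """Yield (output_name, value) pairs.

    `process` is a hypothetical config value: "all", "hot", or "cold".
    Outputs that aren't selected are simply never yielded.
    """
    if process in ("all", "hot"):
        yield "hot_cereals", [c for c in cereals if c["type"] == "H"]
    if process in ("all", "cold"):
        yield "cold_cereals", [c for c in cereals if c["type"] == "C"]


def collect_outputs(results, declared):
    # Every yielded name must be declared and, in this sketch, yielded at most once.
    outputs = {}
    for name, value in results:
        if name not in declared:
            raise ValueError("undeclared output: {}".format(name))
        if name in outputs:
            raise ValueError("output yielded twice: {}".format(name))
        outputs[name] = value
    return outputs
```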
When we execute this pipeline with the following config, we'll see that the cold cereals output is omitted and that the execution step corresponding to the downstream solid is marked skipped in the execution pane: ```YAML literalinclude showLines caption=multiple_outputs.yaml file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/multiple_outputs.yaml ``` ![conditional_outputs.png](/assets/images/tutorial/conditional_outputs.png)
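The skip behavior can be sketched the same way: a downstream step runs only if the upstream output it depends on was actually produced. In the sketch below, `run_downstream` and the step wiring are hypothetical; Dagster derives the real wiring from the pipeline definition and reports the unsatisfied step as skipped.

```python
def run_downstream(upstream_outputs, steps):
    """Run each step whose upstream output was produced; mark the rest skipped.

    `steps` maps a step name to (upstream_output_name, function).
    """
    status, results = {}, {}
    for step_name, (output_name, fn) in steps.items():
        if output_name in upstream_outputs:
            results[step_name] = fn(upstream_outputs[output_name])
            status[step_name] = "success"
        else:
            status[step_name] = "skipped"
    return status, results


# Hypothetical wiring: each downstream step just counts the rows it receives.
steps = {
    "hot_cereals": ("hot_cereals", len),
    "cold_cereals": ("cold_cereals", len),
}

# Only the hot output was yielded upstream, so the cold step is skipped.
status, results = run_downstream({"hot_cereals": [{"type": "H"}]}, steps)
```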

diff --git a/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config.py b/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config.py index bba4a4dfe..be43bca87 100644 --- a/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config.py +++ b/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config.py @@ -1,117 +1,69 @@ -# start_config_marker_0 import csv import os -from dagster import ( - Bool, - Field, - Int, - String, - execute_pipeline, - pipeline, - solid, -) +from dagster import execute_pipeline, pipeline, solid -@solid( - config_schema={ - "delimiter": Field( - String, - default_value=",", - is_required=False, - description=("A one-character string used to separate fields."), - ), - "doublequote": Field( - Bool, - default_value=False, - is_required=False, - description=( - "Controls how instances of quotechar appearing inside a field " - "should themselves be quoted. When True, the character is " - "doubled. When False, the escapechar is used as a prefix to " - "the quotechar." - ), - ), - "escapechar": Field( - String, - default_value="\\", - is_required=False, - description=( - "On reading, the escapechar removes any special meaning from " - "the following character." - ), - ), - "quotechar": Field( - String, - default_value='"', - is_required=False, - description=( - "A one-character string used to quote fields containing " - "special characters, such as the delimiter or quotechar, " - "or which contain new-line characters." - ), - ), - "quoting": Field( - Int, - default_value=csv.QUOTE_MINIMAL, - is_required=False, - description=( - "Controls when quotes should be generated by the writer and " - "recognised by the reader. It can take on any of the " - "csv.QUOTE_* constants" - ), - ), - "skipinitialspace": Field( - Bool, - default_value=False, - is_required=False, - description=( - "When True, whitespace immediately following the delimiter " - "is ignored. 
The default is False." - ), - ), - "strict": Field( - Bool, - default_value=False, - is_required=False, - description=("When True, raise exception on bad CSV input."), - ), - } -) -def read_csv(context, csv_path: str): +@solid +def read_csv(context, csv_path): csv_path = os.path.join(os.path.dirname(__file__), csv_path) with open(csv_path, "r") as fd: - lines = [ - row - for row in csv.DictReader( - fd, - delimiter=context.solid_config["delimiter"], - doublequote=context.solid_config["doublequote"], - escapechar=context.solid_config["escapechar"], - quotechar=context.solid_config["quotechar"], - quoting=context.solid_config["quoting"], - skipinitialspace=context.solid_config["skipinitialspace"], - strict=context.solid_config["strict"], - ) - ] + lines = [row for row in csv.DictReader(fd)] context.log.info("Read {n_lines} lines".format(n_lines=len(lines))) - return lines -@pipeline -def config_pipeline(): - read_csv() +# start_config_marker_0 +@solid(config_schema={"reverse": bool}) +def sort_by_calories(context, cereals): + sorted_cereals = sorted( + cereals, + key=lambda cereal: int(cereal["calories"]), + reverse=context.solid_config["reverse"], + ) + + if context.solid_config["reverse"]: # find the most caloric cereal + context.log.info( + "{x} caloric cereal: {first_cereal_after_sort}".format( + x="Most", first_cereal_after_sort=sorted_cereals[0]["name"] + ) + ) + return { + "most_caloric": sorted_cereals[0], + "least_caloric": sorted_cereals[-1], + } + else: # find the least caloric cereal + context.log.info( + "{x} caloric cereal: {first_cereal_after_sort}".format( + x="Least", first_cereal_after_sort=sorted_cereals[0]["name"] + ) + ) + return { + "least_caloric": sorted_cereals[0], + "most_caloric": sorted_cereals[-1], + } # end_config_marker_0 + +@pipeline +def config_pipeline(): + sort_by_calories(read_csv()) + + if __name__ == "__main__": + # start_config_marker_1 run_config = { "solids": { - "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}} + 
"read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}, + "sort_by_calories": {"config": {"reverse": True}}, } } + # end_config_marker_1 + + # start_config_marker_2 result = execute_pipeline(config_pipeline, run_config=run_config) + # end_config_marker_2 assert result.success diff --git a/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env_bad.yaml b/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env.yaml similarity index 69% rename from examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env_bad.yaml rename to examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env.yaml index 1e6df5787..4cbe49818 100644 --- a/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env_bad.yaml +++ b/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env.yaml @@ -1,7 +1,8 @@ solids: read_csv: - config: - delimiter: ";" inputs: csv_path: value: "cereal.csv" + sort_by_calories: + config: + reverse: True \ No newline at end of file diff --git a/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_more_details.py b/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_more_details.py new file mode 100644 index 000000000..e3eea82fa --- /dev/null +++ b/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_more_details.py @@ -0,0 +1,69 @@ +import csv +import os + +from dagster import Bool, Field, execute_pipeline, pipeline, solid + + +@solid +def read_csv(context, csv_path): + csv_path = os.path.join(os.path.dirname(__file__), csv_path) + with open(csv_path, "r") as fd: + lines = [row for row in csv.DictReader(fd)] + + context.log.info("Read {n_lines} lines".format(n_lines=len(lines))) + return lines + + +# start_details_marker_0 +@solid( + config_schema={ + "reverse": Field( + Bool, + default_value=False, + is_required=False, + description="If `True`, cereals 
will be sorted in reverse order. Default: `False`", + ) + } +) +# end_details_marker_0 +def sort_by_calories(context, cereals): + sorted_cereals = sorted( + cereals, + key=lambda cereal: int(cereal["calories"]), + reverse=context.solid_config["reverse"], + ) + + if context.solid_config["reverse"]: # find the most caloric cereal + context.log.info( + "{x} caloric cereal: {first_cereal_after_sort}".format( + x="Most", first_cereal_after_sort=sorted_cereals[0]["name"] + ) + ) + else: # find the least caloric cereal + context.log.info( + "{x} caloric cereal: {first_cereal_after_sort}".format( + x="Least", first_cereal_after_sort=sorted_cereals[0]["name"] + ) + ) + + return { + "least_caloric": sorted_cereals[0], + "most_caloric": sorted_cereals[-1], + } + + +@pipeline +def config_pipeline(): + sort_by_calories(read_csv()) + + +if __name__ == "__main__": + run_config = { + "solids": { + "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}, + "sort_by_calories": {"config": {"reverse": True}}, + } + } + result = execute_pipeline(config_pipeline, run_config=run_config) + + assert result.success diff --git a/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/basics/test_config.py b/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/basics/test_config.py new file mode 100644 index 000000000..cafa44ae0 --- /dev/null +++ b/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/basics/test_config.py @@ -0,0 +1,66 @@ +from collections import OrderedDict + +from docs_snippets.intro_tutorial.basics.e02_solids.config import config_pipeline + +from dagster import execute_pipeline +from dagster.utils import pushd, script_relative_path + + +def test_tutorial_config_schema(): + with pushd(script_relative_path("../../../docs_snippets/intro_tutorial/basics/e02_solids/")): + result = execute_pipeline( + config_pipeline, + run_config={ + "solids": { + "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}, + "sort_by_calories": {"config": 
{"reverse": False}}, + } + }, + ) + + assert result.success + assert len(result.solid_result_list) == 2 + assert isinstance(result.result_for_solid("sort_by_calories").output_value(), dict) + assert result.result_for_solid("sort_by_calories").output_value() == { + "least_caloric": OrderedDict( + [ + ("name", "All-Bran with Extra Fiber"), + ("mfr", "K"), + ("type", "C"), + ("calories", "50"), + ("protein", "4"), + ("fat", "0"), + ("sodium", "140"), + ("fiber", "14"), + ("carbo", "8"), + ("sugars", "0"), + ("potass", "330"), + ("vitamins", "25"), + ("shelf", "3"), + ("weight", "1"), + ("cups", "0.5"), + ("rating", "93.704912"), + ] + ), + "most_caloric": OrderedDict( + [ + ("name", "Mueslix Crispy Blend"), + ("mfr", "K"), + ("type", "C"), + ("calories", "160"), + ("protein", "3"), + ("fat", "2"), + ("sodium", "150"), + ("fiber", "3"), + ("carbo", "17"), + ("sugars", "13"), + ("potass", "160"), + ("vitamins", "25"), + ("shelf", "3"), + ("weight", "1.5"), + ("cups", "0.67"), + ("rating", "30.313351"), + ] + ), + } + return result diff --git a/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/test_cli_invocations.py b/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/test_cli_invocations.py index 1e710efcc..c95b68fec 100644 --- a/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/test_cli_invocations.py +++ b/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/test_cli_invocations.py @@ -1,363 +1,353 @@ import json import os import runpy import pytest from click.testing import CliRunner from dagit.app import create_app_from_workspace from dagster.cli.pipeline import pipeline_execute_command from dagster.cli.workspace import get_workspace_from_kwargs from dagster.core.instance import DagsterInstance from dagster.core.test_utils import instance_for_test from dagster.utils import ( DEFAULT_REPOSITORY_YAML_FILENAME, check_script, pushd, script_relative_path, ) PIPELINES_OR_ERROR_QUERY = """ { repositoriesOrError { ... 
on PythonError { message stack } ... on RepositoryConnection { nodes { pipelines { name } } } } } """ cli_args = [ # dirname, filename, fn_name, env_yaml, mode, preset, return_code, exception ( "basics/e01_first_pipeline/", "hello_cereal.py", "hello_cereal_pipeline", None, None, None, 0, None, ), ("basics/e02_solids/", "inputs.py", "inputs_pipeline", "inputs_env.yaml", None, None, 0, None), ( "basics/e02_solids/", "config_bad_1.py", "config_pipeline", "inputs_env.yaml", None, None, 0, None, ), ( "basics/e02_solids/", "config_bad_2.py", "config_pipeline", "config_bad_2.yaml", None, None, 0, None, ), - ("basics/e02_solids/", "config.py", "config_pipeline", "inputs_env.yaml", None, None, 0, None), - ( - "basics/e02_solids/", - "config.py", - "config_pipeline", - "config_env_bad.yaml", - None, - None, - 0, - None, - ), + ("basics/e02_solids/", "config.py", "config_pipeline", "config_env.yaml", None, None, 0, None,), ( "basics/e02_solids/", "multiple_outputs.py", "multiple_outputs_pipeline", "inputs_env.yaml", None, None, 0, None, ), ("basics/e03_pipelines/", "serial_pipeline.py", "serial_pipeline", None, None, None, 0, None), ("basics/e03_pipelines/", "complex_pipeline.py", "complex_pipeline", None, None, None, 0, None), ( "basics/e04_quality/", "inputs_typed.py", "inputs_pipeline", "inputs_env.yaml", None, None, 0, None, ), ( "basics/e04_quality/", "custom_types.py", "custom_type_pipeline", "inputs_env.yaml", None, None, 0, None, ), ( "basics/e04_quality/", "custom_types_2.py", "custom_type_pipeline", "custom_types_2.yaml", None, None, 1, Exception, ), ( "basics/e04_quality/", "custom_types_3.py", "custom_type_pipeline", "custom_type_input.yaml", None, None, 0, None, ), ( "basics/e04_quality/", "custom_types_4.py", "custom_type_pipeline", "custom_type_input.yaml", None, None, 0, None, ), ( "basics/e04_quality/", "custom_types_5.py", "custom_type_pipeline", "custom_type_input.yaml", None, None, 0, None, ), ( "basics/e04_quality/", "custom_types_mypy_verbose.py", 
"custom_type_pipeline", "inputs_env.yaml", None, None, 0, None, ), ( "basics/e04_quality/", "custom_types_mypy_typing_trick.py", "custom_type_pipeline", "inputs_env.yaml", None, None, 0, None, ), ( "advanced/solids/", "reusable_solids.py", "reusable_solids_pipeline", "reusable_solids.yaml", None, None, 0, None, ), ( "advanced/solids/", "composite_solids.py", "composite_solids_pipeline", "composite_solids.yaml", None, None, 0, None, ), ( "advanced/pipelines/", "resources.py", "resources_pipeline", "resources.yaml", None, None, 0, None, ), ( "advanced/pipelines/", "required_resources.py", "resources_pipeline", "resources.yaml", None, None, 0, None, ), ( "advanced/pipelines/", "modes.py", "modes_pipeline", "resources.yaml", "unittest", None, 0, None, ), ("advanced/pipelines/", "presets.py", "presets_pipeline", None, None, "unittest", 0, None), ( "advanced/materializations/", "materializations.py", "materialization_pipeline", "inputs_env.yaml", None, None, 0, None, ), ( "advanced/materializations/", "output_materialization.py", "output_materialization_pipeline", "output_materialization.yaml", None, None, 0, None, ), ( "advanced/intermediates/", "serialization_strategy.py", "serialization_strategy_pipeline", "inputs_env.yaml", None, None, 0, None, ), ( "advanced/scheduling/", "scheduler.py", "hello_cereal_pipeline", "inputs_env.yaml", None, None, 0, None, ), ] def path_to_tutorial_file(path): return script_relative_path(os.path.join("../../docs_snippets/intro_tutorial/", path)) def load_dagit_for_workspace_cli_args(n_pipelines=1, **kwargs): instance = DagsterInstance.ephemeral() with get_workspace_from_kwargs(kwargs, instance) as workspace: app = create_app_from_workspace(workspace, instance) client = app.test_client() res = client.get( "/graphql?query={query_string}".format(query_string=PIPELINES_OR_ERROR_QUERY) ) json_res = json.loads(res.data.decode("utf-8")) assert "data" in json_res assert "repositoriesOrError" in json_res["data"] assert "nodes" in 
json_res["data"]["repositoriesOrError"] assert len(json_res["data"]["repositoriesOrError"]["nodes"][0]["pipelines"]) == n_pipelines return res def dagster_pipeline_execute(args, return_code): with instance_for_test(): runner = CliRunner() res = runner.invoke(pipeline_execute_command, args) assert res.exit_code == return_code, res.exception return res @pytest.mark.parametrize( "dirname,filename,fn_name,_env_yaml,_mode,_preset,_return_code,_exception", cli_args ) # dagit -f filename -n fn_name def test_load_pipeline( dirname, filename, fn_name, _env_yaml, _mode, _preset, _return_code, _exception ): with pushd(path_to_tutorial_file(dirname)): filepath = path_to_tutorial_file(os.path.join(dirname, filename)) load_dagit_for_workspace_cli_args(python_file=filepath, fn_name=fn_name) @pytest.mark.parametrize( "dirname,filename,fn_name,env_yaml,mode,preset,return_code,_exception", cli_args ) # dagster pipeline execute -f filename -n fn_name -e env_yaml --preset preset def test_dagster_pipeline_execute( dirname, filename, fn_name, env_yaml, mode, preset, return_code, _exception ): with pushd(path_to_tutorial_file(dirname)): filepath = path_to_tutorial_file(os.path.join(dirname, filename)) yamlpath = path_to_tutorial_file(os.path.join(dirname, env_yaml)) if env_yaml else None dagster_pipeline_execute( ["-f", filepath, "-a", fn_name] + (["-c", yamlpath] if yamlpath else []) + (["--mode", mode] if mode else []) + (["--preset", preset] if preset else []), return_code, ) @pytest.mark.parametrize( "dirname,filename,_fn_name,_env_yaml,_mode,_preset,return_code,_exception", cli_args ) def test_script(dirname, filename, _fn_name, _env_yaml, _mode, _preset, return_code, _exception): with pushd(path_to_tutorial_file(dirname)): filepath = path_to_tutorial_file(os.path.join(dirname, filename)) check_script(filepath, return_code) @pytest.mark.parametrize( "dirname,filename,_fn_name,_env_yaml,_mode,_preset,_return_code,exception", cli_args ) def test_runpy(dirname, filename, _fn_name, 
_env_yaml, _mode, _preset, _return_code, exception): with pushd(path_to_tutorial_file(dirname)): filepath = path_to_tutorial_file(os.path.join(dirname, filename)) if exception: with pytest.raises(exception): runpy.run_path(filepath, run_name="__main__") else: runpy.run_path(filepath, run_name="__main__") # TODO python command line # dagit def test_load_repo(): load_dagit_for_workspace_cli_args( n_pipelines=2, repository_yaml=path_to_tutorial_file( os.path.join("advanced/repositories/", DEFAULT_REPOSITORY_YAML_FILENAME) ), )