diff --git a/docs/next/public/assets/images/tutorial/config_figure_one.png b/docs/next/public/assets/images/tutorial/config_figure_one.png
index 87934944c..bdcdb48dc 100644
Binary files a/docs/next/public/assets/images/tutorial/config_figure_one.png and b/docs/next/public/assets/images/tutorial/config_figure_one.png differ
diff --git a/docs/next/public/assets/images/tutorial/config_figure_two.png b/docs/next/public/assets/images/tutorial/config_figure_two.png
deleted file mode 100644
index 142bb4c7a..000000000
Binary files a/docs/next/public/assets/images/tutorial/config_figure_two.png and /dev/null differ
diff --git a/docs/next/src/pages/tutorial/basics_solids.mdx b/docs/next/src/pages/tutorial/basics_solids.mdx
index 49ca4e692..257a9bb55 100644
--- a/docs/next/src/pages/tutorial/basics_solids.mdx
+++ b/docs/next/src/pages/tutorial/basics_solids.mdx
@@ -1,312 +1,311 @@
import { DynamicMetaTags } from 'components/MetaTags';
import { CodeReferenceLink } from 'components/CodeReference';
import AnchorHeading from 'components/AnchorHeading';
import PyObject from 'components/PyObject';
# Basics of Solids
## Parametrizing Solids with Inputs
So far, we've only seen solids whose behavior is the same every time they're run:
```python literalinclude showLines startLine=7 emphasize-lines=4 caption=hello_cereal.py
file:/docs_snippets/docs_snippets/intro_tutorial/basics/e01_first_pipeline/hello_cereal.py
lines:7-20
```
In general, though, rather than relying on hardcoded values like `dataset_path`, we'd like to be
able to parametrize our solid logic. Appropriately parameterized solids are more testable and
reusable. Consider the following more generic solid:
```python literalinclude showLines startLine=7 caption=inputs.py
file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/inputs.py
lines:8-15
```
Here, rather than hard-coding the value of `dataset_path`, we use an input, `csv_path`. It's easy to
see why this is better. We can reuse the same solid in all the different places we might need to
read in a CSV from a filepath. We can test the solid by pointing it at some known test CSV file. And
we can use the output of another upstream solid to determine which file to load.
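For concreteness, here is a minimal sketch of what a solid of this shape looks like (a sketch only; the exact
version lives in `inputs.py`, included above):
```python
import csv

from dagster import solid


@solid
def read_csv(context, csv_path: str):
    # The path arrives as an input rather than being hardcoded,
    # so the same solid can read any CSV we point it at.
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]
    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines
```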
Let's rebuild a pipeline we've seen before, but this time using our newly parameterized solid.
```python literalinclude showLines startLine=2 emphasize-lines=38 caption=inputs.py
file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/inputs.py
startAfter:start_inputs_marker_0
endBefore:end_inputs_marker_0
```
## Specifying Config for Pipeline Execution
As you can see above, what's missing from this setup is a way to specify the `csv_path` input to our
new `read_csv` solid in the absence of any upstream solids whose outputs we can rely on. Dagster
provides the ability to stub inputs to solids that aren't satisfied by the pipeline topology as part
of its flexible configuration facility.
We can specify config for a pipeline execution regardless of which modality we use to execute the
pipeline — the Python API, the Dagit GUI, or the command line:
Specifying config in the Python API
We previously encountered the `execute_pipeline` function. Pipeline run config is specified by the second
argument to this function, which must be a dict.
This dict contains all of the user-provided configuration with which to execute a pipeline. As such,
it can have [a lot of sections](/_apidocs/execution), but we'll only use one of
them here: per-solid configuration, which is specified under the key `solids`:
```python literalinclude showLines startLine=46 caption=inputs.py
file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/inputs.py
startAfter:start_inputs_marker_1
endBefore:end_inputs_marker_1
dedent:4
```
The `solids` dict is keyed by solid name, and each solid is configured by a dict that may itself
have several sections. In this case we are only interested in the `inputs` section, so that we can
specify the value of the input `csv_path`.
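Concretely, the nesting looks like this (mirroring the snippet above):
```python
# The `solids` section is keyed by solid name; each solid's `inputs`
# section supplies values for inputs not satisfied by the pipeline topology.
run_config = {
    "solids": {
        "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}
    }
}
```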
Now you can pass this run config to `execute_pipeline`:
```python literalinclude showLines startLine=53 caption=inputs.py
file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/inputs.py
startAfter:start_inputs_marker_2
endBefore:end_inputs_marker_2
dedent:4
```
Specifying config using YAML fragments and the Dagster CLI
When executing pipelines with the Dagster CLI, we'll need to provide the run config in a file. We
use YAML for the file-based representation of configuration, but the values are the same as before:
```YAML literalinclude showLines caption=inputs_env.yaml
file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/inputs_env.yaml
```
We can pass config files in this format to the Dagster CLI tool with the `-c` flag.
```bash
dagster pipeline execute -f inputs.py -c inputs_env.yaml
```
In practice, you might have different sections of your run config in different YAML files—if,
for instance, some sections change more often (e.g. in test and prod) while others are more static.
In this case, you can set multiple instances of the `-c` flag on CLI invocations, and the CLI tools
will assemble the YAML fragments into a single run config.
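Conceptually, assembling fragments amounts to a recursive dict merge in which later fragments take precedence.
A rough sketch of the idea (not the CLI's actual implementation, and the second file name here is hypothetical):
```python
import yaml


def deep_merge(base, override):
    # Nested dicts are merged key by key; any other value is replaced outright.
    merged = dict(base)
    for key, value in override.items():
        if isinstance(merged.get(key), dict) and isinstance(value, dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


run_config = {}
for path in ["inputs_env.yaml", "overrides_test.yaml"]:  # second file hypothetical
    with open(path) as fd:
        run_config = deep_merge(run_config, yaml.safe_load(fd))
```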
Using the Dagit config editor
Dagit provides a powerful, schema-aware, typeahead-enabled config editor to enable rapid
experimentation with and debugging of parameterized pipeline executions. As always, run:
```bash
dagit -f inputs.py
```
Notice that no execution plan appears in the bottom pane of the Playground.

Because Dagit is schema-aware, it knows that this pipeline now requires configuration in order to
run without errors. In this case, since the pipeline is relatively trivial, it wouldn't be
especially costly to run the pipeline and watch it fail. But when pipelines are complex and slow,
it's invaluable to get this kind of feedback up front rather than have an unexpected failure deep
inside a pipeline.
Recall that the execution plan, which you will ordinarily see above the log viewer in the
**Execute** tab, is the concrete pipeline that Dagster will actually execute. Without a valid
config, Dagster can't construct a parametrization of the logical pipeline—so no execution plan
is available for us to preview.
Press Ctrl + Space to bring up the typeahead assistant.

Here you can see all of the sections available in the run config. Don't worry, we'll get to them all
later.
Let's enter the config we need in order to execute our pipeline.

Note that as you type and edit the config, the config minimap hovering on the right side of the
editor pane changes to provide context—you always know where in the nested config schema
you are while making changes.
## Parametrizing Solids with Config
Solids often depend in predictable ways on features of the external world or the pipeline in which
-they're invoked. For example, consider an extended version of our CSV-reading solid that implements
-more of the options available in the underlying Python API:
-
-```python literalinclude showLines startLine=8 emphasize-lines=9-15 caption=config_bad_1.py
-file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_bad_1.py
-startAfter:start_config_bad_1_marker_0
-endBefore:end_config_bad_1_marker_0
-```
-
-We obviously don't want to have to write a separate solid for each permutation of these parameters
-that we use in our pipelines—especially because, in more realistic cases like configuring a
-Spark job or even parametrizing the `read_csv` function from a popular package like
-[Pandas](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html#pandas.read_csv),
-we might have dozens or hundreds of parameters like these.
+they're invoked. Such a solid might expose many tunable parameters, and we obviously don't want to have to write a
+separate solid for each permutation of parameter values that we use in our pipelines—especially because, in more
+realistic cases like configuring a Spark job, we might have dozens or hundreds of such parameters.
But hoisting all of these parameters into the signature of the solid function as inputs isn't the
-right answer either:
-
-```python literalinclude showLines startLine=8 emphasize-lines=5-11 caption=config_bad_2.py
-file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_bad_2.py
-startAfter:start_config_bad_2_marker_0
-endBefore:end_config_bad_2_marker_0
-```
+right answer.
-Defaults are often sufficient for configuration values like these, and sets of parameters are often
+Defaults are often sufficient for configuration values, and sets of parameters are often
reusable. Moreover, it's unlikely that values like this will be provided dynamically by the outputs
of other solids in a pipeline.
Inputs, on the other hand, will usually be provided by the outputs of other solids in a pipeline,
even though we might sometimes want to stub them using the config facility.
For all these reasons, it's bad practice to mix configuration values like these with true input
values.
The solution is to define a config schema for our solid:
-```python literalinclude showLines startLine=2 emphasize-lines=17,35-43,90 caption=config.py
+```python literalinclude showLines startLine=16 emphasize-lines=1,6,9 caption=config.py
file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config.py
startAfter:start_config_marker_0
endBefore:end_config_marker_0
```
-First, we pass the `config` argument to the decorator. This tells Dagster to give our solid a config field structured as
-a dictionary, whose keys are the keys of this argument, and the types of whose values are defined by
-the values of this argument (instances of ).
+We want to control the way our solid sorts the cereals.
+First, we pass the `config_schema` argument to the `@solid` decorator.
+This tells Dagster to give our solid a config field structured as a dictionary.
+Then, we declare the expected type of the `reverse` config value, setting it to be a boolean.
+Finally, inside the body of the solid function `sort_by_calories`, we access the config value set by the user via the
+`solid_config` field on the familiar `context` object.
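+The shape of the pattern, condensed from the snippet above:
+
+```python
+from dagster import solid
+
+
+@solid(config_schema={"reverse": bool})
+def sort_by_calories(context, cereals):
+    # Dagster validates the user-supplied config against the schema and
+    # makes it available on `context.solid_config`.
+    return sorted(
+        cereals,
+        key=lambda cereal: int(cereal["calories"]),
+        reverse=context.solid_config["reverse"],
+    )
+```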
-Then, we define one of these fields, `escapechar`, to be a string, setting a default value, making
-it optional, and setting a human-readable description.
+To execute this `config_pipeline`, we can specify the pipeline run config in the Python API:
-Finally, inside the body of the solid function, we access the config value set by the user using the
-`solid_config` field on the familiar object. When Dagster executes our
-pipeline, the framework will make validated config for each solid available on this object.
+```python literalinclude showLines startLine=60 emphasize-lines=4 caption=config.py
+file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config.py
+startAfter:start_config_marker_1
+endBefore:end_config_marker_1
+dedent:4
+```
-Let's see how all of this looks in Dagit. As usual, run:
+You can see that we've added a new section to the solid config. In addition to the `inputs` section for `read_csv`,
+we now have a `config` section for `sort_by_calories`, where we can set values defined in the `config_schema` argument
+to `@solid`.
+
+We can also pass the run config to the Dagster CLI tool in a YAML file, as shown below. Since we want to sort the
+cereals by calories in reverse order and find the most caloric cereal, we set the `reverse` value to `True` in the
+`config` section.
```bash
-dagit -f config.py
+dagster pipeline execute -f config.py -c config_env.yaml
```
-
+```YAML literalinclude showLines caption=config_env.yaml
+file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env.yaml
+```
+In the console, we'll see the most caloric cereal is Mueslix Crispy Blend.
+
+```bash
+dagster - INFO - system - 0039ddb3-4b13-4d0c-8c01-4eb6fb1b90c6 - sort_by_calories.compute - Most caloric cereal: Mueslix Crispy Blend
+```
-As you may by now expect, Dagit provides a fully type-aware and schema-aware config editing
-environment with a typeahead. The human-readable descriptions we provided on our config fields
-appear in the config context minimap, as well as in typeahead tooltips and in the Explore pane when
-clicking into the individual solid definition.
+In this case, we have defined only one key-value pair, with `reverse` as the key. We can define more key-value pairs
+in the same way. What's more, we can define the value as a `Field` object to make the config optional:
-
+```python literalinclude showLines startLine=18 caption=config_more_details.py
+file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_more_details.py
+startAfter:start_details_marker_0
+endBefore:end_details_marker_0
+```
+
+Here, we can see that the Dagster config type `Bool` is equivalent to the ordinary Python `bool` type.
+The `default_value` and `is_required` arguments are both set to `False`, and a human-readable description of this
+config value is provided.
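+Because the field now has a default and is not required, the `config` section for `sort_by_calories` can be omitted
+entirely. A hedged sketch, assuming the definitions in `config_more_details.py` are in scope:
+
+```python
+from dagster import execute_pipeline
+
+# No `config` section for `sort_by_calories`: validation still passes,
+# and `reverse` falls back to its default of False.
+run_config = {
+    "solids": {
+        "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}
+    }
+}
+result = execute_pipeline(config_pipeline, run_config=run_config)
+```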
-You can see that we've added a new section to the solid config. In addition to the `inputs` section,
-which we'll still use to set the `csv_path` input, we now have a `config` section, where we can set
-values defined in the `config` argument to .
+Let's see how the config section looks in Dagit. As usual, run:
-```YAML literalinclude caption=config_env_bad.yaml
-file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env_bad.yaml
+```bash
+dagit -f config.py
```
-Of course, this config won't give us the results we're expecting. The values in `cereal.csv` are
-comma-separated, not semicolon-separated, as they might be if this were a CSV from Europe, where
-commas are frequently used in place of the decimal point.
+
-We'll see later how we can use Dagster's facilities for automatic data quality checks to guard
-against semantic issues like this, which won't be caught by the type system.
+Dagit provides a fully type-aware and schema-aware config editing environment with a typeahead.
+The human-readable description we provided on the `reverse` config field
+appears in the config section on the right when we click into the `sort_by_calories` solid definition.
## Multiple and Conditional Outputs
Solids can have arbitrarily many outputs, and downstream solids can depend on any number of these.
What's more, solids can have outputs that are optional and don't have to be yielded, which lets us
write pipelines where some solids conditionally execute based on the presence of an upstream output.
Suppose we're interested in splitting hot and cold cereals into separate datasets and processing
them separately, based on config.
```python literalinclude showLines caption=multiple_outputs.py
file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/multiple_outputs.py
startAfter:start_split_cereals
endBefore:end_split_cereals
```
Solids that yield multiple outputs must declare, and name, their outputs (passing `output_defs` to
the `@solid` decorator). Output names must be unique, and each `Output` yielded by a solid's compute
function must have a name that corresponds to one of these declared outputs.
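A sketch of the pattern (the tutorial's actual `split_cereals` is in `multiple_outputs.py`, included above; the
config keys here are illustrative):
```python
from dagster import Output, OutputDefinition, solid


@solid(
    config_schema={"process_hot": bool, "process_cold": bool},
    output_defs=[
        OutputDefinition(name="hot_cereals", is_required=False),
        OutputDefinition(name="cold_cereals", is_required=False),
    ],
)
def split_cereals(context, cereals):
    # Each yielded Output names the declared output it satisfies; because the
    # outputs are not required, either may be skipped at runtime.
    if context.solid_config["process_hot"]:
        yield Output([c for c in cereals if c["type"] == "H"], "hot_cereals")
    if context.solid_config["process_cold"]:
        yield Output([c for c in cereals if c["type"] == "C"], "cold_cereals")
```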
We'll define two downstream solids and hook them up to the multiple outputs from `split_cereals`.
```python literalinclude showLines caption=multiple_outputs.py
file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/multiple_outputs.py
startAfter:# start-sort-and-pipeline
endBefore:# end-sort-and-pipeline
```
As usual, we can visualize this in Dagit:

Notice that the logical DAG corresponding to the pipeline definition includes both dependencies --
we won't know about the conditionality in the pipeline until runtime, when `split_cereals` might not
yield both of the outputs.

Zooming in, Dagit shows us the details of the multiple outputs from `split_cereals` and their
downstream dependencies, `hot_cereals` and `cold_cereals`.
When we execute this pipeline with the following config, we'll see that the cold cereals output is
omitted and that the execution step corresponding to the downstream solid is marked skipped in the
execution pane:
```YAML literalinclude showLines caption=multiple_outputs.yaml
file:/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/multiple_outputs.yaml
```

diff --git a/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config.py b/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config.py
index bba4a4dfe..be43bca87 100644
--- a/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config.py
+++ b/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config.py
@@ -1,117 +1,69 @@
-# start_config_marker_0
import csv
import os
-from dagster import (
- Bool,
- Field,
- Int,
- String,
- execute_pipeline,
- pipeline,
- solid,
-)
+from dagster import execute_pipeline, pipeline, solid
-@solid(
- config_schema={
- "delimiter": Field(
- String,
- default_value=",",
- is_required=False,
- description=("A one-character string used to separate fields."),
- ),
- "doublequote": Field(
- Bool,
- default_value=False,
- is_required=False,
- description=(
- "Controls how instances of quotechar appearing inside a field "
- "should themselves be quoted. When True, the character is "
- "doubled. When False, the escapechar is used as a prefix to "
- "the quotechar."
- ),
- ),
- "escapechar": Field(
- String,
- default_value="\\",
- is_required=False,
- description=(
- "On reading, the escapechar removes any special meaning from "
- "the following character."
- ),
- ),
- "quotechar": Field(
- String,
- default_value='"',
- is_required=False,
- description=(
- "A one-character string used to quote fields containing "
- "special characters, such as the delimiter or quotechar, "
- "or which contain new-line characters."
- ),
- ),
- "quoting": Field(
- Int,
- default_value=csv.QUOTE_MINIMAL,
- is_required=False,
- description=(
- "Controls when quotes should be generated by the writer and "
- "recognised by the reader. It can take on any of the "
- "csv.QUOTE_* constants"
- ),
- ),
- "skipinitialspace": Field(
- Bool,
- default_value=False,
- is_required=False,
- description=(
- "When True, whitespace immediately following the delimiter "
- "is ignored. The default is False."
- ),
- ),
- "strict": Field(
- Bool,
- default_value=False,
- is_required=False,
- description=("When True, raise exception on bad CSV input."),
- ),
- }
-)
-def read_csv(context, csv_path: str):
+@solid
+def read_csv(context, csv_path):
csv_path = os.path.join(os.path.dirname(__file__), csv_path)
with open(csv_path, "r") as fd:
- lines = [
- row
- for row in csv.DictReader(
- fd,
- delimiter=context.solid_config["delimiter"],
- doublequote=context.solid_config["doublequote"],
- escapechar=context.solid_config["escapechar"],
- quotechar=context.solid_config["quotechar"],
- quoting=context.solid_config["quoting"],
- skipinitialspace=context.solid_config["skipinitialspace"],
- strict=context.solid_config["strict"],
- )
- ]
+ lines = [row for row in csv.DictReader(fd)]
context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
-
return lines
-@pipeline
-def config_pipeline():
- read_csv()
+# start_config_marker_0
+@solid(config_schema={"reverse": bool})
+def sort_by_calories(context, cereals):
+ sorted_cereals = sorted(
+ cereals,
+ key=lambda cereal: int(cereal["calories"]),
+ reverse=context.solid_config["reverse"],
+ )
+
+ if context.solid_config["reverse"]: # find the most caloric cereal
+ context.log.info(
+ "{x} caloric cereal: {first_cereal_after_sort}".format(
+ x="Most", first_cereal_after_sort=sorted_cereals[0]["name"]
+ )
+ )
+ return {
+ "most_caloric": sorted_cereals[0],
+ "least_caloric": sorted_cereals[-1],
+ }
+ else: # find the least caloric cereal
+ context.log.info(
+ "{x} caloric cereal: {first_cereal_after_sort}".format(
+ x="Least", first_cereal_after_sort=sorted_cereals[0]["name"]
+ )
+ )
+ return {
+ "least_caloric": sorted_cereals[0],
+ "most_caloric": sorted_cereals[-1],
+ }
# end_config_marker_0
+
+@pipeline
+def config_pipeline():
+ sort_by_calories(read_csv())
+
+
if __name__ == "__main__":
+ # start_config_marker_1
run_config = {
"solids": {
- "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}
+ "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}},
+ "sort_by_calories": {"config": {"reverse": True}},
}
}
+ # end_config_marker_1
+
+ # start_config_marker_2
result = execute_pipeline(config_pipeline, run_config=run_config)
+ # end_config_marker_2
assert result.success
diff --git a/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env_bad.yaml b/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env.yaml
similarity index 69%
rename from examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env_bad.yaml
rename to examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env.yaml
index 1e6df5787..4cbe49818 100644
--- a/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env_bad.yaml
+++ b/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_env.yaml
@@ -1,7 +1,8 @@
solids:
read_csv:
- config:
- delimiter: ";"
inputs:
csv_path:
value: "cereal.csv"
+ sort_by_calories:
+ config:
+ reverse: True
\ No newline at end of file
diff --git a/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_more_details.py b/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_more_details.py
new file mode 100644
index 000000000..e3eea82fa
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/intro_tutorial/basics/e02_solids/config_more_details.py
@@ -0,0 +1,69 @@
+import csv
+import os
+
+from dagster import Bool, Field, execute_pipeline, pipeline, solid
+
+
+@solid
+def read_csv(context, csv_path):
+ csv_path = os.path.join(os.path.dirname(__file__), csv_path)
+ with open(csv_path, "r") as fd:
+ lines = [row for row in csv.DictReader(fd)]
+
+ context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
+ return lines
+
+
+# start_details_marker_0
+@solid(
+ config_schema={
+ "reverse": Field(
+ Bool,
+ default_value=False,
+ is_required=False,
+ description="If `True`, cereals will be sorted in reverse order. Default: `False`",
+ )
+ }
+)
+# end_details_marker_0
+def sort_by_calories(context, cereals):
+ sorted_cereals = sorted(
+ cereals,
+ key=lambda cereal: int(cereal["calories"]),
+ reverse=context.solid_config["reverse"],
+ )
+
+ if context.solid_config["reverse"]: # find the most caloric cereal
+ context.log.info(
+ "{x} caloric cereal: {first_cereal_after_sort}".format(
+ x="Most", first_cereal_after_sort=sorted_cereals[0]["name"]
+ )
+ )
+ else: # find the least caloric cereal
+ context.log.info(
+ "{x} caloric cereal: {first_cereal_after_sort}".format(
+ x="Least", first_cereal_after_sort=sorted_cereals[0]["name"]
+ )
+ )
+
+ return {
+ "least_caloric": sorted_cereals[0],
+ "most_caloric": sorted_cereals[-1],
+ }
+
+
+@pipeline
+def config_pipeline():
+ sort_by_calories(read_csv())
+
+
+if __name__ == "__main__":
+ run_config = {
+ "solids": {
+ "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}},
+ "sort_by_calories": {"config": {"reverse": True}},
+ }
+ }
+ result = execute_pipeline(config_pipeline, run_config=run_config)
+
+ assert result.success
diff --git a/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/basics/test_config.py b/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/basics/test_config.py
new file mode 100644
index 000000000..cafa44ae0
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/basics/test_config.py
@@ -0,0 +1,66 @@
+from collections import OrderedDict
+
+from docs_snippets.intro_tutorial.basics.e02_solids.config import config_pipeline
+
+from dagster import execute_pipeline
+from dagster.utils import pushd, script_relative_path
+
+
+def test_tutorial_config_schema():
+ with pushd(script_relative_path("../../../docs_snippets/intro_tutorial/basics/e02_solids/")):
+ result = execute_pipeline(
+ config_pipeline,
+ run_config={
+ "solids": {
+ "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}},
+ "sort_by_calories": {"config": {"reverse": False}},
+ }
+ },
+ )
+
+ assert result.success
+ assert len(result.solid_result_list) == 2
+ assert isinstance(result.result_for_solid("sort_by_calories").output_value(), dict)
+ assert result.result_for_solid("sort_by_calories").output_value() == {
+ "least_caloric": OrderedDict(
+ [
+ ("name", "All-Bran with Extra Fiber"),
+ ("mfr", "K"),
+ ("type", "C"),
+ ("calories", "50"),
+ ("protein", "4"),
+ ("fat", "0"),
+ ("sodium", "140"),
+ ("fiber", "14"),
+ ("carbo", "8"),
+ ("sugars", "0"),
+ ("potass", "330"),
+ ("vitamins", "25"),
+ ("shelf", "3"),
+ ("weight", "1"),
+ ("cups", "0.5"),
+ ("rating", "93.704912"),
+ ]
+ ),
+ "most_caloric": OrderedDict(
+ [
+ ("name", "Mueslix Crispy Blend"),
+ ("mfr", "K"),
+ ("type", "C"),
+ ("calories", "160"),
+ ("protein", "3"),
+ ("fat", "2"),
+ ("sodium", "150"),
+ ("fiber", "3"),
+ ("carbo", "17"),
+ ("sugars", "13"),
+ ("potass", "160"),
+ ("vitamins", "25"),
+ ("shelf", "3"),
+ ("weight", "1.5"),
+ ("cups", "0.67"),
+ ("rating", "30.313351"),
+ ]
+ ),
+ }
+ return result
diff --git a/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/test_cli_invocations.py b/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/test_cli_invocations.py
index 1e710efcc..c95b68fec 100644
--- a/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/test_cli_invocations.py
+++ b/examples/docs_snippets/docs_snippets_tests/intro_tutorial_tests/test_cli_invocations.py
@@ -1,363 +1,353 @@
import json
import os
import runpy
import pytest
from click.testing import CliRunner
from dagit.app import create_app_from_workspace
from dagster.cli.pipeline import pipeline_execute_command
from dagster.cli.workspace import get_workspace_from_kwargs
from dagster.core.instance import DagsterInstance
from dagster.core.test_utils import instance_for_test
from dagster.utils import (
DEFAULT_REPOSITORY_YAML_FILENAME,
check_script,
pushd,
script_relative_path,
)
PIPELINES_OR_ERROR_QUERY = """
{
repositoriesOrError {
... on PythonError {
message
stack
}
... on RepositoryConnection {
nodes {
pipelines {
name
}
}
}
}
}
"""
cli_args = [
# dirname, filename, fn_name, env_yaml, mode, preset, return_code, exception
(
"basics/e01_first_pipeline/",
"hello_cereal.py",
"hello_cereal_pipeline",
None,
None,
None,
0,
None,
),
("basics/e02_solids/", "inputs.py", "inputs_pipeline", "inputs_env.yaml", None, None, 0, None),
(
"basics/e02_solids/",
"config_bad_1.py",
"config_pipeline",
"inputs_env.yaml",
None,
None,
0,
None,
),
(
"basics/e02_solids/",
"config_bad_2.py",
"config_pipeline",
"config_bad_2.yaml",
None,
None,
0,
None,
),
- ("basics/e02_solids/", "config.py", "config_pipeline", "inputs_env.yaml", None, None, 0, None),
- (
- "basics/e02_solids/",
- "config.py",
- "config_pipeline",
- "config_env_bad.yaml",
- None,
- None,
- 0,
- None,
- ),
+ ("basics/e02_solids/", "config.py", "config_pipeline", "config_env.yaml", None, None, 0, None,),
(
"basics/e02_solids/",
"multiple_outputs.py",
"multiple_outputs_pipeline",
"inputs_env.yaml",
None,
None,
0,
None,
),
("basics/e03_pipelines/", "serial_pipeline.py", "serial_pipeline", None, None, None, 0, None),
("basics/e03_pipelines/", "complex_pipeline.py", "complex_pipeline", None, None, None, 0, None),
(
"basics/e04_quality/",
"inputs_typed.py",
"inputs_pipeline",
"inputs_env.yaml",
None,
None,
0,
None,
),
(
"basics/e04_quality/",
"custom_types.py",
"custom_type_pipeline",
"inputs_env.yaml",
None,
None,
0,
None,
),
(
"basics/e04_quality/",
"custom_types_2.py",
"custom_type_pipeline",
"custom_types_2.yaml",
None,
None,
1,
Exception,
),
(
"basics/e04_quality/",
"custom_types_3.py",
"custom_type_pipeline",
"custom_type_input.yaml",
None,
None,
0,
None,
),
(
"basics/e04_quality/",
"custom_types_4.py",
"custom_type_pipeline",
"custom_type_input.yaml",
None,
None,
0,
None,
),
(
"basics/e04_quality/",
"custom_types_5.py",
"custom_type_pipeline",
"custom_type_input.yaml",
None,
None,
0,
None,
),
(
"basics/e04_quality/",
"custom_types_mypy_verbose.py",
"custom_type_pipeline",
"inputs_env.yaml",
None,
None,
0,
None,
),
(
"basics/e04_quality/",
"custom_types_mypy_typing_trick.py",
"custom_type_pipeline",
"inputs_env.yaml",
None,
None,
0,
None,
),
(
"advanced/solids/",
"reusable_solids.py",
"reusable_solids_pipeline",
"reusable_solids.yaml",
None,
None,
0,
None,
),
(
"advanced/solids/",
"composite_solids.py",
"composite_solids_pipeline",
"composite_solids.yaml",
None,
None,
0,
None,
),
(
"advanced/pipelines/",
"resources.py",
"resources_pipeline",
"resources.yaml",
None,
None,
0,
None,
),
(
"advanced/pipelines/",
"required_resources.py",
"resources_pipeline",
"resources.yaml",
None,
None,
0,
None,
),
(
"advanced/pipelines/",
"modes.py",
"modes_pipeline",
"resources.yaml",
"unittest",
None,
0,
None,
),
("advanced/pipelines/", "presets.py", "presets_pipeline", None, None, "unittest", 0, None),
(
"advanced/materializations/",
"materializations.py",
"materialization_pipeline",
"inputs_env.yaml",
None,
None,
0,
None,
),
(
"advanced/materializations/",
"output_materialization.py",
"output_materialization_pipeline",
"output_materialization.yaml",
None,
None,
0,
None,
),
(
"advanced/intermediates/",
"serialization_strategy.py",
"serialization_strategy_pipeline",
"inputs_env.yaml",
None,
None,
0,
None,
),
(
"advanced/scheduling/",
"scheduler.py",
"hello_cereal_pipeline",
"inputs_env.yaml",
None,
None,
0,
None,
),
]
def path_to_tutorial_file(path):
return script_relative_path(os.path.join("../../docs_snippets/intro_tutorial/", path))
def load_dagit_for_workspace_cli_args(n_pipelines=1, **kwargs):
instance = DagsterInstance.ephemeral()
with get_workspace_from_kwargs(kwargs, instance) as workspace:
app = create_app_from_workspace(workspace, instance)
client = app.test_client()
res = client.get(
"/graphql?query={query_string}".format(query_string=PIPELINES_OR_ERROR_QUERY)
)
json_res = json.loads(res.data.decode("utf-8"))
assert "data" in json_res
assert "repositoriesOrError" in json_res["data"]
assert "nodes" in json_res["data"]["repositoriesOrError"]
assert len(json_res["data"]["repositoriesOrError"]["nodes"][0]["pipelines"]) == n_pipelines
return res
def dagster_pipeline_execute(args, return_code):
with instance_for_test():
runner = CliRunner()
res = runner.invoke(pipeline_execute_command, args)
assert res.exit_code == return_code, res.exception
return res
@pytest.mark.parametrize(
"dirname,filename,fn_name,_env_yaml,_mode,_preset,_return_code,_exception", cli_args
)
# dagit -f filename -n fn_name
def test_load_pipeline(
dirname, filename, fn_name, _env_yaml, _mode, _preset, _return_code, _exception
):
with pushd(path_to_tutorial_file(dirname)):
filepath = path_to_tutorial_file(os.path.join(dirname, filename))
load_dagit_for_workspace_cli_args(python_file=filepath, fn_name=fn_name)
@pytest.mark.parametrize(
"dirname,filename,fn_name,env_yaml,mode,preset,return_code,_exception", cli_args
)
# dagster pipeline execute -f filename -n fn_name -e env_yaml --preset preset
def test_dagster_pipeline_execute(
dirname, filename, fn_name, env_yaml, mode, preset, return_code, _exception
):
with pushd(path_to_tutorial_file(dirname)):
filepath = path_to_tutorial_file(os.path.join(dirname, filename))
yamlpath = path_to_tutorial_file(os.path.join(dirname, env_yaml)) if env_yaml else None
dagster_pipeline_execute(
["-f", filepath, "-a", fn_name]
+ (["-c", yamlpath] if yamlpath else [])
+ (["--mode", mode] if mode else [])
+ (["--preset", preset] if preset else []),
return_code,
)
@pytest.mark.parametrize(
"dirname,filename,_fn_name,_env_yaml,_mode,_preset,return_code,_exception", cli_args
)
def test_script(dirname, filename, _fn_name, _env_yaml, _mode, _preset, return_code, _exception):
with pushd(path_to_tutorial_file(dirname)):
filepath = path_to_tutorial_file(os.path.join(dirname, filename))
check_script(filepath, return_code)
@pytest.mark.parametrize(
"dirname,filename,_fn_name,_env_yaml,_mode,_preset,_return_code,exception", cli_args
)
def test_runpy(dirname, filename, _fn_name, _env_yaml, _mode, _preset, _return_code, exception):
with pushd(path_to_tutorial_file(dirname)):
filepath = path_to_tutorial_file(os.path.join(dirname, filename))
if exception:
with pytest.raises(exception):
runpy.run_path(filepath, run_name="__main__")
else:
runpy.run_path(filepath, run_name="__main__")
# TODO python command line
# dagit
def test_load_repo():
load_dagit_for_workspace_cli_args(
n_pipelines=2,
repository_yaml=path_to_tutorial_file(
os.path.join("advanced/repositories/", DEFAULT_REPOSITORY_YAML_FILENAME)
),
)