diff --git a/python_modules/dagster/dagster/core/definitions/intermediate_storage.py b/python_modules/dagster/dagster/core/definitions/intermediate_storage.py --- a/python_modules/dagster/dagster/core/definitions/intermediate_storage.py +++ b/python_modules/dagster/dagster/core/definitions/intermediate_storage.py @@ -2,12 +2,17 @@ from dagster import check from dagster.core.definitions.configurable import ConfigurableDefinition +from dagster.core.definitions.resource_mappable import ( + ResourceKeyMapping, + ResourceMappableDefinition, + vend_new_key, +) from .definition_config_schema import convert_user_facing_definition_config_schema from .utils import check_valid_name -class IntermediateStorageDefinition(ConfigurableDefinition): +class IntermediateStorageDefinition(ConfigurableDefinition, ResourceMappableDefinition): """Defines intermediate data storage behaviors. Args: @@ -39,11 +44,11 @@ self._intermediate_storage_creation_fn = check.opt_callable_param( intermediate_storage_creation_fn, "intermediate_storage_creation_fn" ) - self._required_resource_keys = frozenset( + self._resource_key_mappings = frozenset( check.set_param( required_resource_keys if required_resource_keys else set(), "required_resource_keys", - of_type=str, + of_type=(ResourceKeyMapping, str), ) ) self._description = check.opt_str_param(description, "description") @@ -70,7 +75,11 @@ @property def required_resource_keys(self): - return self._required_resource_keys + return frozenset([vend_new_key(key_or_map) for key_or_map in self._resource_key_mappings]) + + @property + def resource_key_mappings(self): + return frozenset(self._resource_key_mappings) def copy_for_configured(self, name, description, config_schema, _): return IntermediateStorageDefinition( @@ -82,6 +91,21 @@ description=description or self.description, ) + def remap_resource_key(self, resource_key_mappings, name=None): + + mapped_resource_keys = self.get_mapped_resource_keys(resource_key_mappings) + + name = 
check.str_param(name, "name") + + return IntermediateStorageDefinition( + name=name, + is_persistent=self.is_persistent, + required_resource_keys=mapped_resource_keys, + config_schema=self.config_schema, + intermediate_storage_creation_fn=self.intermediate_storage_creation_fn, + description=self.description, + ) + def intermediate_storage( required_resource_keys=None, name=None, is_persistent=True, config_schema=None diff --git a/python_modules/dagster/dagster/core/definitions/resource.py b/python_modules/dagster/dagster/core/definitions/resource.py --- a/python_modules/dagster/dagster/core/definitions/resource.py +++ b/python_modules/dagster/dagster/core/definitions/resource.py @@ -4,6 +4,11 @@ from dagster import check, seven from dagster.core.definitions.config import is_callable_valid_config_arg from dagster.core.definitions.configurable import ConfigurableDefinition +from dagster.core.definitions.resource_mappable import ( + ResourceKeyMapping, + vend_new_key, + vend_old_key, +) from dagster.core.errors import DagsterInvalidDefinitionError, DagsterUnknownResourceError from dagster.utils.backcompat import experimental_arg_warning @@ -243,15 +248,21 @@ as, e.g., context.resources.foo. """ required_resource_keys = check.opt_set_param( - required_resource_keys, "required_resource_keys", of_type=str + required_resource_keys, "required_resource_keys", of_type=(str, ResourceKeyMapping) ) + + original_names = { + vend_new_key(key_or_map): vend_old_key(key_or_map) + for key_or_map in required_resource_keys + } # Maps new key (the key present in self.resource_instance_dict) back to its original key. + + # it is possible that the surrounding context does NOT have the required resource keys + # because we are building a context for steps that we are not going to execute (e.g. 
in the # resume/retry case, in order to generate copy intermediates events) resource_instance_dict = { - key: self.resource_instance_dict[key] + original_names[vend_new_key(key)]: self.resource_instance_dict[vend_new_key(key)] for key in required_resource_keys - if key in self.resource_instance_dict + if vend_new_key(key) in self.resource_instance_dict } class ScopedResources(namedtuple("Resources", list(resource_instance_dict.keys()))): diff --git a/python_modules/dagster/dagster/core/definitions/resource_mappable.py b/python_modules/dagster/dagster/core/definitions/resource_mappable.py new file mode 100644 --- /dev/null +++ b/python_modules/dagster/dagster/core/definitions/resource_mappable.py @@ -0,0 +1,133 @@ +from abc import ABC, abstractclassmethod, abstractproperty + +from dagster import check + + +class ResourceMappableDefinition(ABC): + @abstractproperty + def required_resource_keys(self): + """ + Returns the set of resource keys that this definition will be expecting a resource for. + + If an old key has been mapped to a new key, then in the mapped definition, only the new key + would appear in the set of resource keys returned. + + Returns: + FrozenSet[str] + """ + raise NotImplementedError() + + @abstractproperty + def resource_key_mappings(self): + """ + Returns the underlying mappings that exist on this definition. If a key has been + mapped, it will be represented with a ResourceKeyMapping object, if it is original, it will + be a string. + + For example, consider a definition that originally has keys `{"foo", "bar"}`, and `"bar"` has + been mapped to `"bar2"`. The expected output of this function would be the set + `{"foo", ResourceKeyMapping(orig_key="bar", new_key="bar2")}`. 
+ + Returns: + FrozenSet[Union[str, ResourceKeyMapping]] + """ + raise NotImplementedError() + + @abstractclassmethod + def remap_resource_key(self, resource_key_mappings, name=None): + """ + Applies provided resource key mappings to a new definition and returns that definition. + + Expects incoming resource mappings to map a key from this definition to a new key. The new + definition will expect resources to be passed in with the new key. + + + Args: + resource_key_mappings (Dict[str, str]): Dictionary that represents key mappings. Maps an + original key that is one of the `required_resource_keys` for this definition to a + new_key which will take its place. + + Returns: + ResourceMappableDefinition: A copy of this definition where resource keys have been + replaced according to the provided mappings. + + **Examples** + + .. code-block:: python + + @solid(required_resource_keys={"foo"}) + def bar_solid(context): + return context.resources.foo + + @resource + def not_called_foo(_): + return "Not Foo!" 
+ + @pipeline(mode_defs=[ModeDefinition(resource_defs={"not_foo": not_called_foo})]) + def bar_pipeline(): + return bar_solid.remap_resource_key({"foo": "not_foo"}, name="bar_solid_2")() + + """ + raise NotImplementedError() + + def get_mapped_resource_keys(self, resource_key_mappings): + resource_key_mappings = check.dict_param( + resource_key_mappings, "resource_key_mappings", key_type=str, value_type=str + ) + + check.invariant( + all( + [ + resource_key in self.required_resource_keys + for resource_key in resource_key_mappings.keys() + ] + ), + "Check that for each mapping passed in, the original key already exists in the set of " + "resource keys.", + ) + + mapped_resource_keys = { + vend_new_key(key_or_map) + if vend_new_key(key_or_map) not in resource_key_mappings + else ResourceKeyMapping( + vend_old_key(key_or_map), resource_key_mappings[vend_new_key(key_or_map)] + ) + for key_or_map in self.resource_key_mappings + } + return mapped_resource_keys + + +class ResourceKeyMapping: + def __init__(self, orig_key, new_key): + self._orig_key = check.str_param(orig_key, "orig_key") + self._new_key = check.str_param(new_key, "new_key") + + @property + def orig_key(self): + return self._orig_key + + @property + def new_key(self): + return self._new_key + + def __eq__(self, other): + if not isinstance(other, ResourceKeyMapping): + return False + return self.orig_key == other.orig_key and self.new_key == other.new_key + + def __hash__(self): + return hash((self.orig_key, self.new_key)) + + +def vend_new_key(str_or_resource_mapping): + if isinstance(str_or_resource_mapping, ResourceKeyMapping): + return str_or_resource_mapping.new_key + else: + return str_or_resource_mapping + + +def vend_old_key(str_or_resource_mapping): + if isinstance(str_or_resource_mapping, ResourceKeyMapping): + return str_or_resource_mapping.orig_key + else: + return str_or_resource_mapping diff --git a/python_modules/dagster/dagster/core/definitions/solid.py 
b/python_modules/dagster/dagster/core/definitions/solid.py --- a/python_modules/dagster/dagster/core/definitions/solid.py +++ b/python_modules/dagster/dagster/core/definitions/solid.py @@ -8,9 +8,10 @@ from .i_solid_definition import NodeDefinition from .input import InputDefinition from .output import OutputDefinition +from .resource_mappable import ResourceKeyMapping, ResourceMappableDefinition, vend_new_key -class SolidDefinition(NodeDefinition): +class SolidDefinition(NodeDefinition, ResourceMappableDefinition): """ The definition of a Solid that performs a user-defined computation. @@ -78,8 +79,8 @@ ): self._compute_fn = check.callable_param(compute_fn, "compute_fn") self._config_schema = convert_user_facing_definition_config_schema(config_schema) - self._required_resource_keys = frozenset( - check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str) + self._resource_key_mappings = check.opt_set_param( + required_resource_keys, "required_resource_keys", of_type=(ResourceKeyMapping, str), ) self._version = check.opt_str_param(version, "version") if version: @@ -104,7 +105,11 @@ @property def required_resource_keys(self): - return frozenset(self._required_resource_keys) + return frozenset([vend_new_key(key_or_map) for key_or_map in self._resource_key_mappings]) + + @property + def resource_key_mappings(self): + return frozenset(self._resource_key_mappings) @property def has_config_entry(self): @@ -143,6 +148,25 @@ version=self.version, ) + def remap_resource_key(self, resource_key_mappings, name=None): + + mapped_resource_keys = self.get_mapped_resource_keys(resource_key_mappings) + + name = check.str_param(name, "name") + + return SolidDefinition( + name=name, + input_defs=self._input_defs, + compute_fn=self.compute_fn, + output_defs=self.output_defs, + config_schema=self.config_schema, + description=self.description, + tags=self.tags, + required_resource_keys=mapped_resource_keys, + positional_inputs=self.positional_inputs, + 
version=self.version, + ) + class CompositeSolidDefinition(GraphDefinition): """The core unit of composition and abstraction, composite solids allow you to diff --git a/python_modules/dagster/dagster/core/execution/context/compute.py b/python_modules/dagster/dagster/core/execution/context/compute.py --- a/python_modules/dagster/dagster/core/execution/context/compute.py +++ b/python_modules/dagster/dagster/core/execution/context/compute.py @@ -56,6 +56,12 @@ SystemComputeExecutionContext, ) self._pdb = None + resource_key_mappings = system_compute_execution_context.solid_def.resource_key_mappings + self._resource_builder = ( + system_compute_execution_context.execution_context_data.scoped_resources_builder + ) + self._resources = self._resource_builder.build(resource_key_mappings) + super(SolidExecutionContext, self).__init__(system_compute_execution_context) @property @@ -90,3 +96,7 @@ self._pdb = ForkedPdb() return self._pdb + + @property + def resources(self): + return self._resources diff --git a/python_modules/dagster/dagster/core/execution/context/system.py b/python_modules/dagster/dagster/core/execution/context/system.py --- a/python_modules/dagster/dagster/core/execution/context/system.py +++ b/python_modules/dagster/dagster/core/execution/context/system.py @@ -276,6 +276,10 @@ def log(self): return self._log_manager + @property + def execution_context_data(self): + return self._execution_context_data + def for_hook(self, hook_def): return HookContext(self._execution_context_data, self.log, hook_def, self.step) diff --git a/python_modules/dagster/dagster/core/execution/context_creation_pipeline.py b/python_modules/dagster/dagster/core/execution/context_creation_pipeline.py --- a/python_modules/dagster/dagster/core/execution/context_creation_pipeline.py +++ b/python_modules/dagster/dagster/core/execution/context_creation_pipeline.py @@ -353,7 +353,7 @@ pipeline_def, intermediate_storage_def ), resources=scoped_resources_builder.build( - 
context_creation_data.intermediate_storage_def.required_resource_keys, + context_creation_data.intermediate_storage_def.resource_key_mappings, ), ) ) diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_resource_definition.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_resource_definition.py --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_resource_definition.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_resource_definition.py @@ -14,6 +14,7 @@ configured, execute_pipeline, execute_pipeline_iterator, + pipeline, reconstructable, resource, seven, @@ -325,13 +326,13 @@ assert context.resources.test_null is None called["yup"] = True - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_none_resource", solid_defs=[solid_test_null], mode_defs=[ModeDefinition(resource_defs={"test_null": ResourceDefinition.none_resource()})], ) - result = execute_pipeline(pipeline) + result = execute_pipeline(pipeline_def) assert result.success assert called["yup"] @@ -345,7 +346,7 @@ assert context.resources.test_string == "foo" called["yup"] = True - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_string_resource", solid_defs=[solid_test_string], mode_defs=[ @@ -353,7 +354,7 @@ ], ) - result = execute_pipeline(pipeline, {"resources": {"test_string": {"config": "foo"}}}) + result = execute_pipeline(pipeline_def, {"resources": {"test_string": {"config": "foo"}}}) assert result.success assert called["yup"] @@ -369,7 +370,7 @@ assert context.resources.hardcoded("called") called["yup"] = True - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="hardcoded_resource", solid_defs=[solid_hardcoded], mode_defs=[ @@ -379,7 +380,7 @@ ], ) - result = execute_pipeline(pipeline) + result = execute_pipeline(pipeline_def) assert result.success assert called["yup"] @@ -394,13 +395,13 @@ assert 
context.resources.test_mock is not None called["yup"] = True - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_mock_resource", solid_defs=[solid_test_mock], mode_defs=[ModeDefinition(resource_defs={"test_mock": ResourceDefinition.mock_resource()})], ) - result = execute_pipeline(pipeline) + result = execute_pipeline(pipeline_def) assert result.success assert called["yup"] @@ -419,13 +420,13 @@ called["solid"] = True assert context.resources.return_thing == "thing" - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_no_config_resource", solid_defs=[check_thing], mode_defs=[ModeDefinition(resource_defs={"return_thing": return_thing})], ) - execute_pipeline(pipeline) + execute_pipeline(pipeline_def) assert called["resource"] assert called["solid"] @@ -444,13 +445,13 @@ called["solid"] = True assert context.resources.return_thing == "thing" - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_no_config_resource", solid_defs=[check_thing], mode_defs=[ModeDefinition(resource_defs={"return_thing": return_thing})], ) - execute_pipeline(pipeline) + execute_pipeline(pipeline_def) assert called["resource"] assert called["solid"] @@ -469,13 +470,13 @@ called["solid"] = True assert context.resources.return_thing == "thing" - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_no_config_resource", solid_defs=[check_thing], mode_defs=[ModeDefinition(resource_defs={"return_thing": return_thing})], ) - execute_pipeline(pipeline) + execute_pipeline(pipeline_def) assert called["resource"] assert called["solid"] @@ -493,7 +494,7 @@ called["solid"] = True assert context.resources.return_thing == "thing" - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_no_config_resource", solid_defs=[check_thing], mode_defs=[ @@ -503,7 +504,7 @@ ], ) - execute_pipeline(pipeline) + execute_pipeline(pipeline_def) assert called["resource"] assert 
called["solid"] @@ -522,7 +523,7 @@ called["solid"] = True assert context.resources.resource_with_cleanup is True - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_resource_cleanup", solid_defs=[check_resource_created], mode_defs=[ @@ -532,7 +533,7 @@ ], ) - execute_pipeline(pipeline) + execute_pipeline(pipeline_def) assert called["creation"] is True assert called["solid"] is True @@ -558,7 +559,7 @@ assert context.resources.resource_with_cleanup_1 is True assert context.resources.resource_with_cleanup_2 is True - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_resource_cleanup", solid_defs=[check_resource_created], mode_defs=[ @@ -571,7 +572,7 @@ ], ) - execute_pipeline(pipeline) + execute_pipeline(pipeline_def) assert called == ["creation_1", "creation_2", "solid", "cleanup_2", "cleanup_1"] @@ -602,20 +603,20 @@ def failing_resource_solid(_context): pass - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_resource_init_failure", solid_defs=[failing_resource_solid], mode_defs=[ModeDefinition(resource_defs={"failing_resource": failing_resource})], ) - res = execute_pipeline(pipeline, raise_on_error=False) + res = execute_pipeline(pipeline_def, raise_on_error=False) event_types = [event.event_type_value for event in res.event_list] assert DagsterEventType.PIPELINE_INIT_FAILURE.value in event_types instance = DagsterInstance.ephemeral() - execution_plan = create_execution_plan(pipeline) - pipeline_run = instance.create_run_for_pipeline(pipeline, execution_plan=execution_plan) + execution_plan = create_execution_plan(pipeline_def) + pipeline_run = instance.create_run_for_pipeline(pipeline_def, execution_plan=execution_plan) step_events = execute_plan(execution_plan, pipeline_run=pipeline_run, instance=instance) @@ -625,7 +626,7 @@ # Test the pipeline init failure event fires even if we are raising errors events = [] try: - for event in execute_pipeline_iterator(pipeline): + for 
event in execute_pipeline_iterator(pipeline_def): events.append(event) except DagsterResourceFunctionError: pass @@ -673,13 +674,13 @@ def resource_solid(_): pass - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_resource_init_failure_with_cleanup", solid_defs=[resource_solid], mode_defs=[ModeDefinition(resource_defs={"a": resource_a, "b": resource_b})], ) - res = execute_pipeline(pipeline, raise_on_error=False) + res = execute_pipeline(pipeline_def, raise_on_error=False) event_types = [event.event_type_value for event in res.event_list] assert DagsterEventType.PIPELINE_INIT_FAILURE.value in event_types @@ -691,7 +692,7 @@ events = [] try: - for event in execute_pipeline_iterator(pipeline): + for event in execute_pipeline_iterator(pipeline_def): events.append(event) except DagsterResourceFunctionError: pass @@ -726,13 +727,13 @@ def resource_solid(_): raise Exception("uh oh") - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_solid_failure_resource_teardown", solid_defs=[resource_solid], mode_defs=[ModeDefinition(resource_defs={"a": resource_a, "b": resource_b})], ) - res = execute_pipeline(pipeline, raise_on_error=False) + res = execute_pipeline(pipeline_def, raise_on_error=False) assert res.event_list[-1].event_type_value == "PIPELINE_FAILURE" assert called == ["A", "B"] assert cleaned == ["B", "A"] @@ -742,7 +743,7 @@ events = [] try: - for event in execute_pipeline_iterator(pipeline): + for event in execute_pipeline_iterator(pipeline_def): events.append(event) except DagsterResourceFunctionError: pass @@ -778,14 +779,14 @@ def resource_solid(_): raise Exception("uh oh") - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_solid_failure_resource_teardown", solid_defs=[resource_solid], mode_defs=[ModeDefinition(resource_defs={"a": resource_a, "b": resource_b})], ) with pytest.raises(Exception): - execute_pipeline(pipeline) + execute_pipeline(pipeline_def) assert called == 
["A", "B"] assert cleaned == ["B", "A"] @@ -819,13 +820,13 @@ def resource_solid(_): pass - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="test_resource_teardown_failure", solid_defs=[resource_solid], mode_defs=[ModeDefinition(resource_defs={"a": resource_a, "b": resource_b})], ) - result = execute_pipeline(pipeline, raise_on_error=False) + result = execute_pipeline(pipeline_def, raise_on_error=False) assert result.success error_events = [ event @@ -840,7 +841,7 @@ cleaned = [] events = [] try: - for event in execute_pipeline_iterator(pipeline): + for event in execute_pipeline_iterator(pipeline_def): events.append(event) except DagsterResourceFunctionError: pass @@ -877,9 +878,9 @@ def test_multiprocessing_resource_teardown_failure(): with instance_for_test() as instance: - pipeline = reconstructable(define_resource_teardown_failure_pipeline) + pipeline_def = reconstructable(define_resource_teardown_failure_pipeline) result = execute_pipeline( - pipeline, + pipeline_def, run_config={ "intermediate_storage": {"filesystem": {}}, "execution": {"multiprocess": {}}, @@ -917,7 +918,7 @@ context.log.info(USER_RESOURCE_MESSAGE) return "A" - pipeline = PipelineDefinition( + pipeline_def = PipelineDefinition( name="resource_logging_pipeline", solid_defs=[resource_solid], mode_defs=[ @@ -930,12 +931,12 @@ with instance_for_test() as instance: pipeline_run = instance.create_run_for_pipeline( - pipeline, + pipeline_def, run_config={"loggers": {"callback": {}}}, step_keys_to_execute=["resource_solid.compute"], ) - result = execute_run(InMemoryPipeline(pipeline), pipeline_run, instance) + result = execute_run(InMemoryPipeline(pipeline_def), pipeline_run, instance) assert result.success log_messages = [event for event in events if isinstance(event, LogMessageRecord)] @@ -1044,3 +1045,67 @@ assert_pipeline_runs_with_resource( passthrough_to_enum_resource, {"enum": "VALUE_ONE"}, TestPythonEnum.VALUE_ONE ) + + +def test_resource_remapping(): + # simple 
remapping case where I have only one solid, one resource, and I want to remap the name + # of the resource. + @solid(required_resource_keys={"foo1"}) + def solid1(context): + return context.resources.foo1 + + @resource + def bar1(_): + return "fish" + + @pipeline(mode_defs=[ModeDefinition(resource_defs={"bar1": bar1})]) + def remap_pipeline(): + return solid1.remap_resource_key({"foo1": "bar1"}, name="woah")() + + result = execute_pipeline(remap_pipeline) + + assert result.success + + +def test_resource_remapping_name_collisions(): + # remapping case where we call the same solid with two different resources. + @solid(required_resource_keys={"animal_resource"}) + def return_animal(context): + return context.resources.animal_resource + + @resource + def fish(_): + return "fish" + + @resource + def mammal(_): + return "mammal" + + @resource + def amphibian(_): + return "amphibian" + + @pipeline( + mode_defs=[ + ModeDefinition( + resource_defs={ + "animal_resource": amphibian, + "animal_resource_1": fish, + "animal_resource_2": mammal, + } + ) + ] + ) + def animal_pipeline(): + return ( + return_animal(), + return_animal.remap_resource_key( + {"animal_resource": "animal_resource_1"}, name="fish_solid" + )(), + return_animal.remap_resource_key( + {"animal_resource": "animal_resource_2"}, name="mammal_solid" + )(), + ) + + result = execute_pipeline(animal_pipeline) + assert result.success