diff --git a/python_modules/dagster/dagster/core/definitions/step_launcher.py b/python_modules/dagster/dagster/core/definitions/step_launcher.py
--- a/python_modules/dagster/dagster/core/definitions/step_launcher.py
+++ b/python_modules/dagster/dagster/core/definitions/step_launcher.py
@@ -4,6 +4,7 @@
 import six
 
 from dagster import check
 from dagster.core.definitions.reconstructable import ReconstructablePipeline
+from dagster.core.execution.plan.key import StepKey
 from dagster.core.execution.retries import Retries
 from dagster.core.storage.pipeline_run import PipelineRun
@@ -35,7 +36,7 @@
             check.inst_param(pipeline_run, "pipeline_run", PipelineRun),
             check.str_param(run_id, "run_id"),
             check.inst_param(retries, "retries", Retries),
-            check.str_param(step_key, "step_key"),
+            check.inst_param(step_key, "step_key", StepKey),
             check.inst_param(recon_pipeline, "recon_pipeline", ReconstructablePipeline),
             check.int_param(prior_attempts_count, "prior_attempts_count"),
         )
diff --git a/python_modules/dagster/dagster/core/events/__init__.py b/python_modules/dagster/dagster/core/events/__init__.py
--- a/python_modules/dagster/dagster/core/events/__init__.py
+++ b/python_modules/dagster/dagster/core/events/__init__.py
@@ -19,6 +19,7 @@
     SystemExecutionContext,
     SystemStepExecutionContext,
 )
+from dagster.core.execution.plan.key import check_opt_step_key_param, check_step_key_param
 from dagster.core.execution.plan.objects import StepOutputData
 from dagster.core.log_manager import DagsterLogManager
 from dagster.serdes import register_serdes_tuple_fallbacks, whitelist_for_serdes
@@ -168,7 +169,9 @@
     check.inst(event.event_specific_data, EngineEventData)
 
     log_fn = log_manager.error if event.event_specific_data.error else log_manager.debug
-    log_fn(event.message, dagster_event=event, pipeline_name=pipeline_name, step_key=event.step_key)
+    log_fn(
+        event.message, dagster_event=event, pipeline_name=pipeline_name,
+    )
 
 
 @whitelist_for_serdes
@@ -201,14 +204,14 @@
     check.inst_param(step_context, "step_context", SystemStepExecutionContext)
 
     event = DagsterEvent(
-        check.inst_param(event_type, "event_type", DagsterEventType).value,
-        step_context.pipeline_def.name,
-        step_context.step.key,
-        step_context.step.solid_handle,
-        step_context.step.kind.value,
-        step_context.logging_tags,
-        _validate_event_specific_data(event_type, event_specific_data),
-        check.opt_str_param(message, "message"),
+        event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value,
+        pipeline_name=step_context.pipeline_def.name,
+        step_key=str(step_context.step.key),
+        solid_handle=step_context.step.solid_handle,
+        step_kind_value=step_context.step.kind.value,
+        logging_tags=step_context.logging_tags,
+        event_specific_data=_validate_event_specific_data(event_type, event_specific_data),
+        message=check.opt_str_param(message, "message"),
         pid=os.getpid(),
     )
 
@@ -221,7 +224,7 @@
     event_type, pipeline_context, message=None, event_specific_data=None, step_key=None
 ):
     check.inst_param(pipeline_context, "pipeline_context", SystemExecutionContext)
-
+    check.opt_str_param(step_key, "step_key")
     pipeline_name = pipeline_context.pipeline_def.name
 
     event = DagsterEvent(
@@ -243,6 +246,7 @@
     check.inst_param(execution_plan, "execution_plan", ExecutionPlan)
 
    pipeline_name = execution_plan.pipeline_def.name
+    single_step_key = execution_plan.step_key_for_single_step_plans()
     event = DagsterEvent(
         DagsterEventType.ENGINE_EVENT.value,
         pipeline_name=pipeline_name,
@@ -250,7 +254,7 @@
         event_specific_data=_validate_event_specific_data(
             DagsterEventType.ENGINE_EVENT, event_specific_data
         ),
-        step_key=execution_plan.step_key_for_single_step_plans(),
+        step_key=str(single_step_key) if single_step_key else None,
         pid=os.getpid(),
     )
     log_resource_event(log_manager, pipeline_name, event)
@@ -719,6 +723,7 @@
 
    @staticmethod
    def engine_event(pipeline_context, message, event_specific_data=None, step_key=None):
+        check.opt_str_param(step_key, "step_key")
        return DagsterEvent.from_pipeline(
            DagsterEventType.ENGINE_EVENT,
            pipeline_context,
@@ -851,7 +856,7 @@
     event = DagsterEvent(
         event_type_value=event_type.value,
         pipeline_name=hook_context.pipeline_def.name,
-        step_key=hook_context.step.key,
+        step_key=str(hook_context.step.key),
         solid_handle=hook_context.step.solid_handle,
         step_kind_value=hook_context.step.kind.value,
         logging_tags=hook_context.logging_tags,
@@ -874,7 +879,7 @@
     event = DagsterEvent(
         event_type_value=event_type.value,
         pipeline_name=hook_context.pipeline_def.name,
-        step_key=hook_context.step.key,
+        step_key=str(hook_context.step.key),
         solid_handle=hook_context.step.solid_handle,
         step_kind_value=hook_context.step.kind.value,
         logging_tags=hook_context.logging_tags,
@@ -900,7 +905,7 @@
     event = DagsterEvent(
         event_type_value=event_type.value,
         pipeline_name=hook_context.pipeline_def.name,
-        step_key=hook_context.step.key,
+        step_key=str(hook_context.step.key),
         solid_handle=hook_context.step.solid_handle,
         step_kind_value=hook_context.step.kind.value,
         logging_tags=hook_context.logging_tags,
diff --git a/python_modules/dagster/dagster/core/execution/api.py b/python_modules/dagster/dagster/core/execution/api.py
--- a/python_modules/dagster/dagster/core/execution/api.py
+++ b/python_modules/dagster/dagster/core/execution/api.py
@@ -9,6 +9,7 @@
 from dagster.core.events import DagsterEvent
 from dagster.core.execution.context.system import SystemPipelineExecutionContext
 from dagster.core.execution.plan.execute_plan import inner_plan_execution_iterator
+from dagster.core.execution.plan.key import StepKey
 from dagster.core.execution.plan.plan import ExecutionPlan
 from dagster.core.execution.resolve_versions import resolve_memoized_execution_plan
 from dagster.core.execution.retries import Retries
@@ -445,9 +446,11 @@
         parent_execution_plan_snapshot = instance.get_execution_plan_snapshot(
             parent_pipeline_run.execution_plan_snapshot_id
         )
-        step_keys_to_execute = parse_step_selection(
+        step_set_to_execute = parse_step_selection(
             parent_execution_plan_snapshot.step_deps, step_selection
         )
+        # convert to frozenset https://github.com/dagster-io/dagster/issues/2914
+        step_keys_to_execute = list(step_set_to_execute)
     else:
         step_keys_to_execute = None
 
@@ -458,8 +461,7 @@
         tags=tags,
         solid_selection=parent_pipeline_run.solid_selection,
         solids_to_execute=parent_pipeline_run.solids_to_execute,
-        # convert to frozenset https://github.com/dagster-io/dagster/issues/2914
-        step_keys_to_execute=list(step_keys_to_execute) if step_keys_to_execute else None,
+        step_keys_to_execute=step_keys_to_execute,
         root_run_id=parent_pipeline_run.root_run_id or parent_pipeline_run.run_id,
         parent_run_id=parent_pipeline_run.run_id,
     )
@@ -631,7 +633,7 @@
     run_config = check.opt_dict_param(run_config, "run_config", key_type=str)
     mode = check.opt_str_param(mode, "mode", default=pipeline_def.get_default_mode_name())
 
-    check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str)
+    check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=(str, StepKey))
 
     environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
diff --git a/python_modules/dagster/dagster/core/execution/memoization.py b/python_modules/dagster/dagster/core/execution/memoization.py
--- a/python_modules/dagster/dagster/core/execution/memoization.py
+++ b/python_modules/dagster/dagster/core/execution/memoization.py
@@ -6,6 +6,7 @@
 from dagster.core.events import DagsterEvent, DagsterEventType
 from dagster.core.events.log import EventRecord
 from dagster.core.execution.context.system import SystemExecutionContext
+from dagster.core.execution.plan.key import StepKey
 from dagster.core.execution.plan.objects import StepOutputHandle
 from dagster.core.execution.plan.plan import ExecutionPlan
 from dagster.core.storage.asset_store import mem_asset_store
@@ -93,7 +94,7 @@
     )
     output_handles_to_copy_by_step = defaultdict(list)
     for handle in output_handles_to_copy:
-        output_handles_to_copy_by_step[handle.step_key].append(handle)
+        output_handles_to_copy_by_step[StepKey.from_string(handle.step_key)].append(handle)
 
     intermediate_storage = pipeline_context.intermediate_storage
     for step in execution_plan.topological_steps():
@@ -159,6 +160,9 @@
             for step_output_handle in step_input.source.step_output_handle_dependencies:
                 # Only include handles that won't be satisfied by steps included in this
                 # execution.
-                if step_output_handle.step_key not in execution_plan.step_keys_to_execute:
+                if (
+                    StepKey.from_string(step_output_handle.step_key)
+                    not in execution_plan.step_keys_to_execute
+                ):
                     output_handles_for_current_run.add(step_output_handle)
     return output_handles_for_current_run
diff --git a/python_modules/dagster/dagster/core/execution/plan/active.py b/python_modules/dagster/dagster/core/execution/plan/active.py
--- a/python_modules/dagster/dagster/core/execution/plan/active.py
+++ b/python_modules/dagster/dagster/core/execution/plan/active.py
@@ -7,6 +7,7 @@
 from dagster.core.execution.retries import Retries
 from dagster.utils import pop_delayed_interrupts
 
+from .key import StepKey
 from .plan import ExecutionPlan
 
 
@@ -280,22 +281,27 @@
             steps_to_abandon = self.get_steps_to_abandon()
 
     def mark_failed(self, step_key):
+        check.inst_param(step_key, "step_key", StepKey)
         self._failed.add(step_key)
         self._mark_complete(step_key)
 
     def mark_success(self, step_key):
+        check.inst_param(step_key, "step_key", StepKey)
         self._success.add(step_key)
         self._mark_complete(step_key)
 
     def mark_skipped(self, step_key):
+        check.inst_param(step_key, "step_key", StepKey)
         self._skipped.add(step_key)
         self._mark_complete(step_key)
 
     def mark_abandoned(self, step_key):
+        check.inst_param(step_key, "step_key", StepKey)
         self._abandoned.add(step_key)
         self._mark_complete(step_key)
 
     def mark_interrupted(self, step_key):
+        check.inst_param(step_key, "step_key", StepKey)
         self._interrupted.add(step_key)
 
     def check_for_interrupts(self):
@@ -306,6 +312,7 @@
             not self._retries.disabled,
             "Attempted to mark {} as up for retry but retries are disabled".format(step_key),
         )
+        check.inst_param(step_key, "step_key", StepKey)
         check.opt_float_param(at_time, "at_time")
 
         # if retries are enabled - queue this back up
@@ -319,7 +326,7 @@
             # do not attempt to execute again
             self._abandoned.add(step_key)
 
-        self._retries.mark_attempt(step_key)
+        self._retries.mark_attempt(str(step_key))
 
         self._mark_complete(step_key)
 
@@ -334,16 +341,16 @@
 
     def handle_event(self, dagster_event):
         check.inst_param(dagster_event, "dagster_event", DagsterEvent)
-
+        step_key = StepKey.from_string(dagster_event.step_key) if dagster_event.step_key else None
         if dagster_event.is_step_failure:
-            self.mark_failed(dagster_event.step_key)
+            self.mark_failed(step_key)
         elif dagster_event.is_step_success:
-            self.mark_success(dagster_event.step_key)
+            self.mark_success(step_key)
         elif dagster_event.is_step_skipped:
-            self.mark_skipped(dagster_event.step_key)
+            self.mark_skipped(step_key)
         elif dagster_event.is_step_up_for_retry:
             self.mark_up_for_retry(
-                dagster_event.step_key,
+                step_key,
                 time.time() + dagster_event.step_retry_data.seconds_to_wait
                 if dagster_event.step_retry_data.seconds_to_wait
                 else None,
diff --git a/python_modules/dagster/dagster/core/execution/plan/compute.py b/python_modules/dagster/dagster/core/execution/plan/compute.py
--- a/python_modules/dagster/dagster/core/execution/plan/compute.py
+++ b/python_modules/dagster/dagster/core/execution/plan/compute.py
@@ -34,7 +34,6 @@
 
     return ExecutionStep(
         pipeline_name=pipeline_name,
-        key_suffix="compute",
         step_inputs=step_inputs,
         step_outputs=[
             StepOutput(
diff --git a/python_modules/dagster/dagster/core/execution/plan/execute_plan.py b/python_modules/dagster/dagster/core/execution/plan/execute_plan.py
--- a/python_modules/dagster/dagster/core/execution/plan/execute_plan.py
+++ b/python_modules/dagster/dagster/core/execution/plan/execute_plan.py
@@ -59,7 +59,7 @@
 
         # capture all of the logs for this step
         with pipeline_context.instance.compute_log_manager.watch(
-            step_context.pipeline_run, step_context.step.key
+            step_context.pipeline_run, str(step_context.step.key)
         ):
             missing_input_sources = pipeline_context.intermediate_storage.get_missing_input_sources(
                 step_context, step
@@ -216,7 +216,7 @@
     check.inst_param(retries, "retries", Retries)
 
     try:
-        prior_attempt_count = retries.get_attempt_count(step_context.step.key)
+        prior_attempt_count = retries.get_attempt_count(str(step_context.step.key))
         if step_context.step_launcher:
             step_events = step_context.step_launcher.launch_step(step_context, prior_attempt_count)
         else:
@@ -241,7 +241,7 @@
                     step_failure_data=StepFailureData(error=fail_err, user_failure_data=None),
                 )
             else:  # retries.enabled or retries.deferred
-                prev_attempts = retries.get_attempt_count(step_context.step.key)
+                prev_attempts = retries.get_attempt_count(str(step_context.step.key))
                 if prev_attempts >= retry_request.max_retries:
                     fail_err = SerializableErrorInfo(
                         message="Exceeded max_retries of {}".format(retry_request.max_retries),
diff --git a/python_modules/dagster/dagster/core/execution/plan/execute_step.py b/python_modules/dagster/dagster/core/execution/plan/execute_step.py
--- a/python_modules/dagster/dagster/core/execution/plan/execute_step.py
+++ b/python_modules/dagster/dagster/core/execution/plan/execute_step.py
@@ -94,7 +94,7 @@
                 'for non-optional output "{step_output_def.name}"'.format(
                     handle=str(step.solid_handle), step_output_def=step_output_def
                 ),
-                step_key=step.key,
+                step_key=str(step.key),
                 output_name=step_output_def.name,
             )
 
@@ -313,7 +313,7 @@
     step_output = step.step_output_named(output.output_name)
 
     version = step_context.execution_plan.resolve_step_output_versions()[
-        StepOutputHandle(step_context.step.key, output.output_name)
+        StepOutputHandle(str(step_context.step.key), output.output_name)
     ]
 
     for output_event in _type_checked_step_output_event_sequence(step_context, output, version):
@@ -450,7 +450,7 @@
             solid_def=step_context.solid_def.name,
             solid=step_context.solid.name,
         ),
-        step_key=step_context.step.key,
+        step_key=str(step_context.step.key),
         solid_def_name=step_context.solid_def.name,
         solid_name=step_context.solid.name,
     ):
diff --git a/python_modules/dagster/dagster/core/execution/plan/external_step.py b/python_modules/dagster/dagster/core/execution/plan/external_step.py
--- a/python_modules/dagster/dagster/core/execution/plan/external_step.py
+++ b/python_modules/dagster/dagster/core/execution/plan/external_step.py
@@ -44,7 +44,7 @@
         step_run_ref = step_context_to_step_run_ref(step_context, prior_attempts_count)
         run_id = step_context.pipeline_run.run_id
 
-        step_run_dir = os.path.join(self.scratch_dir, run_id, step_run_ref.step_key)
+        step_run_dir = os.path.join(self.scratch_dir, run_id, str(step_run_ref.step_key))
         os.makedirs(step_run_dir)
 
         step_run_ref_file_path = os.path.join(step_run_dir, PICKLED_STEP_RUN_REF_FILE_NAME)
diff --git a/python_modules/dagster/dagster/core/execution/plan/inputs.py b/python_modules/dagster/dagster/core/execution/plan/inputs.py
--- a/python_modules/dagster/dagster/core/execution/plan/inputs.py
+++ b/python_modules/dagster/dagster/core/execution/plan/inputs.py
@@ -10,6 +10,7 @@
 )
 from dagster.core.definitions.input import InputDefinition
 from dagster.core.errors import DagsterTypeLoadingError, user_code_error_boundary
+from dagster.core.execution.plan.key import StepKey
 
 
 def join_and_hash(*args):
@@ -95,7 +96,7 @@
 
     @property
     def step_key_dependencies(self):
-        return {self.step_output_handle.step_key}
+        return {self.get_output_step_key()}
 
     @property
     def step_output_handle_dependencies(self):
@@ -139,18 +140,21 @@
 
     def compute_version(self, step_versions):
         if (
-            self.step_output_handle.step_key not in step_versions
-            or not step_versions[self.step_output_handle.step_key]
+            self.get_output_step_key() not in step_versions
+            or not step_versions[self.get_output_step_key()]
        ):
             return None
         else:
             return join_and_hash(
-                step_versions[self.step_output_handle.step_key], self.step_output_handle.output_name
+                step_versions[self.get_output_step_key()], self.step_output_handle.output_name
             )
 
     def required_resource_keys(self):
         return {self.input_def.manager_key} if self.input_def.manager_key else set()
 
+    def get_output_step_key(self):
+        return StepKey.from_string(self.step_output_handle.step_key)
+
 
 def _generate_error_boundary_msg_for_step_input(context, input_name):
     return lambda: """Error occurred during input loading:
diff --git a/python_modules/dagster/dagster/core/execution/plan/key.py b/python_modules/dagster/dagster/core/execution/plan/key.py
new file mode 100644
--- /dev/null
+++ b/python_modules/dagster/dagster/core/execution/plan/key.py
@@ -0,0 +1,36 @@
+from collections import namedtuple
+
+from dagster import check
+
+
+class StepKey(namedtuple("_StepKey", "key")):
+    def __new__(cls, key):
+        return super(StepKey, cls).__new__(cls, key=check.str_param(key, "key"),)
+
+    def __str__(self):
+        return self.key
+
+    @classmethod
+    def from_string(cls, string):
+        return cls(string)
+
+
+def check_step_key_param(step_key):
+    if isinstance(step_key, str):
+        step_key = StepKey.from_string(step_key)
+
+    return check.inst_param(step_key, "step_key", StepKey)
+
+
+def check_opt_step_key_param(step_key):
+    if isinstance(step_key, str):
+        step_key = StepKey.from_string(step_key)
+
+    return check.opt_inst_param(step_key, "step_key", StepKey)
+
+
+def check_opt_step_key_str_param(step_key):
+    if isinstance(step_key, StepKey):
+        return str(step_key)
+
+    return check.opt_str_param(step_key, "step_key")
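Note: the StepKey wrapper above is a one-field namedtuple, so it hashes and compares as a tuple rather than as the raw string it wraps. The standalone sketch below (class body copied from the new key.py, minus the `check` validation; the asserts are illustrative and not part of the patch) shows the behavior the rest of this diff accommodates: a StepKey round-trips through str() and works as a dict key, but is never equal to the plain string, hence the str(...) / StepKey.from_string(...) conversions at every logging, storage, and serialization boundary.

from collections import namedtuple


class StepKey(namedtuple("_StepKey", "key")):
    # copied from the new key.py above, minus the `check` validation
    def __str__(self):
        return self.key

    @classmethod
    def from_string(cls, string):
        return cls(string)


key = StepKey.from_string("solid_a.compute")
assert str(key) == "solid_a.compute"  # round-trips through str()
assert key != "solid_a.compute"  # but is never equal to the raw string
assert key == ("solid_a.compute",)  # namedtuple equality is tuple equality

step_versions = {key: "abc123"}  # hashable, so usable as a dict key or set member
assert step_versions[StepKey.from_string("solid_a.compute")] == "abc123"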
diff --git a/python_modules/dagster/dagster/core/execution/plan/objects.py b/python_modules/dagster/dagster/core/execution/plan/objects.py
--- a/python_modules/dagster/dagster/core/execution/plan/objects.py
+++ b/python_modules/dagster/dagster/core/execution/plan/objects.py
@@ -6,9 +6,10 @@
 from dagster.core.definitions.events import EventMetadataEntry
 from dagster.core.types.dagster_type import DagsterType
 from dagster.serdes import whitelist_for_serdes
-from dagster.utils import merge_dicts
 from dagster.utils.error import SerializableErrorInfo
 
+from .key import StepKey
+
 
 @whitelist_for_serdes
 class StepOutputHandle(namedtuple("_StepOutputHandle", "step_key output_name")):
@@ -16,7 +17,7 @@
     @staticmethod
     def from_step(step, output_name="result"):
         check.inst_param(step, "step", ExecutionStep)
-        return StepOutputHandle(step.key, output_name)
+        return StepOutputHandle(str(step.key), output_name)
 
     def __new__(cls, step_key, output_name="result"):
         return super(StepOutputHandle, cls).__new__(
@@ -25,6 +26,20 @@
             output_name=check.str_param(output_name, "output_name"),
         )
 
+    def to_storage_value(self):
+        return {
+            "__class__": self.__class__.__name__,
+            "step_key": str(self.step_key),
+            "output_name": self.output_name,
+        }
+
+    @classmethod
+    def from_storage_dict(cls, storage_dict):
+        return cls(
+            step_key=StepKey.from_string(storage_dict["step_key"]),
+            output_name=storage_dict["output_name"],
+        )
+
 
 @whitelist_for_serdes
 class StepInputData(namedtuple("_StepInputData", "input_name type_check_data")):
@@ -160,29 +175,19 @@
 class ExecutionStep(
     namedtuple(
         "_ExecutionStep",
         (
-            "pipeline_name key_suffix step_inputs step_input_dict step_outputs step_output_dict "
-            "compute_fn kind solid_handle solid logging_tags tags hook_defs"
+            "pipeline_name step_inputs step_input_dict step_outputs step_output_dict "
+            "compute_fn kind solid_handle solid tags hook_defs"
         ),
     )
 ):
     def __new__(
-        cls,
-        pipeline_name,
-        key_suffix,
-        step_inputs,
-        step_outputs,
-        compute_fn,
-        kind,
-        solid_handle,
-        solid,
-        logging_tags=None,
+        cls, pipeline_name, step_inputs, step_outputs, compute_fn, kind, solid_handle, solid,
     ):
         from .inputs import StepInput
         return super(ExecutionStep, cls).__new__(
             cls,
             pipeline_name=check.str_param(pipeline_name, "pipeline_name"),
-            key_suffix=check.str_param(key_suffix, "key_suffix"),
             step_inputs=check.list_param(step_inputs, "step_inputs", of_type=StepInput),
             step_input_dict={si.name: si for si in step_inputs},
             step_outputs=check.list_param(step_outputs, "step_outputs", of_type=StepOutput),
@@ -194,22 +199,14 @@
             kind=check.inst_param(kind, "kind", StepKind),
             solid_handle=check.inst_param(solid_handle, "solid_handle", SolidHandle),
             solid=check.inst_param(solid, "solid", Solid),
-            logging_tags=merge_dicts(
-                {
-                    "step_key": str(solid_handle) + "." + key_suffix,
-                    "pipeline": pipeline_name,
-                    "solid": solid_handle.name,
-                    "solid_definition": solid.definition.name,
-                },
-                check.opt_dict_param(logging_tags, "logging_tags"),
-            ),
             tags=solid.tags,
             hook_defs=solid.hook_defs,
         )
 
     @property
     def key(self):
-        return str(self.solid_handle) + "." + self.key_suffix
+        # will remove .compute in follow up diff
+        return StepKey.from_string(f"{self.solid_handle}.compute")
 
     @property
     def solid_name(self):
@@ -230,3 +227,12 @@
     def step_input_named(self, name):
         check.str_param(name, "name")
         return self.step_input_dict[name]
+
+    @property
+    def logging_tags(self):
+        return {
+            "step_key": str(self.key),
+            "pipeline": self.pipeline_name,
+            "solid": self.solid_handle.name,
+            "solid_definition": self.solid.definition.name,
+        }
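Note: the to_storage_value / from_storage_dict pair added to StepOutputHandle above lets serdes keep persisting the step key as a plain string while the in-memory plan moves to StepKey (the matching serdes hook, which calls to_storage_value for classes registered for persistence, is visible in the dagster/serdes/__init__.py hunk further down). A minimal standalone sketch of that round-trip contract, with hypothetical serialize/deserialize functions standing in for the real serdes machinery:

import json
from collections import namedtuple


class Handle(namedtuple("_Handle", "step_key output_name")):
    # mirrors StepOutputHandle: write the step key out as a plain string ...
    def to_storage_value(self):
        return {
            "__class__": self.__class__.__name__,
            "step_key": str(self.step_key),
            "output_name": self.output_name,
        }

    # ... and rebuild from the stored dict on the way back in (the real patch
    # re-wraps with StepKey.from_string here; a plain str keeps this sketch simple)
    @classmethod
    def from_storage_dict(cls, storage_dict):
        return cls(step_key=storage_dict["step_key"], output_name=storage_dict["output_name"])


def serialize(value):  # hypothetical stand-in for serdes serialization
    return json.dumps(value.to_storage_value())


def deserialize(blob):  # hypothetical stand-in for serdes deserialization
    return Handle.from_storage_dict(json.loads(blob))


handle = Handle("solid_a.compute", "result")
assert deserialize(serialize(handle)) == handle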
diff --git a/python_modules/dagster/dagster/core/execution/plan/plan.py b/python_modules/dagster/dagster/core/execution/plan/plan.py
--- a/python_modules/dagster/dagster/core/execution/plan/plan.py
+++ b/python_modules/dagster/dagster/core/execution/plan/plan.py
@@ -30,6 +30,7 @@
     FromStepOutput,
     StepInput,
 )
+from .key import StepKey, check_step_key_param
 from .objects import ExecutionStep, StepOutputHandle
 
 
@@ -50,7 +51,8 @@
             environment_config, "environment_config", EnvironmentConfig
         )
         check.opt_str_param(mode, "mode")
-        check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str)
+        check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=StepKey)
+        self.step_keys_to_execute = step_keys_to_execute
 
         self.mode_definition = (
             pipeline.get_definition().get_mode_definition(mode)
@@ -306,7 +308,7 @@
 class ExecutionPlan(
     namedtuple(
         "_ExecutionPlan",
-        "pipeline step_dict deps steps artifacts_persisted step_keys_to_execute environment_config",
+        "pipeline step_dict deps artifacts_persisted step_keys_to_execute environment_config",
     )
 ):
     def __new__(
@@ -318,7 +320,14 @@
         step_keys_to_execute,
         environment_config,
     ):
-        missing_steps = [step_key for step_key in step_keys_to_execute if step_key not in step_dict]
+        check.list_param(step_keys_to_execute, "step_keys_to_execute", of_type=StepKey)
+        check.dict_param(deps, "deps", key_type=StepKey, value_type=set)
+        for key, dep_key_set in deps.items():
+            check.set_param(dep_key_set, f"deps[{key}]", of_type=StepKey)
+
+        missing_steps = [
+            str(step_key) for step_key in step_keys_to_execute if step_key not in step_dict
+        ]
         if missing_steps:
             raise DagsterExecutionStepNotFoundError(
                 "Execution plan does not contain step{plural}: {steps}".format(
@@ -326,18 +335,16 @@
                 ),
                 step_keys=missing_steps,
             )
+
         return super(ExecutionPlan, cls).__new__(
             cls,
             pipeline=check.inst_param(pipeline, "pipeline", IPipeline),
             step_dict=check.dict_param(
-                step_dict, "step_dict", key_type=str, value_type=ExecutionStep
+                step_dict, "step_dict", key_type=StepKey, value_type=ExecutionStep
             ),
-            deps=check.dict_param(deps, "deps", key_type=str, value_type=set),
-            steps=list(step_dict.values()),
+            deps=deps,
             artifacts_persisted=check.bool_param(artifacts_persisted, "artifacts_persisted"),
-            step_keys_to_execute=check.list_param(
-                step_keys_to_execute, "step_keys_to_execute", of_type=str
-            ),
+            step_keys_to_execute=step_keys_to_execute,
             environment_config=check.inst_param(
                 environment_config, "environment_config", EnvironmentConfig
             ),
@@ -371,12 +378,16 @@
         return handle.asset_store_key if handle else None
 
     def has_step(self, key):
-        check.str_param(key, "key")
+        check.inst_param(key, "key", StepKey)
         return key in self.step_dict
 
-    def get_step_by_key(self, key):
-        check.str_param(key, "key")
-        return self.step_dict[key]
+    def get_step_by_key(self, step_key):
+        step_key = check_step_key_param(step_key)
+        return self.step_dict[step_key]
+
+    @property
+    def steps(self):
+        return list(self.step_dict.values())
 
     def topological_steps(self):
         return [step for step_level in self.topological_step_levels() for step in step_level]
@@ -416,7 +427,8 @@
         return deps
 
     def build_subset_plan(self, step_keys_to_execute):
-        check.list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str)
+        check.list_param(step_keys_to_execute, "step_keys_to_execute")
+        step_keys_to_execute = [check_step_key_param(key) for key in step_keys_to_execute]
         return ExecutionPlan(
             self.pipeline,
             self.step_dict,
@@ -484,7 +496,9 @@
     check.inst_param(pipeline, "pipeline", IPipeline)
     check.inst_param(environment_config, "environment_config", EnvironmentConfig)
     check.opt_str_param(mode, "mode")
-    check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str)
+    check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=(str, StepKey))
+    if step_keys_to_execute:
+        step_keys_to_execute = [check_step_key_param(key) for key in step_keys_to_execute]
 
     plan_builder = _PlanBuilder(
         pipeline, environment_config, mode=mode, step_keys_to_execute=step_keys_to_execute,
diff --git a/python_modules/dagster/dagster/core/execution/resolve_versions.py b/python_modules/dagster/dagster/core/execution/resolve_versions.py
--- a/python_modules/dagster/dagster/core/execution/resolve_versions.py
+++ b/python_modules/dagster/dagster/core/execution/resolve_versions.py
@@ -1,7 +1,6 @@
 from dagster import check
 from dagster.core.errors import DagsterInvariantViolationError
 from dagster.core.execution.context.init import InitResourceContext
-from dagster.core.execution.plan.objects import StepOutputHandle
 from dagster.utils.backcompat import experimental
 
 from .plan.inputs import join_and_hash
@@ -124,10 +123,13 @@
 
 def resolve_step_output_versions_helper(execution_plan):
+    from dagster.core.execution.plan.objects import StepOutputHandle
     step_versions = execution_plan.resolve_step_versions()
 
     return {
-        StepOutputHandle(step.key, output_name): join_and_hash(output_name, step_versions[step.key])
+        StepOutputHandle(str(step.key), output_name): join_and_hash(
+            output_name, step_versions[step.key]
+        )
         for step in execution_plan.steps
         for output_name in step.step_output_dict.keys()
     }
 
diff --git a/python_modules/dagster/dagster/core/execution/retries.py b/python_modules/dagster/dagster/core/execution/retries.py
--- a/python_modules/dagster/dagster/core/execution/retries.py
+++ b/python_modules/dagster/dagster/core/execution/retries.py
@@ -40,9 +40,11 @@
         return self._mode == RetryMode.DEFERRED
 
     def get_attempt_count(self, key):
+        check.str_param(key, "key")
        return self._attempts[key]
 
     def mark_attempt(self, key):
+        check.str_param(key, "key")
         self._attempts[key] += 1
 
     def for_inner_plan(self):
diff --git a/python_modules/dagster/dagster/core/executor/multiprocess.py b/python_modules/dagster/dagster/core/executor/multiprocess.py
--- a/python_modules/dagster/dagster/core/executor/multiprocess.py
+++ b/python_modules/dagster/dagster/core/executor/multiprocess.py
@@ -58,7 +58,7 @@
                 EngineEventData(
                     [
                         EventMetadataEntry.text(str(os.getpid()), "pid"),
-                        EventMetadataEntry.text(self.step_key, "step_key"),
+                        EventMetadataEntry.text(str(self.step_key), "step_key"),
                     ],
                     marker_end=DELEGATE_MARKER,
                 ),
@@ -119,7 +119,7 @@
                         pipeline_context,
                        "Multiprocess executor: received termination signal - "
                        "forwarding to active child processes",
-                        EngineEventData.interrupted(list(term_events.keys())),
+                        EngineEventData.interrupted([str(key) for key in term_events.keys()]),
                     )
                     stopping = True
                     for key, event in term_events.items():
@@ -164,7 +164,7 @@
                                 "unexpectedly exited with code {exit_code}"
                             ).format(step_key=key, exit_code=crash.exit_code),
                             EngineEventData.engine_error(serializable_error),
-                            step_key=key,
+                            step_key=str(key),
                         )
                         step_failure_event = DagsterEvent.step_failure_event(
                             step_context=pipeline_context.for_step(
@@ -226,7 +226,7 @@
             step_context,
             "Launching subprocess for {}".format(step.key),
             EngineEventData(marker_start=DELEGATE_MARKER),
-            step_key=step.key,
+            step_key=str(step.key),
         )
 
         for ret in execute_child_process_command(command):
diff --git a/python_modules/dagster/dagster/core/instance/__init__.py b/python_modules/dagster/dagster/core/instance/__init__.py
--- a/python_modules/dagster/dagster/core/instance/__init__.py
+++ b/python_modules/dagster/dagster/core/instance/__init__.py
@@ -15,6 +15,7 @@
     DagsterRunAlreadyExists,
     DagsterRunConflict,
 )
+from dagster.core.execution.plan.key import check_opt_step_key_str_param
 from dagster.core.storage.migration.utils import upgrading_instance
 from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus
 from dagster.core.storage.tags import MEMOIZED_RUN_TAG
@@ -559,6 +560,8 @@
         check.opt_set_param(solids_to_execute, "solids_to_execute", of_type=str)
         check.opt_list_param(solid_selection, "solid_selection", of_type=str)
 
+        check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str)
+
         if solids_to_execute:
             if isinstance(pipeline_def, PipelineSubsetDefinition):
                 # for the case when pipeline_def is created by IPipeline or ExternalPipeline
@@ -595,7 +598,9 @@
                 subsetted_execution_plan = resolve_memoized_execution_plan(
                     full_execution_plan
                 )  # TODO: tighter integration with existing step_keys_to_execute functionality
-                step_keys_to_execute = subsetted_execution_plan.step_keys_to_execute
+                step_keys_to_execute = [
+                    str(key) for key in subsetted_execution_plan.step_keys_to_execute
+                ]
             else:
                 subsetted_execution_plan = (
                     full_execution_plan.build_subset_plan(step_keys_to_execute)
@@ -981,6 +986,8 @@
         check.class_param(cls, "cls")
         check.str_param(message, "message")
         check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
+        step_key = check_opt_step_key_str_param(step_key)
+
         engine_event_data = check.opt_inst_param(
             engine_event_data, "engine_event_data", EngineEventData, EngineEventData([]),
         )
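Note: DagsterInstance above normalizes with check_opt_step_key_str_param, the inverse of the check_step_key_param coercion used by ExecutionPlan.build_subset_plan earlier. Both helpers come from the new key.py; the standalone sketch below (helper bodies adapted from key.py, with plain isinstance assertions standing in for dagster's check module) illustrates the boundary pattern: accept either representation from callers, normalize to StepKey inside the plan and to plain str at storage and event boundaries.

from collections import namedtuple


class StepKey(namedtuple("_StepKey", "key")):
    def __str__(self):
        return self.key

    @classmethod
    def from_string(cls, string):
        return cls(string)


def check_step_key_param(step_key):
    # callers may still pass a plain string; normalize to StepKey internally
    if isinstance(step_key, str):
        step_key = StepKey.from_string(step_key)
    assert isinstance(step_key, StepKey)
    return step_key


def check_opt_step_key_str_param(step_key):
    # the inverse boundary: normalize an optional StepKey back to a plain string
    if isinstance(step_key, StepKey):
        return str(step_key)
    assert step_key is None or isinstance(step_key, str)
    return step_key


assert check_step_key_param("a.compute") == StepKey.from_string("a.compute")
assert check_step_key_param(StepKey.from_string("a.compute")) == StepKey.from_string("a.compute")
assert check_opt_step_key_str_param(StepKey.from_string("a.compute")) == "a.compute"
assert check_opt_step_key_str_param(None) is None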
""" check.list_param(step_selection, "step_selection", of_type=str) + check.dict_param(step_deps, "step_deps", key_type=StepKey, value_type=set) + + str_deps = {str(key): set([str(x) for x in value]) for key, value in step_deps.items()} # reverse step_deps to get the downstream_deps # make sure we have all items as keys, including the ones without downstream dependencies - downstream_deps = defaultdict(set, {k: set() for k in step_deps.keys()}) - for downstream_key, upstream_keys in step_deps.items(): + downstream_deps = defaultdict(set, {k: set() for k in str_deps.keys()}) + for downstream_key, upstream_keys in str_deps.items(): for step_key in upstream_keys: downstream_deps[step_key].add(downstream_key) # generate dep graph - graph = {"upstream": step_deps, "downstream": downstream_deps} + graph = {"upstream": str_deps, "downstream": downstream_deps} traverser = Traverser(graph=graph) steps_set = set() diff --git a/python_modules/dagster/dagster/core/snap/execution_plan_snapshot.py b/python_modules/dagster/dagster/core/snap/execution_plan_snapshot.py --- a/python_modules/dagster/dagster/core/snap/execution_plan_snapshot.py +++ b/python_modules/dagster/dagster/core/snap/execution_plan_snapshot.py @@ -2,6 +2,7 @@ from dagster import check from dagster.core.execution.plan.inputs import StepInput +from dagster.core.execution.plan.key import StepKey from dagster.core.execution.plan.objects import StepKind, StepOutput, StepOutputHandle from dagster.core.execution.plan.plan import ExecutionPlan, ExecutionStep from dagster.serdes import create_snapshot_id, whitelist_for_serdes @@ -36,11 +37,11 @@ @property def step_deps(self): # Construct dependency dictionary (downstream to upstreams) - deps = {step.key: set() for step in self.steps} + deps = {StepKey.from_string(step.key): set() for step in self.steps} for step in self.steps: for step_input in step.inputs: - deps[step.key].update( + deps[StepKey.from_string(step.key)].update( [output_handle.step_key for output_handle in step_input.upstream_output_handles] ) return deps @@ -128,7 +129,7 @@ def _snapshot_from_execution_step(execution_step): check.inst_param(execution_step, "execution_step", ExecutionStep) return ExecutionStepSnap( - key=execution_step.key, + key=str(execution_step.key), inputs=sorted( list(map(_snapshot_from_step_input, execution_step.step_inputs)), key=lambda si: si.name ), @@ -160,5 +161,5 @@ ), artifacts_persisted=execution_plan.artifacts_persisted, pipeline_snapshot_id=pipeline_snapshot_id, - step_keys_to_execute=execution_plan.step_keys_to_execute, + step_keys_to_execute=[str(key) for key in execution_plan.step_keys_to_execute], ) diff --git a/python_modules/dagster/dagster/serdes/__init__.py b/python_modules/dagster/dagster/serdes/__init__.py --- a/python_modules/dagster/dagster/serdes/__init__.py +++ b/python_modules/dagster/dagster/serdes/__init__.py @@ -185,7 +185,7 @@ klass_name = val.__class__.__name__ check.invariant( klass_name in whitelist_map["types"]["tuple"], - "Can only serialize whitelisted namedtuples, received tuple {}".format(val), + "Can only serialize whitelisted namedtuples, received tuple {}".format(repr(val)), ) if klass_name in whitelist_map["persistence"]: return val.to_storage_value() diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_execution_plan.py b/python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_execution_plan.py --- a/python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_execution_plan.py +++ 
diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_execution_plan.py b/python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_execution_plan.py
--- a/python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_execution_plan.py
+++ b/python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_execution_plan.py
@@ -18,9 +18,9 @@
 
     assert len(levels) == 3
 
-    assert [step.key for step in levels[0]] == ["return_two.compute"]
-    assert [step.key for step in levels[1]] == ["add_three.compute", "mult_three.compute"]
-    assert [step.key for step in levels[2]] == ["adder.compute"]
+    assert [str(step.key) for step in levels[0]] == ["return_two.compute"]
+    assert [str(step.key) for step in levels[1]] == ["add_three.compute", "mult_three.compute"]
+    assert [str(step.key) for step in levels[2]] == ["adder.compute"]
 
 
 def test_create_execution_plan_with_bad_inputs():
@@ -38,7 +38,7 @@
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 1
         step_1 = steps[0]
-        assert step_1.key == "return_two.compute"
+        assert str(step_1.key) == "return_two.compute"
 
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 0  # cant progress
@@ -49,8 +49,8 @@
         assert len(steps) == 2
         step_2 = steps[0]
         step_3 = steps[1]
-        assert step_2.key == "add_three.compute"
-        assert step_3.key == "mult_three.compute"
+        assert str(step_2.key) == "add_three.compute"
+        assert str(step_3.key) == "mult_three.compute"
 
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 0  # cant progress
@@ -66,7 +66,7 @@
         assert len(steps) == 1
         step_4 = steps[0]
 
-        assert step_4.key == "adder.compute"
+        assert str(step_4.key) == "adder.compute"
 
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 0  # cant progress
@@ -87,7 +87,7 @@
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 1
         step_1 = steps[0]
-        assert step_1.key == "return_two.compute"
+        assert str(step_1.key) == "return_two.compute"
 
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 0  # cant progress
@@ -98,8 +98,8 @@
         assert len(steps) == 2
         step_2 = steps[0]
         step_3 = steps[1]
-        assert step_2.key == "add_three.compute"
-        assert step_3.key == "mult_three.compute"
+        assert str(step_2.key) == "add_three.compute"
+        assert str(step_3.key) == "mult_three.compute"
 
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 0  # cant progress
@@ -122,7 +122,7 @@
         assert len(steps) == 1
         step_4 = steps[0]
 
-        assert step_4.key == "adder.compute"
+        assert str(step_4.key) == "adder.compute"
 
         active_execution.mark_abandoned(step_4.key)
         assert active_execution.is_complete
@@ -137,7 +137,7 @@
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 1
         step_1 = steps[0]
-        assert step_1.key == "return_two.compute"
+        assert str(step_1.key) == "return_two.compute"
 
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 0  # cant progress
@@ -146,13 +146,13 @@
 
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 1
-        assert steps[0].key == "return_two.compute"
+        assert str(steps[0].key) == "return_two.compute"
 
         active_execution.mark_up_for_retry(step_1.key)
 
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 1
-        assert steps[0].key == "return_two.compute"
+        assert str(steps[0].key) == "return_two.compute"
 
         active_execution.mark_success(step_1.key)
@@ -160,8 +160,8 @@
         assert len(steps) == 2
         step_2 = steps[0]
         step_3 = steps[1]
-        assert step_2.key == "add_three.compute"
-        assert step_3.key == "mult_three.compute"
+        assert str(step_2.key) == "add_three.compute"
+        assert str(step_3.key) == "mult_three.compute"
 
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 0  # cant progress
@@ -184,7 +184,7 @@
         assert len(steps) == 1
         step_4 = steps[0]
 
-        assert step_4.key == "adder.compute"
+        assert str(step_4.key) == "adder.compute"
 
         active_execution.mark_abandoned(step_4.key)
         assert active_execution.is_complete
@@ -200,7 +200,7 @@
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 1
         step_1 = steps[0]
-        assert step_1.key == "return_two.compute"
+        assert str(step_1.key) == "return_two.compute"
 
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 0  # cant progress
@@ -218,7 +218,7 @@
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 1
         step_1 = steps[0]
-        assert step_1.key == "return_two.compute"
+        assert str(step_1.key) == "return_two.compute"
 
         steps = active_execution.get_steps_to_execute()
         assert len(steps) == 0  # cant progress
@@ -284,12 +284,12 @@
     plan = create_execution_plan(priorities)
     with plan.start(Retries(RetryMode.DISABLED), sort_key_fn) as active_execution:
         steps = active_execution.get_steps_to_execute()
-        assert steps[0].key == "pri_5.compute"
-        assert steps[1].key == "pri_4.compute"
-        assert steps[2].key == "pri_3.compute"
-        assert steps[3].key == "pri_2.compute"
-        assert steps[4].key == "pri_none.compute"
-        assert steps[5].key == "pri_neg_1.compute"
+        assert str(steps[0].key) == "pri_5.compute"
+        assert str(steps[1].key) == "pri_4.compute"
+        assert str(steps[2].key) == "pri_3.compute"
+        assert str(steps[3].key) == "pri_2.compute"
+        assert str(steps[4].key) == "pri_none.compute"
+        assert str(steps[5].key) == "pri_neg_1.compute"
 
         _ = [active_execution.mark_skipped(step.key) for step in steps]
diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_external_step.py b/python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_external_step.py
--- a/python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_external_step.py
+++ b/python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_external_step.py
@@ -170,7 +170,6 @@
     assert rehydrated_step_context.required_resource_keys == step_context.required_resource_keys
     rehydrated_step = rehydrated_step_context.step
     assert rehydrated_step.pipeline_name == step.pipeline_name
-    assert rehydrated_step.key_suffix == step.key_suffix
     assert rehydrated_step.step_inputs == step.step_inputs
     assert rehydrated_step.step_outputs == step.step_outputs
     assert rehydrated_step.kind == step.kind
diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_memoized_dev_loop.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_memoized_dev_loop.py
--- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_memoized_dev_loop.py
+++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_memoized_dev_loop.py
@@ -1,5 +1,6 @@
 from dagster import execute_pipeline, seven
 from dagster.core.execution.api import create_execution_plan
+from dagster.core.execution.plan.key import StepKey
 from dagster.core.execution.resolve_versions import resolve_memoized_execution_plan
 from dagster.core.instance import DagsterInstance, InstanceType
 from dagster.core.launcher import DefaultRunLauncher
@@ -55,7 +56,7 @@
         run_config["solids"]["take_string_1_asset"]["config"]["input_str"] = "banana"
 
         assert get_step_keys_to_execute(asset_pipeline, run_config, "only_mode") == [
-            "take_string_1_asset.compute"
+            StepKey.from_string("take_string_1_asset.compute")
         ]
         result = execute_pipeline(
             asset_pipeline,
diff --git a/python_modules/dagster/dagster_tests/core_tests/hook_tests/test_hook_run.py b/python_modules/dagster/dagster_tests/core_tests/hook_tests/test_hook_run.py
--- a/python_modules/dagster/dagster_tests/core_tests/hook_tests/test_hook_run.py
+++ b/python_modules/dagster/dagster_tests/core_tests/hook_tests/test_hook_run.py
@@ -56,17 +56,17 @@
 
     @event_list_hook
     def pipeline_hook(context, _):
-        called_hook_to_step_keys[context.hook_def.name].add(context.step.key)
+        called_hook_to_step_keys[context.hook_def.name].add(str(context.step.key))
         return HookExecutionResult("pipeline_hook")
 
     @event_list_hook
     def solid_1_hook(context, _):
-        called_hook_to_step_keys[context.hook_def.name].add(context.step.key)
+        called_hook_to_step_keys[context.hook_def.name].add(str(context.step.key))
         return HookExecutionResult("solid_1_hook")
 
     @event_list_hook
     def composite_1_hook(context, _):
-        called_hook_to_step_keys[context.hook_def.name].add(context.step.key)
+        called_hook_to_step_keys[context.hook_def.name].add(str(context.step.key))
         return HookExecutionResult("composite_1_hook")
 
     @solid
@@ -119,7 +119,7 @@
 
     @event_list_hook
     def hook_a_generic(context, _):
-        called_hook_to_step_keys[context.hook_def.name].add(context.step.key)
+        called_hook_to_step_keys[context.hook_def.name].add(str(context.step.key))
         return HookExecutionResult("hook_a_generic")
 
     @solid
@@ -254,7 +254,7 @@
 
     @event_list_hook
     def hook_a_generic(context, _):
-        called_hook_to_step_keys[context.hook_def.name].add(context.step.key)
+        called_hook_to_step_keys[context.hook_def.name].add(str(context.step.key))
         return HookExecutionResult("hook_a_generic")
 
     @solid
diff --git a/python_modules/dagster/dagster_tests/core_tests/selector_tests/test_subset_selector.py b/python_modules/dagster/dagster_tests/core_tests/selector_tests/test_subset_selector.py
--- a/python_modules/dagster/dagster_tests/core_tests/selector_tests/test_subset_selector.py
+++ b/python_modules/dagster/dagster_tests/core_tests/selector_tests/test_subset_selector.py
@@ -1,6 +1,7 @@
 import pytest
 from dagster import InputDefinition, lambda_solid, pipeline
 from dagster.core.errors import DagsterInvalidSubsetError
+from dagster.core.execution.plan.key import StepKey
 from dagster.core.selector.subset_selector import (
     MAX_NUM,
     Traverser,
@@ -141,11 +142,14 @@
 
 step_deps = {
-    "return_one.compute": set(),
-    "return_two.compute": set(),
-    "add_nums.compute": {"return_one.compute", "return_two.compute"},
-    "multiply_two.compute": {"add_nums.compute"},
-    "add_one.compute": {"multiply_two.compute"},
+    StepKey.from_string("return_one.compute"): set(),
+    StepKey.from_string("return_two.compute"): set(),
+    StepKey.from_string("add_nums.compute"): {
+        StepKey.from_string("return_one.compute"),
+        StepKey.from_string("return_two.compute"),
+    },
+    StepKey.from_string("multiply_two.compute"): {StepKey.from_string("add_nums.compute")},
+    StepKey.from_string("add_one.compute"): {StepKey.from_string("multiply_two.compute")},
 }
diff --git a/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_asset_store.py b/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_asset_store.py
--- a/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_asset_store.py
+++ b/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_asset_store.py
@@ -85,7 +85,9 @@
     assert (
         asset_store_operation_events[1].event_specific_data.op == AssetStoreOperationType.GET_ASSET
     )
-    assert "solid_a.compute" == asset_store_operation_events[1].event_specific_data.step_key
+    assert "solid_a.compute" == str(
+        asset_store_operation_events[1].event_specific_data.step_key
+    )
 
     # SET ASSET for step "solid_b.compute" output "result"
     assert (
@@ -121,7 +123,7 @@
             )
         )
         assert len(get_asset_events) == 1
-        assert get_asset_events[0].event_specific_data.step_key == "solid_a.compute"
+        assert str(get_asset_events[0].event_specific_data.step_key) == "solid_a.compute"
 
 
 def execute_pipeline_with_steps(pipeline_def, step_keys_to_execute=None):
diff --git a/python_modules/dagster/dagster_tests/core_tests/test_nothing_dependencies.py b/python_modules/dagster/dagster_tests/core_tests/test_nothing_dependencies.py
--- a/python_modules/dagster/dagster_tests/core_tests/test_nothing_dependencies.py
+++ b/python_modules/dagster/dagster_tests/core_tests/test_nothing_dependencies.py
@@ -306,7 +306,7 @@
 
     levels = plan.topological_step_levels()
 
-    assert "emit_nothing" in levels[0][0].key
-    assert "consume_nothing" in levels[1][0].key
+    assert "emit_nothing" in str(levels[0][0].key)
+    assert "consume_nothing" in str(levels[1][0].key)
 
     assert execute_pipeline(pipe).success
diff --git a/python_modules/dagster/dagster_tests/core_tests/test_versioned_execution_plan.py b/python_modules/dagster/dagster_tests/core_tests/test_versioned_execution_plan.py
--- a/python_modules/dagster/dagster_tests/core_tests/test_versioned_execution_plan.py
+++ b/python_modules/dagster/dagster_tests/core_tests/test_versioned_execution_plan.py
@@ -20,6 +20,7 @@
 from dagster.core.definitions import InputDefinition
 from dagster.core.errors import DagsterInvariantViolationError
 from dagster.core.execution.api import create_execution_plan
+from dagster.core.execution.plan.key import StepKey
 from dagster.core.execution.plan.objects import StepOutputHandle
 from dagster.core.execution.resolve_versions import (
     join_and_hash,
@@ -173,11 +174,12 @@
     versions = speculative_execution_plan.resolve_step_versions()
 
     assert (
-        versions["versioned_solid_no_input.compute"] == versioned_pipeline_expected_step1_version()
+        versions[StepKey.from_string("versioned_solid_no_input.compute")]
+        == versioned_pipeline_expected_step1_version()
     )
 
     assert (
-        versions["versioned_solid_takes_input.compute"]
+        versions[StepKey.from_string("versioned_solid_takes_input.compute")]
         == versioned_pipeline_expected_step2_version()
     )
@@ -234,8 +236,8 @@
     memoized_execution_plan = resolve_memoized_execution_plan(speculative_execution_plan)
 
     assert set(memoized_execution_plan.step_keys_to_execute) == {
-        "versioned_solid_no_input.compute",
-        "versioned_solid_takes_input.compute",
+        StepKey.from_string("versioned_solid_no_input.compute"),
+        StepKey.from_string("versioned_solid_takes_input.compute"),
     }
@@ -253,14 +255,18 @@
     memoized_execution_plan = resolve_memoized_execution_plan(speculative_execution_plan)
 
-    assert memoized_execution_plan.step_keys_to_execute == ["versioned_solid_takes_input.compute"]
+    assert memoized_execution_plan.step_keys_to_execute == [
+        StepKey.from_string("versioned_solid_takes_input.compute")
+    ]
 
     expected_handle = StepOutputHandle(
         step_key="versioned_solid_no_input.compute", output_name="result"
     )
 
     assert (
-        memoized_execution_plan.step_dict["versioned_solid_takes_input.compute"]
+        memoized_execution_plan.step_dict[
+            StepKey.from_string("versioned_solid_takes_input.compute")
+        ]
         .step_input_dict["intput"]
         .source.step_output_handle
         == expected_handle
@@ -282,7 +288,7 @@
     ] = 4
 
     assert resolve_memoized_execution_plan(speculative_execution_plan).step_keys_to_execute == [
-        "solid_takes_input.compute"
+        StepKey.from_string("solid_takes_input.compute")
     ]
@@ -342,7 +348,10 @@
     )
     step1_version = join_and_hash(input_version, solid1_version)
 
-    assert versions["versioned_solid_ext_input_builtin_type.compute"] == step1_version
+    assert (
+        versions[StepKey.from_string("versioned_solid_ext_input_builtin_type.compute")]
+        == step1_version
+    )
 
     output_version = join_and_hash(step1_version, "result")
     hashed_input2 = output_version
@@ -355,7 +364,7 @@
     )
     step2_version = join_and_hash(hashed_input2, solid2_version)
 
-    assert versions["versioned_solid_takes_input.compute"] == step2_version
+    assert versions[StepKey.from_string("versioned_solid_takes_input.compute")] == step2_version
 
 
 @solid(
@@ -382,7 +391,7 @@
     solid_version = join_and_hash(solid_def_version, solid_config_version, solid_resources_version)
     step_version = join_and_hash(input_version, solid_version)
 
-    assert versions["versioned_solid_default_value.compute"] == step_version
+    assert versions[StepKey.from_string("versioned_solid_default_value.compute")] == step_version
 
 
 def test_step_keys_already_provided():
@@ -521,7 +530,7 @@
 
     step_version = join_and_hash(solid_version)
 
-    assert versions["fake_solid_resources_versioned.compute"] == step_version
+    assert versions[StepKey.from_string("fake_solid_resources_versioned.compute")] == step_version
 
 
 def test_step_versions_composite_solid():
@@ -549,4 +558,4 @@
 
     versions = speculative_execution_plan.resolve_step_versions()
 
-    assert versions["do_stuff.scalar_config_solid.compute"] == None
+    assert versions[StepKey.from_string("do_stuff.scalar_config_solid.compute")] == None
diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py b/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py
--- a/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py
+++ b/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py
@@ -197,7 +197,7 @@
     coalesced_plan = coalesce_execution_steps(execution_plan)
 
     for solid_handle, solid_steps in coalesced_plan.items():
-        step_keys = [step.key for step in solid_steps]
+        step_keys = [str(step.key) for step in solid_steps]
 
         operator_parameters = DagsterOperatorParameters(
             recon_repo=recon_repo,
diff --git a/python_modules/libraries/dagster-aws/dagster_aws/emr/pyspark_step_launcher.py b/python_modules/libraries/dagster-aws/dagster_aws/emr/pyspark_step_launcher.py
--- a/python_modules/libraries/dagster-aws/dagster_aws/emr/pyspark_step_launcher.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws/emr/pyspark_step_launcher.py
@@ -209,7 +209,7 @@
         run_id = step_context.pipeline_run.run_id
         log = step_context.log
 
-        step_key = step_run_ref.step_key
+        step_key = str(step_run_ref.step_key)
         self._post_artifacts(log, step_run_ref, run_id, step_key)
 
         emr_step_def = self._get_emr_step_def(run_id, step_key, step_context.solid.name)
diff --git a/python_modules/libraries/dagster-celery-docker/dagster_celery_docker/executor.py b/python_modules/libraries/dagster-celery-docker/dagster_celery_docker/executor.py
--- a/python_modules/libraries/dagster-celery-docker/dagster_celery_docker/executor.py
+++ b/python_modules/libraries/dagster-celery-docker/dagster_celery_docker/executor.py
@@ -180,7 +180,7 @@
         execute_step_args = ExecuteStepArgs(
             pipeline_origin=pipeline_context.pipeline.get_python_origin(),
             pipeline_run_id=pipeline_context.pipeline_run.run_id,
-            step_keys_to_execute=[step.key],
+            step_keys_to_execute=[str(step.key)],
             instance_ref=pipeline_context.instance.get_ref(),
             retries_dict=pipeline_context.executor.retries.for_inner_plan().to_config(),
         )
diff --git a/python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/executor.py b/python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/executor.py
--- a/python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/executor.py
+++ b/python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/executor.py
@@ -326,7 +326,7 @@
                 ]
             ),
             CeleryK8sJobExecutor,
-            step_key=step_key,
+            step_key=str(step_key),
         )
 
         if pipeline_run.status != PipelineRunStatus.STARTED:
@@ -335,7 +335,7 @@
                 pipeline_run,
                 EngineEventData([EventMetadataEntry.text(step_key, "Step key"),]),
                 CeleryK8sJobExecutor,
-                step_key=step_key,
+                step_key=str(step_key),
             )
             return []
diff --git a/python_modules/libraries/dagster-celery/dagster_celery/core_execution_loop.py b/python_modules/libraries/dagster-celery/dagster_celery/core_execution_loop.py
--- a/python_modules/libraries/dagster-celery/dagster_celery/core_execution_loop.py
+++ b/python_modules/libraries/dagster-celery/dagster_celery/core_execution_loop.py
@@ -84,7 +84,7 @@
                         step_key=step_key,
                     ),
                     EngineEventData(marker_end=DELEGATE_MARKER),
-                    step_key=step_key,
+                    step_key=str(step_key),
                 )
             except Exception:  # pylint: disable=broad-except
                 # We will want to do more to handle the exception here.. maybe subclass Task
@@ -127,7 +127,7 @@
                 step_key=step.key, queue=queue
             ),
             EngineEventData(marker_start=DELEGATE_MARKER),
-            step_key=step.key,
+            step_key=str(step.key),
         )
 
         # Get the Celery priority for this step
diff --git a/python_modules/libraries/dagster-celery/dagster_celery/executor.py b/python_modules/libraries/dagster-celery/dagster_celery/executor.py
--- a/python_modules/libraries/dagster-celery/dagster_celery/executor.py
+++ b/python_modules/libraries/dagster-celery/dagster_celery/executor.py
@@ -103,7 +103,7 @@
         execute_step_args = ExecuteStepArgs(
            pipeline_origin=pipeline_context.pipeline.get_python_origin(),
             pipeline_run_id=pipeline_context.pipeline_run.run_id,
-            step_keys_to_execute=[step.key],
+            step_keys_to_execute=[str(step.key)],
             instance_ref=pipeline_context.instance.get_ref(),
             retries_dict=pipeline_context.executor.retries.for_inner_plan().to_config(),
         )
diff --git a/python_modules/libraries/dagster-databricks/dagster_databricks/databricks_pyspark_step_launcher.py b/python_modules/libraries/dagster-databricks/dagster_databricks/databricks_pyspark_step_launcher.py
--- a/python_modules/libraries/dagster-databricks/dagster_databricks/databricks_pyspark_step_launcher.py
+++ b/python_modules/libraries/dagster-databricks/dagster_databricks/databricks_pyspark_step_launcher.py
@@ -115,7 +115,7 @@
         run_id = step_context.pipeline_run.run_id
         log = step_context.log
 
-        step_key = step_run_ref.step_key
+        step_key = str(step_run_ref.step_key)
         self._upload_artifacts(log, step_run_ref, run_id, step_key)
 
         task = self._get_databricks_task(run_id, step_key)