diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_compute_logs.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_compute_logs.py --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_compute_logs.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_compute_logs.py @@ -6,7 +6,7 @@ snapshots = Snapshot() -snapshots['TestComputeLogs.test_compute_logs_subscription_graphql[sqlite_with_default_run_launcher_deployed_grpc_env] 1'] = [ +snapshots['TestComputeLogs.test_compute_logs_subscription_graphql[sqlite_with_default_run_launcher_managed_grpc_env] 1'] = [ { 'computeLogs': { 'data': '''HELLO WORLD @@ -14,10 +14,3 @@ } } ] - -snapshots['TestComputeLogs.test_get_compute_logs_over_graphql[sqlite_with_default_run_launcher_managed_grpc_env] 1'] = { - 'stdout': { - 'data': '''HELLO WORLD -''' - } -} diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_expectations.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_expectations.py --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_expectations.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_expectations.py @@ -6,7 +6,7 @@ snapshots = Snapshot() -snapshots['TestExpectations.test_basic_expectations_within_compute_step_events[in_memory_instance_in_process_env] 1'] = [ +snapshots['TestExpectations.test_basic_expectations_within_compute_step_events[sqlite_with_default_run_launcher_deployed_grpc_env] 1'] = [ { '__typename': 'StepExpectationResultEvent', 'expectationResult': { @@ -31,7 +31,7 @@ } ] -snapshots['TestExpectations.test_basic_expectations_within_compute_step_events[in_memory_instance_in_process_env] 2'] = [ +snapshots['TestExpectations.test_basic_expectations_within_compute_step_events[sqlite_with_default_run_launcher_deployed_grpc_env] 2'] = [ { '__typename': 'StepExpectationResultEvent', 'expectationResult': { @@ -56,7 +56,7 @@ } ] -snapshots['TestExpectations.test_basic_expectations_within_compute_step_events[in_memory_instance_in_process_env] 3'] = [ +snapshots['TestExpectations.test_basic_expectations_within_compute_step_events[sqlite_with_default_run_launcher_deployed_grpc_env] 3'] = [ { '__typename': 'StepExpectationResultEvent', 'expectationResult': { @@ -75,140 +75,142 @@ } ] -snapshots['TestExpectations.test_basic_expectations_within_compute_step_events[sqlite_with_default_run_launcher_managed_grpc_env] 1'] = [ +snapshots['TestExpectations.test_basic_input_output_expectations[in_memory_instance_in_process_env] 1'] = [ { '__typename': 'StepExpectationResultEvent', 'expectationResult': { - 'description': 'Failure', - 'label': 'always_false', + 'description': None, + 'label': 'some_expectation', 'metadataEntries': [ - { - '__typename': 'EventJsonMetadataEntry', - 'description': None, - 'jsonString': '{"reason": "Relentless pessimism."}', - 'label': 'data' - } ], - 'success': False + 'success': True }, 'level': 'DEBUG', - 'message': 'Failure', + 'message': 'Expectation some_expectation passed', 'runId': '', - 'solidHandleID': 'emit_failed_expectation', - 'stepKey': 'emit_failed_expectation', + 'solidHandleID': 'df_expectations_solid', + 'stepKey': 'df_expectations_solid', 'timestamp': '' - } -] - -snapshots['TestExpectations.test_basic_expectations_within_compute_step_events[sqlite_with_default_run_launcher_managed_grpc_env] 2'] = [ + }, { '__typename': 'StepExpectationResultEvent', 'expectationResult': { - 'description': 'Successful', - 'label': 'always_true', + 'description': None, + 'label': 'other_expectation', 'metadataEntries': [ - { - '__typename': 'EventJsonMetadataEntry', - 'description': None, - 'jsonString': '{"reason": "Just because."}', - 'label': 'data' - } ], 'success': True }, 'level': 'DEBUG', - 'message': 'Successful', + 'message': 'Expectation other_expectation passed', 'runId': '', - 'solidHandleID': 'emit_successful_expectation', - 'stepKey': 'emit_successful_expectation', + 'solidHandleID': 'df_expectations_solid', + 'stepKey': 'df_expectations_solid', 'timestamp': '' } ] -snapshots['TestExpectations.test_basic_expectations_within_compute_step_events[sqlite_with_default_run_launcher_managed_grpc_env] 3'] = [ +snapshots['TestExpectations.test_basic_input_output_expectations[sqlite_with_default_run_launcher_deployed_grpc_env] 1'] = [ { '__typename': 'StepExpectationResultEvent', 'expectationResult': { - 'description': 'Successful', - 'label': 'no_metadata', + 'description': None, + 'label': 'some_expectation', 'metadataEntries': [ ], 'success': True }, 'level': 'DEBUG', - 'message': 'Successful', + 'message': 'Expectation some_expectation passed', 'runId': '', - 'solidHandleID': 'emit_successful_expectation_no_metadata', - 'stepKey': 'emit_successful_expectation_no_metadata', + 'solidHandleID': 'df_expectations_solid', + 'stepKey': 'df_expectations_solid', 'timestamp': '' - } -] - -snapshots['TestExpectations.test_basic_expectations_within_compute_step_events[sqlite_with_sync_run_launcher_in_process_env] 1'] = [ + }, { '__typename': 'StepExpectationResultEvent', 'expectationResult': { - 'description': 'Failure', - 'label': 'always_false', + 'description': None, + 'label': 'other_expectation', 'metadataEntries': [ - { - '__typename': 'EventJsonMetadataEntry', - 'description': None, - 'jsonString': '{"reason": "Relentless pessimism."}', - 'label': 'data' - } ], - 'success': False + 'success': True }, 'level': 'DEBUG', - 'message': 'Failure', + 'message': 'Expectation other_expectation passed', 'runId': '', - 'solidHandleID': 'emit_failed_expectation', - 'stepKey': 'emit_failed_expectation', + 'solidHandleID': 'df_expectations_solid', + 'stepKey': 'df_expectations_solid', 'timestamp': '' } ] -snapshots['TestExpectations.test_basic_expectations_within_compute_step_events[sqlite_with_sync_run_launcher_in_process_env] 2'] = [ +snapshots['TestExpectations.test_basic_input_output_expectations[sqlite_with_default_run_launcher_managed_grpc_env] 1'] = [ { '__typename': 'StepExpectationResultEvent', 'expectationResult': { - 'description': 'Successful', - 'label': 'always_true', + 'description': None, + 'label': 'some_expectation', 'metadataEntries': [ - { - '__typename': 'EventJsonMetadataEntry', - 'description': None, - 'jsonString': '{"reason": "Just because."}', - 'label': 'data' - } ], 'success': True }, 'level': 'DEBUG', - 'message': 'Successful', + 'message': 'Expectation some_expectation passed', 'runId': '', - 'solidHandleID': 'emit_successful_expectation', - 'stepKey': 'emit_successful_expectation', + 'solidHandleID': 'df_expectations_solid', + 'stepKey': 'df_expectations_solid', + 'timestamp': '' + }, + { + '__typename': 'StepExpectationResultEvent', + 'expectationResult': { + 'description': None, + 'label': 'other_expectation', + 'metadataEntries': [ + ], + 'success': True + }, + 'level': 'DEBUG', + 'message': 'Expectation other_expectation passed', + 'runId': '', + 'solidHandleID': 'df_expectations_solid', + 'stepKey': 'df_expectations_solid', 'timestamp': '' } ] -snapshots['TestExpectations.test_basic_expectations_within_compute_step_events[sqlite_with_sync_run_launcher_in_process_env] 3'] = [ +snapshots['TestExpectations.test_basic_input_output_expectations[sqlite_with_sync_run_launcher_in_process_env] 1'] = [ { '__typename': 'StepExpectationResultEvent', 'expectationResult': { - 'description': 'Successful', - 'label': 'no_metadata', + 'description': None, + 'label': 'some_expectation', 'metadataEntries': [ ], 'success': True }, 'level': 'DEBUG', - 'message': 'Successful', + 'message': 'Expectation some_expectation passed', 'runId': '', - 'solidHandleID': 'emit_successful_expectation_no_metadata', - 'stepKey': 'emit_successful_expectation_no_metadata', + 'solidHandleID': 'df_expectations_solid', + 'stepKey': 'df_expectations_solid', + 'timestamp': '' + }, + { + '__typename': 'StepExpectationResultEvent', + 'expectationResult': { + 'description': None, + 'label': 'other_expectation', + 'metadataEntries': [ + ], + 'success': True + }, + 'level': 'DEBUG', + 'message': 'Expectation other_expectation passed', + 'runId': '', + 'solidHandleID': 'df_expectations_solid', + 'stepKey': 'df_expectations_solid', 'timestamp': '' } ] diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_sensors.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_sensors.py --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_sensors.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_sensors.py @@ -6,7 +6,7 @@ snapshots = Snapshot() -snapshots['TestSensors.test_get_sensor[readonly_sqlite_instance_deployed_grpc_env] 1'] = { +snapshots['TestSensors.test_get_sensor[readonly_in_memory_instance_managed_grpc_env] 1'] = { '__typename': 'Sensor', 'id': 'always_no_config_sensor:no_config_pipeline', 'mode': 'default', @@ -24,7 +24,7 @@ 'solidSelection': None } -snapshots['TestSensors.test_get_sensor[readonly_sqlite_instance_managed_grpc_env] 1'] = { +snapshots['TestSensors.test_get_sensor[readonly_in_memory_instance_multi_location] 1'] = { '__typename': 'Sensor', 'id': 'always_no_config_sensor:no_config_pipeline', 'mode': 'default', @@ -42,7 +42,7 @@ 'solidSelection': None } -snapshots['TestSensors.test_get_sensor[readonly_sqlite_instance_multi_location] 1'] = { +snapshots['TestSensors.test_get_sensor[readonly_sqlite_instance_in_process_env] 1'] = { '__typename': 'Sensor', 'id': 'always_no_config_sensor:no_config_pipeline', 'mode': 'default', @@ -59,192 +59,3 @@ }, 'solidSelection': None } - -snapshots['TestSensors.test_get_sensors[readonly_in_memory_instance_in_process_env] 1'] = [ - { - 'id': 'always_no_config_sensor:no_config_pipeline', - 'mode': 'default', - 'name': 'always_no_config_sensor', - 'pipelineName': 'no_config_pipeline', - 'sensorState': { - 'runs': [ - ], - 'runsCount': 0, - 'status': 'STOPPED', - 'ticks': [ - ] - }, - 'solidSelection': None - }, - { - 'id': 'multi_no_config_sensor:no_config_pipeline', - 'mode': 'default', - 'name': 'multi_no_config_sensor', - 'pipelineName': 'no_config_pipeline', - 'sensorState': { - 'runs': [ - ], - 'runsCount': 0, - 'status': 'STOPPED', - 'ticks': [ - ] - }, - 'solidSelection': None - }, - { - 'id': 'never_no_config_sensor:no_config_pipeline', - 'mode': 'default', - 'name': 'never_no_config_sensor', - 'pipelineName': 'no_config_pipeline', - 'sensorState': { - 'runs': [ - ], - 'runsCount': 0, - 'status': 'STOPPED', - 'ticks': [ - ] - }, - 'solidSelection': None - }, - { - 'id': 'once_no_config_sensor:no_config_pipeline', - 'mode': 'default', - 'name': 'once_no_config_sensor', - 'pipelineName': 'no_config_pipeline', - 'sensorState': { - 'runs': [ - ], - 'runsCount': 0, - 'status': 'STOPPED', - 'ticks': [ - ] - }, - 'solidSelection': None - } -] - -snapshots['TestSensors.test_get_sensors[readonly_in_memory_instance_managed_grpc_env] 1'] = [ - { - 'id': 'always_no_config_sensor:no_config_pipeline', - 'mode': 'default', - 'name': 'always_no_config_sensor', - 'pipelineName': 'no_config_pipeline', - 'sensorState': { - 'runs': [ - ], - 'runsCount': 0, - 'status': 'STOPPED', - 'ticks': [ - ] - }, - 'solidSelection': None - }, - { - 'id': 'multi_no_config_sensor:no_config_pipeline', - 'mode': 'default', - 'name': 'multi_no_config_sensor', - 'pipelineName': 'no_config_pipeline', - 'sensorState': { - 'runs': [ - ], - 'runsCount': 0, - 'status': 'STOPPED', - 'ticks': [ - ] - }, - 'solidSelection': None - }, - { - 'id': 'never_no_config_sensor:no_config_pipeline', - 'mode': 'default', - 'name': 'never_no_config_sensor', - 'pipelineName': 'no_config_pipeline', - 'sensorState': { - 'runs': [ - ], - 'runsCount': 0, - 'status': 'STOPPED', - 'ticks': [ - ] - }, - 'solidSelection': None - }, - { - 'id': 'once_no_config_sensor:no_config_pipeline', - 'mode': 'default', - 'name': 'once_no_config_sensor', - 'pipelineName': 'no_config_pipeline', - 'sensorState': { - 'runs': [ - ], - 'runsCount': 0, - 'status': 'STOPPED', - 'ticks': [ - ] - }, - 'solidSelection': None - } -] - -snapshots['TestSensors.test_get_sensors[readonly_in_memory_instance_multi_location] 1'] = [ - { - 'id': 'always_no_config_sensor:no_config_pipeline', - 'mode': 'default', - 'name': 'always_no_config_sensor', - 'pipelineName': 'no_config_pipeline', - 'sensorState': { - 'runs': [ - ], - 'runsCount': 0, - 'status': 'STOPPED', - 'ticks': [ - ] - }, - 'solidSelection': None - }, - { - 'id': 'multi_no_config_sensor:no_config_pipeline', - 'mode': 'default', - 'name': 'multi_no_config_sensor', - 'pipelineName': 'no_config_pipeline', - 'sensorState': { - 'runs': [ - ], - 'runsCount': 0, - 'status': 'STOPPED', - 'ticks': [ - ] - }, - 'solidSelection': None - }, - { - 'id': 'never_no_config_sensor:no_config_pipeline', - 'mode': 'default', - 'name': 'never_no_config_sensor', - 'pipelineName': 'no_config_pipeline', - 'sensorState': { - 'runs': [ - ], - 'runsCount': 0, - 'status': 'STOPPED', - 'ticks': [ - ] - }, - 'solidSelection': None - }, - { - 'id': 'once_no_config_sensor:no_config_pipeline', - 'mode': 'default', - 'name': 'once_no_config_sensor', - 'pipelineName': 'no_config_pipeline', - 'sensorState': { - 'runs': [ - ], - 'runsCount': 0, - 'status': 'STOPPED', - 'ticks': [ - ] - }, - 'solidSelection': None - } -] diff --git a/python_modules/dagster/dagster/core/execution/context/system.py b/python_modules/dagster/dagster/core/execution/context/system.py --- a/python_modules/dagster/dagster/core/execution/context/system.py +++ b/python_modules/dagster/dagster/core/execution/context/system.py @@ -296,9 +296,9 @@ # this is re-execution self.pipeline_run.parent_run_id # only part of the pipeline is being re-executed - and self.pipeline_run.step_keys_to_execute + and len(self.execution_plan.step_handles_to_execute) < len(self.execution_plan.steps) # this step is not being executed - and step_output_handle.step_key not in self.pipeline_run.step_keys_to_execute + and step_output_handle.step_key not in self.execution_plan.step_handles_to_execute ): return self.pipeline_run.parent_run_id else: diff --git a/python_modules/dagster/dagster/core/execution/memoization.py b/python_modules/dagster/dagster/core/execution/memoization.py --- a/python_modules/dagster/dagster/core/execution/memoization.py +++ b/python_modules/dagster/dagster/core/execution/memoization.py @@ -1,12 +1,6 @@ -from collections import defaultdict - from dagster import check -from dagster.core.definitions.events import ObjectStoreOperation, ObjectStoreOperationType from dagster.core.errors import DagsterInvariantViolationError, DagsterRunNotFoundError -from dagster.core.events import DagsterEvent, DagsterEventType -from dagster.core.events.log import EventRecord from dagster.core.execution.context.system import SystemExecutionContext -from dagster.core.execution.plan.outputs import StepOutputHandle from dagster.core.execution.plan.plan import ExecutionPlan @@ -40,108 +34,3 @@ pipeline_context.intermediate_storage.__class__.__name__ ) ) - - -def copy_required_intermediates_for_execution(pipeline_context, execution_plan): - """ - Uses the intermediates manager to copy intermediates from the previous run that apply to the - current execution plan, and yields the corresponding events - """ - check.inst_param(pipeline_context, "pipeline_context", SystemExecutionContext) - check.inst_param(execution_plan, "execution_plan", ExecutionPlan) - parent_run_id = pipeline_context.pipeline_run.parent_run_id - - if not parent_run_id: - return - - parent_run_logs = pipeline_context.instance.all_logs(parent_run_id) - - output_handles_for_current_run = output_handles_from_execution_plan(execution_plan) - output_handles_from_previous_run = output_handles_from_event_logs(parent_run_logs) - output_handles_to_copy = [ - step_output_handle - for step_output_handle in output_handles_for_current_run.intersection( - output_handles_from_previous_run - ) - if not pipeline_context.using_io_manager(step_output_handle) - ] - output_handles_to_copy_by_step = defaultdict(list) - for handle in output_handles_to_copy: - output_handles_to_copy_by_step[handle.step_key].append(handle) - - intermediate_storage = pipeline_context.intermediate_storage - for step in execution_plan.get_all_steps_in_topo_order(): - handles_to_copy = output_handles_to_copy_by_step.get(step.key, []) - - # exit early to avoid trying to make a context from an UnresolvedExecutionStep - if not handles_to_copy: - continue - - step_context = pipeline_context.for_step(step) - for handle in handles_to_copy: - if intermediate_storage.has_intermediate(pipeline_context, handle): - continue - - operation = intermediate_storage.copy_intermediate_from_run( - pipeline_context, parent_run_id, handle - ) - yield DagsterEvent.object_store_operation( - step_context, - ObjectStoreOperation.serializable(operation, value_name=handle.output_name), - ) - - -def step_output_handle_from_storage_event(record): - """ - If the record is a storage event, returns the StepOutputHandle that was stored for. Otherwise, - returns None. - """ - check.inst_param(record, "record", EventRecord) - if not record.is_dagster_event: - return None - - event = record.dagster_event - - write_ops = ( - ObjectStoreOperationType.SET_OBJECT.value, - ObjectStoreOperationType.CP_OBJECT.value, - ) - if ( - event.event_type_value == DagsterEventType.OBJECT_STORE_OPERATION.value - and event.event_specific_data.op in write_ops - ): - return StepOutputHandle(event.step_key, event.event_specific_data.value_name) - - if event.event_type == DagsterEventType.HANDLED_OUTPUT: - return StepOutputHandle(event.step_key, event.event_specific_data.output_name) - - return None - - -def output_handles_from_event_logs(event_logs): - output_handles_from_previous_run = set() - failed_step_keys = set( - record.dagster_event.step_key - for record in event_logs - if record.dagster_event_type == DagsterEventType.STEP_FAILURE - ) - - for record in event_logs: - step_output_handle = step_output_handle_from_storage_event(record) - if step_output_handle and step_output_handle.step_key not in failed_step_keys: - output_handles_from_previous_run.add(step_output_handle) - - return output_handles_from_previous_run - - -def output_handles_from_execution_plan(execution_plan): - output_handles_for_current_run = set() - for step_level in execution_plan.get_steps_to_execute_by_level(): - for step in step_level: - for step_input in step.step_inputs: - for step_output_handle in step_input.get_step_output_handle_dependencies(): - # Only include handles that won't be satisfied by steps included in this - # execution. - if step_output_handle.step_key not in execution_plan.step_keys_to_execute: - output_handles_for_current_run.add(step_output_handle) - return output_handles_for_current_run diff --git a/python_modules/dagster/dagster/core/execution/plan/execute_plan.py b/python_modules/dagster/dagster/core/execution/plan/execute_plan.py --- a/python_modules/dagster/dagster/core/execution/plan/execute_plan.py +++ b/python_modules/dagster/dagster/core/execution/plan/execute_plan.py @@ -11,7 +11,6 @@ ) from dagster.core.events import DagsterEvent from dagster.core.execution.context.system import SystemExecutionContext, SystemStepExecutionContext -from dagster.core.execution.memoization import copy_required_intermediates_for_execution from dagster.core.execution.plan.execute_step import core_dagster_event_sequence_for_step from dagster.core.execution.plan.objects import StepFailureData, StepRetryData, UserFailureData from dagster.core.execution.plan.plan import ExecutionPlan @@ -25,8 +24,6 @@ retries = pipeline_context.retries - yield from copy_required_intermediates_for_execution(pipeline_context, execution_plan) - with execution_plan.start(retries=retries) as active_execution: # It would be good to implement a reference tracking algorithm here to diff --git a/python_modules/dagster/dagster/core/storage/intermediate_storage.py b/python_modules/dagster/dagster/core/storage/intermediate_storage.py --- a/python_modules/dagster/dagster/core/storage/intermediate_storage.py +++ b/python_modules/dagster/dagster/core/storage/intermediate_storage.py @@ -86,6 +86,23 @@ context.upstream_output.name, context.upstream_output.mapping_key, ) + + # backcompat behavior: copy intermediate from parent run to the current run destination + if ( + context.upstream_output + and context.upstream_output.run_id == step_context.pipeline_run.parent_run_id + ): + if not self.intermediate_storage.has_intermediate(step_context, source_handle): + operation = self.intermediate_storage.copy_intermediate_from_run( + step_context, step_context.pipeline_run.parent_run_id, source_handle + ) + + context.log.debug( + "Copied object for input {input_name} from {key} to {dest_key}".format( + input_name=context.name, key=operation.key, dest_key=operation.dest_key + ) + ) + if not self.intermediate_storage.has_intermediate(step_context, source_handle): raise DagsterStepOutputNotFoundError( ( diff --git a/python_modules/dagster/dagster/core/storage/object_store.py b/python_modules/dagster/dagster/core/storage/object_store.py --- a/python_modules/dagster/dagster/core/storage/object_store.py +++ b/python_modules/dagster/dagster/core/storage/object_store.py @@ -164,7 +164,6 @@ # Ensure output path exists mkdir_p(os.path.dirname(dst)) - if os.path.isfile(src): shutil.copy(src, dst) elif os.path.isdir(src): diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_memoization.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_memoization.py deleted file mode 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_memoization.py +++ /dev/null @@ -1,38 +0,0 @@ -from dagster import pipeline, solid -from dagster.core.execution.api import create_execution_plan -from dagster.core.execution.memoization import output_handles_from_execution_plan -from dagster.core.execution.plan.outputs import StepOutputHandle - - -def define_pipeline(): - @solid - def add_one(_context, num): - return num + 1 - - @solid - def add_two(_context, num): - return num + 2 - - @solid - def add_three(_context, num): - return num + 3 - - @pipeline - def addy_pipeline(): - add_three(add_two(add_one())) - - return addy_pipeline - - -def test_output_handles_from_execution_plan(): - execution_plan = create_execution_plan( - define_pipeline(), run_config={"solids": {"add_one": {"inputs": {"num": {"value": 3}}}}}, - ) - - assert output_handles_from_execution_plan(execution_plan) == set() - assert output_handles_from_execution_plan( - execution_plan.build_subset_plan(["add_two", "add_three"]) - ) == {StepOutputHandle("add_one", "result")} - assert output_handles_from_execution_plan(execution_plan.build_subset_plan(["add_three"])) == { - StepOutputHandle("add_two", "result") - } diff --git a/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_io_manager_backcompat.py b/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_io_manager_backcompat.py --- a/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_io_manager_backcompat.py +++ b/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_io_manager_backcompat.py @@ -3,7 +3,8 @@ import pytest from dagster.core.definitions import intermediate_storage, pipeline, solid from dagster.core.definitions.mode import ModeDefinition -from dagster.core.execution.api import execute_pipeline +from dagster.core.execution.api import execute_pipeline, reexecute_pipeline +from dagster.core.instance import DagsterInstance from dagster.core.storage.object_store import InMemoryObjectStore from dagster.core.storage.system_storage import ( build_intermediate_storage_from_object_store, @@ -63,3 +64,36 @@ ), ): execute_pipeline(foo, run_config={"intermediate_storage": {"filesystem": {}}}) + + +def test_intermediate_storage_reexecution(): + @solid + def return_one(_): + return 1 + + @solid + def plus_one(_, one): + return one + 1 + + @pipeline + def foo(): + plus_one(return_one()) + + run_config = {"intermediate_storage": {"filesystem": {}}} + + instance = DagsterInstance.ephemeral() + result = execute_pipeline(foo, run_config=run_config, instance=instance) + assert result.success + reexecution_result = reexecute_pipeline( + foo, run_config=run_config, parent_run_id=result.run_id, instance=instance + ) + assert reexecution_result.success + + partial_reexecution_result = reexecute_pipeline( + foo, + run_config=run_config, + step_selection=["plus_one"], + parent_run_id=result.run_id, + instance=instance, + ) + assert partial_reexecution_result.success