diff --git a/examples/asset_store/pickled_object.py b/examples/asset_store/pickled_object.py --- a/examples/asset_store/pickled_object.py +++ b/examples/asset_store/pickled_object.py @@ -1,12 +1,15 @@ from dagster import ( + DagsterInstance, ModeDefinition, OutputDefinition, PresetDefinition, execute_pipeline, pipeline, + reexecute_pipeline, solid, ) from dagster.core.definitions.utils import struct_to_string +from dagster.core.storage.address_storage import AddressStorage from dagster.core.storage.asset_store import default_filesystem_asset_store @@ -72,4 +75,25 @@ if __name__ == "__main__": - result = execute_pipeline(model_pipeline, preset="local") + instance = DagsterInstance.ephemeral(address_storage=AddressStorage()) + + result = execute_pipeline(model_pipeline, instance=instance) + + # print("---reexecution 1 ⬇️---") + + re1_result = reexecute_pipeline( + model_pipeline, + result.run_id, + preset="local", + instance=instance, + step_selection=["parse_df.compute"], + ) + # print("---reexecution 2 ⬇️---") + + re2_result = reexecute_pipeline( + model_pipeline, + re1_result.run_id, + preset="local", + instance=instance, + step_selection=["parse_df.compute"], + ) diff --git a/examples/asset_store/tests/test_asset_store.py b/examples/asset_store/tests/test_asset_store.py --- a/examples/asset_store/tests/test_asset_store.py +++ b/examples/asset_store/tests/test_asset_store.py @@ -1,4 +1,6 @@ -from dagster import execute_pipeline, seven +from dagster import DagsterInstance, execute_pipeline, reexecute_pipeline, seven +from dagster.core.execution.plan.objects import StepOutputHandle +from dagster.core.storage.address_storage import AddressStorage from ..pickled_object import model_pipeline @@ -6,10 +8,39 @@ def test_pickled_object(): with seven.TemporaryDirectory() as tmpdir_path: + instance = DagsterInstance.ephemeral(address_storage=AddressStorage()) run_config = { "resources": {"default_fs_asset_store": {"config": {"base_dir": tmpdir_path}}}, + "storage": {"filesystem": {}}, } - result = execute_pipeline(model_pipeline, run_config=run_config, mode="test") + result = execute_pipeline( + model_pipeline, run_config=run_config, mode="test", instance=instance + ) assert result.success + assert len(instance.address_storage.mapping.keys()) == 3 + _step_output_handle = StepOutputHandle("call_api.compute", "result") + _address = instance.address_storage.mapping[_step_output_handle][0] + + re1_result = reexecute_pipeline( + model_pipeline, + result.run_id, + run_config=run_config, + mode="test", + instance=instance, + step_selection=["parse_df.compute"], + ) + assert re1_result.success + assert instance.address_storage.mapping[_step_output_handle][0] == _address + + re2_result = reexecute_pipeline( + model_pipeline, + re1_result.run_id, + run_config=run_config, + mode="test", + instance=instance, + step_selection=["parse_df.compute"], + ) + assert re2_result.success + assert instance.address_storage.mapping[_step_output_handle][0] == _address diff --git a/python_modules/dagster/dagster/core/execution/plan/execute_step.py b/python_modules/dagster/dagster/core/execution/plan/execute_step.py --- a/python_modules/dagster/dagster/core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/core/execution/plan/execute_step.py @@ -8,11 +8,7 @@ RetryRequested, TypeCheck, ) -from dagster.core.definitions.events import ( - AddressableAssetOperation, - AddressableAssetOperationType, - ObjectStoreOperation, -) +from dagster.core.definitions.events import AddressableAssetOperation, ObjectStoreOperation from dagster.core.errors import ( DagsterExecutionStepExecutionError, DagsterInvariantViolationError, @@ -329,38 +325,14 @@ yield evt -def _get_addressable_asset(context, step_output_handle, asset_store_handle): - check.inst_param(asset_store_handle, "asset_store_handle", AssetStoreHandle) - - asset_store = context.get_asset_store(asset_store_handle) - obj = asset_store.get_asset(context, asset_store_handle.asset_metadata) - - return AddressableAssetOperation( - AddressableAssetOperationType.GET_ASSET, - None, - step_output_handle, - asset_store_handle, - obj=obj, - ) - - -def _set_addressable_asset(context, step_output_handle, asset_store_handle, value): - check.inst_param(asset_store_handle, "asset_store_handle", AssetStoreHandle) - - asset_store = context.get_asset_store(asset_store_handle) - address = asset_store.set_asset(context, value, asset_store_handle.asset_metadata) - check.inst(address, AssetAddress) - - return AddressableAssetOperation( - AddressableAssetOperationType.SET_ASSET, address, step_output_handle, asset_store_handle - ) - - def _set_intermediates(step_context, step_output, step_output_handle, output, version): - if step_output.asset_store_handle: - # use asset_store if it's configured on provided by the user - res = _set_addressable_asset( - step_context, step_output_handle, step_output.asset_store_handle, output.value + if step_output.asset_store_handle and step_context.instance.address_storage: + # use address_storage if it's configured on the instance and asset_store is provided by the user + res = step_context.instance.address_storage.set_addressable_asset( + context=step_context, + asset_store_handle=step_output.asset_store_handle, + step_output_handle=step_output_handle, + value=output.value, ) if isinstance(res, AddressableAssetOperation): @@ -494,9 +466,12 @@ input_value = [] for source_handle in step_input.source_handles: source_asset_store_handle = step_context.get_asset_store_handle(source_handle) - if source_asset_store_handle: - input_value = _get_addressable_asset( - step_context, source_handle, source_asset_store_handle + if ( + step_context.instance.address_storage + and step_context.instance.address_storage.has_addressable_asset(source_handle) + ): + input_value = step_context.instance.address_storage.get_addressable_asset( + context=step_context, step_output_handle=step_input.source_handles[0], ) elif ( source_handle in step_input.addresses @@ -532,9 +507,12 @@ elif step_input.is_from_single_output: source_handle = step_input.source_handles[0] source_asset_store_handle = step_context.get_asset_store_handle(source_handle) - if source_asset_store_handle: - input_value = _get_addressable_asset( - step_context, source_handle, source_asset_store_handle + if ( + step_context.instance.address_storage + and step_context.instance.address_storage.has_addressable_asset(source_handle) + ): + input_value = step_context.instance.address_storage.get_addressable_asset( + context=step_context, step_output_handle=step_input.source_handles[0], ) elif source_handle in step_input.addresses: input_value = step_context.intermediate_storage.get_intermediate_from_address( @@ -543,6 +521,13 @@ step_output_handle=source_handle, address=step_input.addresses[source_handle], ) + elif ( + step_context.instance.address_storage + and step_context.instance.address_storage.has_addressable_asset(source_handle) + ): + input_value = step_context.instance.address_storage.get_addressable_asset( + context=step_context, step_output_handle=step_input.source_handles[0], + ) else: input_value = step_context.intermediate_storage.get_intermediate( context=step_context, diff --git a/python_modules/dagster/dagster/core/instance/__init__.py b/python_modules/dagster/dagster/core/instance/__init__.py --- a/python_modules/dagster/dagster/core/instance/__init__.py +++ b/python_modules/dagster/dagster/core/instance/__init__.py @@ -199,6 +199,7 @@ keys. ref (Optional[InstanceRef]): Used by internal machinery to pass instances across process boundaries. + address_storage (Optional[AddressStorage]): Used to track addressable assets. """ _PROCESS_TEMPDIR = None @@ -215,6 +216,7 @@ run_launcher=None, settings=None, ref=None, + address_storage=None, ): from dagster.core.storage.compute_log_manager import ComputeLogManager from dagster.core.storage.event_log import EventLogStorage @@ -223,6 +225,7 @@ from dagster.core.storage.schedules import ScheduleStorage from dagster.core.scheduler import Scheduler from dagster.core.launcher import RunLauncher + from dagster.core.storage.address_storage import AddressStorage self._instance_type = check.inst_param(instance_type, "instance_type", InstanceType) self._local_artifact_storage = check.inst_param( @@ -246,10 +249,14 @@ self._subscribers = defaultdict(list) + self._address_storage = check.opt_inst_param( + address_storage, "address_storage", AddressStorage + ) + # ctors @staticmethod - def ephemeral(tempdir=None, preload=None): + def ephemeral(tempdir=None, preload=None, address_storage=None): from dagster.core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher from dagster.core.storage.event_log import InMemoryEventLogStorage from dagster.core.storage.root import LocalArtifactStorage @@ -266,6 +273,8 @@ event_storage=InMemoryEventLogStorage(preload=preload), compute_log_manager=NoOpComputeLogManager(), run_launcher=SyncInMemoryRunLauncher(), + # FIXME: default to enable AddressStore once it's ready to replace intermediate storage + address_storage=address_storage, ) @staticmethod @@ -310,6 +319,7 @@ run_launcher=instance_ref.run_launcher, settings=instance_ref.settings, ref=instance_ref, + address_storage=instance_ref.address_storage, ) # flags @@ -410,6 +420,12 @@ def compute_log_manager(self): return self._compute_log_manager + # address storage + + @property + def address_storage(self): + return self._address_storage + def get_settings(self, settings_key): check.str_param(settings_key, "settings_key") if self._settings and settings_key in self._settings: diff --git a/python_modules/dagster/dagster/core/instance/config.py b/python_modules/dagster/dagster/core/instance/config.py --- a/python_modules/dagster/dagster/core/instance/config.py +++ b/python_modules/dagster/dagster/core/instance/config.py @@ -53,6 +53,7 @@ "schedule_storage": config_field_for_configurable_class(), "scheduler": config_field_for_configurable_class(), "run_launcher": config_field_for_configurable_class(), + "address_storage": config_field_for_configurable_class(), "telemetry": Field({"enabled": Field(Bool, default_value=True, is_required=False)}), "opt_in": Field({"local_servers": Field(Bool, default_value=False, is_required=False)}), } diff --git a/python_modules/dagster/dagster/core/instance/ref.py b/python_modules/dagster/dagster/core/instance/ref.py --- a/python_modules/dagster/dagster/core/instance/ref.py +++ b/python_modules/dagster/dagster/core/instance/ref.py @@ -40,7 +40,7 @@ namedtuple( "_InstanceRef", "local_artifact_storage_data run_storage_data event_storage_data compute_logs_data " - "schedule_storage_data scheduler_data run_launcher_data settings", + "schedule_storage_data scheduler_data run_launcher_data settings address_storage_data", ) ): """Serializable representation of a :py:class:`DagsterInstance`. @@ -58,6 +58,7 @@ scheduler_data, run_launcher_data, settings, + address_storage_data, ): return super(cls, InstanceRef).__new__( cls, @@ -83,6 +84,9 @@ run_launcher_data, "run_launcher_data", ConfigurableClassData ), settings=check.opt_dict_param(settings, "settings"), + address_storage_data=check.opt_inst_param( + address_storage_data, "address_storage_data", ConfigurableClassData + ), ) @staticmethod @@ -150,6 +154,10 @@ ConfigurableClassData("dagster", "DefaultRunLauncher", yaml.dump({}),), ) + address_storage_data = configurable_class_data_or_default( + config_value, "address_storage", None + ) + settings_keys = {"telemetry", "opt_in"} settings = {key: config_value.get(key) for key in settings_keys} @@ -162,6 +170,7 @@ scheduler_data=scheduler_data, run_launcher_data=run_launcher_data, settings=settings, + address_storage_data=address_storage_data, ) @staticmethod @@ -203,5 +212,9 @@ def run_launcher(self): return self.run_launcher_data.rehydrate() if self.run_launcher_data else None + @property + def address_storage(self): + return self.address_storage_data.rehydrate() if self.address_storage_data else None + def to_dict(self): return self._asdict() diff --git a/python_modules/dagster/dagster/core/storage/address_storage.py b/python_modules/dagster/dagster/core/storage/address_storage.py new file mode 100644 --- /dev/null +++ b/python_modules/dagster/dagster/core/storage/address_storage.py @@ -0,0 +1,61 @@ +from dagster import check +from dagster.core.definitions.events import AddressableAssetOperation, AddressableAssetOperationType +from dagster.core.execution.plan.objects import StepOutputHandle +from dagster.core.storage.asset_store import AssetAddress, AssetStore, AssetStoreHandle +from dagster.serdes import ConfigurableClass + + +class AddressStorage(ConfigurableClass): + def __init__(self, inst_data=None, mapping=None): + self._inst_data = inst_data + # mapping step output to an address (1:1 currently). + # Note: with versioning, the mapping will potentially be 1:n where we will be able to track + # multiple addresses with the same step output at the instance level. + self.mapping = check.opt_dict_param(mapping, "mapping", key_type=StepOutputHandle) + + @property + def inst_data(self): + return self._inst_data + + @classmethod + def config_type(cls): + return {} + + @staticmethod + def from_config_value(inst_data, config_value): + return AddressStorage(inst_data=inst_data) + + def _get_asset_store(self, context, asset_store_handle): + asset_store = getattr(context.resources, asset_store_handle.asset_store_key) + return check.inst(asset_store, AssetStore) + + def get_addressable_asset(self, context, step_output_handle): + address, asset_store_handle = self.mapping.get(step_output_handle, None) + check.inst(address, AssetAddress) + check.inst(asset_store_handle, AssetStoreHandle) + + asset_store = self._get_asset_store(context, asset_store_handle) + + obj = asset_store.get_asset(context, address.asset_metadata) + return AddressableAssetOperation( + AddressableAssetOperationType.GET_ASSET, + address, + step_output_handle, + asset_store_handle, + obj=obj, + ) + + def set_addressable_asset(self, context, asset_store_handle, step_output_handle, value): + check.inst_param(asset_store_handle, "asset_store_handle", AssetStoreHandle) + asset_store = self._get_asset_store(context, asset_store_handle) + address = asset_store.set_asset(context, value, asset_store_handle.asset_metadata) + check.inst(address, AssetAddress) + + self.mapping[step_output_handle] = (address, asset_store_handle) + return AddressableAssetOperation( + AddressableAssetOperationType.SET_ASSET, address, step_output_handle, asset_store_handle + ) + + def has_addressable_asset(self, step_output_handle): + check.inst_param(step_output_handle, "step_output_handle", StepOutputHandle) + return self.mapping.get(step_output_handle) is not None diff --git a/python_modules/dagster/dagster/core/storage/intermediate_storage.py b/python_modules/dagster/dagster/core/storage/intermediate_storage.py --- a/python_modules/dagster/dagster/core/storage/intermediate_storage.py +++ b/python_modules/dagster/dagster/core/storage/intermediate_storage.py @@ -57,6 +57,13 @@ # skip when the source output has asset store configured continue + if ( + context.instance.address_storage + and context.instance.address_storage.has_addressable_asset(source_handle) + ): + # skip when source is trakced in address_storage + continue + if ( source_handle in step_input.addresses and not self.has_intermediate_at_address( @@ -74,6 +81,13 @@ # skip when the source output has asset store configured continue + if ( + context.instance.address_storage + and context.instance.address_storage.has_addressable_asset(source_handle) + ): + # skip when source is trakced in address_storage + continue + if ( source_handle in step_input.addresses and not self.has_intermediate_at_address( diff --git a/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_asset_store.py b/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_asset_store.py --- a/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_asset_store.py +++ b/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_asset_store.py @@ -12,7 +12,8 @@ from dagster.core.definitions.events import AddressableAssetOperationType from dagster.core.execution.api import create_execution_plan, execute_plan from dagster.core.execution.plan.objects import StepOutputHandle -from dagster.core.storage.asset_store import default_filesystem_asset_store +from dagster.core.storage.address_storage import AddressStorage +from dagster.core.storage.asset_store import AssetStoreHandle, default_filesystem_asset_store def define_asset_pipeline(asset_store, asset_metadata_dict): @@ -124,3 +125,37 @@ assert not evt.is_failure # only the selected step subset was executed assert set([evt.step_key for evt in step_subset_events]) == {"solid_b.compute"} + + +def test_address_storage(): + with seven.TemporaryDirectory() as tmpdir_path: + test_asset_store = default_filesystem_asset_store + # .configured({"base_dir": tmpdir_path}) + test_asset_metadata_dict = { + "solid_a": {"path": os.path.join(tmpdir_path, "a")}, + "solid_b": {"path": os.path.join(tmpdir_path, "b")}, + } + pipeline_def = define_asset_pipeline(test_asset_store, test_asset_metadata_dict,) + + instance = DagsterInstance.ephemeral(address_storage=AddressStorage()) + result = execute_pipeline(pipeline_def, instance=instance) + assert result.success + + assert instance.address_storage + asset_address_a, asset_store_handle_a = instance.address_storage.mapping[ + StepOutputHandle("solid_a.compute", "result") + ] + assert asset_address_a.asset_metadata == test_asset_metadata_dict["solid_a"] + assert asset_store_handle_a == AssetStoreHandle( + asset_store_key="default_fs_asset_store", + asset_metadata=test_asset_metadata_dict["solid_a"], + ) + + asset_address_b, asset_store_handle_b = instance.address_storage.mapping[ + StepOutputHandle("solid_b.compute", "result") + ] + assert asset_address_b.asset_metadata == test_asset_metadata_dict["solid_b"] + assert asset_store_handle_b == AssetStoreHandle( + asset_store_key="default_fs_asset_store", + asset_metadata=test_asset_metadata_dict["solid_b"], + )