diff --git a/examples/asset_store/.gitignore b/examples/asset_store/.gitignore new file mode 100644 --- /dev/null +++ b/examples/asset_store/.gitignore @@ -0,0 +1 @@ +uncommitted/ diff --git a/examples/asset_store/README.md b/examples/asset_store/README.md new file mode 100644 diff --git a/examples/asset_store/__init__.py b/examples/asset_store/__init__.py new file mode 100644 diff --git a/examples/asset_store/pickled_object.py b/examples/asset_store/pickled_object.py new file mode 100644 --- /dev/null +++ b/examples/asset_store/pickled_object.py @@ -0,0 +1,75 @@ +from dagster import ( + ModeDefinition, + OutputDefinition, + PresetDefinition, + execute_pipeline, + pipeline, + solid, +) +from dagster.core.definitions.utils import struct_to_string +from dagster.core.storage.asset_store import default_filesystem_asset_store + + +def train(df): + return len(df) + + +local_asset_store = default_filesystem_asset_store.configured( + {"base_dir": "uncommitted/intermediates/"} +) + + +@solid( + output_defs=[ + OutputDefinition( + asset_store_key="default_fs_asset_store", asset_metadata={"path": "rawdata"} + ) + ], +) +def call_api(_): + return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + + +@solid( + output_defs=[ + OutputDefinition( + asset_store_key="default_fs_asset_store", asset_metadata={"path": "parse_df"} + ) + ], +) +def parse_df(context, df): + context.log.info(struct_to_string(df)) + result_df = df[:5] + return result_df + + +@solid( + output_defs=[ + OutputDefinition( + asset_store_key="default_fs_asset_store", asset_metadata={"path": "model_result"}, + ) + ], +) +def train_model(context, df): + context.log.info(struct_to_string(df)) + model = train(df) + return model + + +@pipeline( + mode_defs=[ + ModeDefinition( + "test", resource_defs={"default_fs_asset_store": default_filesystem_asset_store} + ), + ModeDefinition("local", resource_defs={"default_fs_asset_store": local_asset_store}), + ], + preset_defs=[ + PresetDefinition("local", run_config={"storage": {"filesystem": {}}}, mode="local"), + ], +) +def model_pipeline(): + train_model(parse_df(call_api())) + + +if __name__ == "__main__": + result = execute_pipeline(model_pipeline, preset="local") diff --git a/examples/asset_store/requirements.txt b/examples/asset_store/requirements.txt new file mode 100644 --- /dev/null +++ b/examples/asset_store/requirements.txt @@ -0,0 +1,2 @@ +dagster +dagit \ No newline at end of file diff --git a/examples/asset_store/tests/__init__.py b/examples/asset_store/tests/__init__.py new file mode 100644 diff --git a/examples/asset_store/tests/test_asset_store.py b/examples/asset_store/tests/test_asset_store.py new file mode 100644 --- /dev/null +++ b/examples/asset_store/tests/test_asset_store.py @@ -0,0 +1,15 @@ +from dagster import execute_pipeline, seven + +from ..pickled_object import model_pipeline + + +def test_pickled_object(): + with seven.TemporaryDirectory() as tmpdir_path: + + run_config = { + "resources": {"default_fs_asset_store": {"config": {"base_dir": tmpdir_path}}}, + } + + result = execute_pipeline(model_pipeline, run_config=run_config, mode="test") + + assert result.success diff --git a/examples/asset_store/tox.ini b/examples/asset_store/tox.ini new file mode 100644 --- /dev/null +++ b/examples/asset_store/tox.ini @@ -0,0 +1,22 @@ +[tox] +envlist = py{38,37,36,27}-{unix,windows},pylint +skipsdist = True + +[testenv] +passenv = CI_* COVERALLS_REPO_TOKEN BUILDKITE +deps = + -r ../../python_modules/dagster/dev-requirements.txt + -e ../../python_modules/dagster +whitelist_externals = + /bin/bash + echo +commands = + !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster -e dagit' + echo -e "--- \033[0;32m:pytest: Running tox tests\033[0m" + pytest -vv + +[testenv:pylint] +basepython = + python3.7 +commands = + /bin/bash -c 'cd .. && pylint -j 0 --rcfile=../.pylintrc asset_store/' diff --git a/examples/asset_store/workspace.yaml b/examples/asset_store/workspace.yaml new file mode 100644 --- /dev/null +++ b/examples/asset_store/workspace.yaml @@ -0,0 +1,2 @@ +load_from: + - python_file: pickled_object.py diff --git a/python_modules/dagster/dagster/core/definitions/events.py b/python_modules/dagster/dagster/core/definitions/events.py --- a/python_modules/dagster/dagster/core/definitions/events.py +++ b/python_modules/dagster/dagster/core/definitions/events.py @@ -6,7 +6,7 @@ from dagster import check from dagster.core.errors import DagsterInvalidAssetKey -from dagster.serdes import Persistable, whitelist_for_persistence +from dagster.serdes import Persistable, whitelist_for_persistence, whitelist_for_serdes from dagster.utils import is_str from dagster.utils.backcompat import experimental_arg_warning @@ -628,6 +628,40 @@ self.seconds_to_wait = check.opt_int_param(seconds_to_wait, "seconds_to_wait") +@whitelist_for_serdes +class AddressableAssetOperationType(Enum): + SET_ASSET = "SET_ASSET" + GET_ASSET = "GET_ASSET" + + +@whitelist_for_serdes +class AddressableAssetOperation( + namedtuple( + "_AddressableAssetOperation", "op address step_output_handle asset_store_handle obj", + ) +): + """ + Event related addressableAssets + """ + + def __new__(cls, op, address, step_output_handle, asset_store_handle, obj=None): + from dagster.core.execution.plan.objects import StepOutputHandle + from dagster.core.storage.asset_store import AssetAddress, AssetStoreHandle + + return super(AddressableAssetOperation, cls).__new__( + cls, + op=op, + address=check.opt_inst_param(address, "address", AssetAddress), + step_output_handle=check.inst_param( + step_output_handle, "step_output_handle", StepOutputHandle + ), + asset_store_handle=check.inst_param( + asset_store_handle, "asset_store_handle", AssetStoreHandle + ), + obj=obj, + ) + + class ObjectStoreOperationType(Enum): SET_OBJECT = "SET_OBJECT" GET_OBJECT = "GET_OBJECT" diff --git a/python_modules/dagster/dagster/core/definitions/output.py b/python_modules/dagster/dagster/core/definitions/output.py --- a/python_modules/dagster/dagster/core/definitions/output.py +++ b/python_modules/dagster/dagster/core/definitions/output.py @@ -1,6 +1,7 @@ from collections import namedtuple from dagster import check +from dagster.core.storage.asset_store import AssetStoreHandle from dagster.core.types.dagster_type import resolve_dagster_type from .utils import DEFAULT_OUTPUT, check_valid_name @@ -24,13 +25,26 @@ name (Optional[str]): Name of the output. (default: "result") description (Optional[str]): Human-readable description of the output. is_required (Optional[bool]): Whether the presence of this field is required. (default: True) + asset_store_key (Optional[str]) + asset_metadata (Optional[Dict[str, Any]]) """ - def __init__(self, dagster_type=None, name=None, description=None, is_required=None): + def __init__( + self, + dagster_type=None, + name=None, + description=None, + is_required=None, + asset_store_key=None, + asset_metadata=None, + ): self._name = check_valid_name(check.opt_str_param(name, "name", DEFAULT_OUTPUT)) self._dagster_type = resolve_dagster_type(dagster_type) self._description = check.opt_str_param(description, "description") self._is_required = check.opt_bool_param(is_required, "is_required", default=True) + self._asset_store_handle = ( + AssetStoreHandle(asset_store_key, asset_metadata) if asset_store_key else None + ) @property def name(self): @@ -52,6 +66,10 @@ def is_required(self): return self._is_required + @property + def asset_store_handle(self): + return self._asset_store_handle + def mapping_from(self, solid_name, output_name=None): """Create an output mapping from an output of a child solid. diff --git a/python_modules/dagster/dagster/core/events/__init__.py b/python_modules/dagster/dagster/core/events/__init__.py --- a/python_modules/dagster/dagster/core/events/__init__.py +++ b/python_modules/dagster/dagster/core/events/__init__.py @@ -49,6 +49,7 @@ PIPELINE_FAILURE = "PIPELINE_FAILURE" OBJECT_STORE_OPERATION = "OBJECT_STORE_OPERATION" + ADDRESSABLE_ASSET_OPERATION = "ADDRESSABLE_ASSET_OPERATION" ENGINE_EVENT = "ENGINE_EVENT" @@ -355,6 +356,10 @@ def is_engine_event(self): return self.event_type == DagsterEventType.ENGINE_EVENT + @property + def is_addressable_asset_operation(self): + return self.event_type == DagsterEventType.ADDRESSABLE_ASSET_OPERATION + @property def asset_key(self): if self.event_type != DagsterEventType.STEP_MATERIALIZATION: @@ -794,6 +799,28 @@ message=message, ) + @staticmethod + def addressable_asset_operation(step_context, addressable_asset_operation): + from dagster.core.definitions.events import AddressableAssetOperation + + check.inst_param( + addressable_asset_operation, "addressable_asset_operation", AddressableAssetOperation + ) + return DagsterEvent.from_step( + event_type=DagsterEventType.ADDRESSABLE_ASSET_OPERATION, + step_context=step_context, + event_specific_data=AddressableAssetOperationData( + addressable_asset_operation.op, + addressable_asset_operation.address, + addressable_asset_operation.step_output_handle, + addressable_asset_operation.asset_store_handle, + ), + message="addressable asset operation: {op} for {step_output_handle}.".format( + op=addressable_asset_operation.op.value, + step_output_handle=addressable_asset_operation.step_output_handle, + ), + ) + @staticmethod def hook_completed(hook_context, hook_def): event_type = DagsterEventType.HOOK_COMPLETED @@ -892,6 +919,13 @@ pass +@whitelist_for_serdes +class AddressableAssetOperationData( + namedtuple("_AddressableAssetOperationData", "op address step_output_handle asset_store") +): + pass + + @whitelist_for_serdes class ObjectStoreOperationResultData( namedtuple("_ObjectStoreOperationResultData", "op value_name metadata_entries address version") diff --git a/python_modules/dagster/dagster/core/execution/context/system.py b/python_modules/dagster/dagster/core/execution/context/system.py --- a/python_modules/dagster/dagster/core/execution/context/system.py +++ b/python_modules/dagster/dagster/core/execution/context/system.py @@ -17,6 +17,7 @@ from dagster.core.execution.retries import Retries from dagster.core.executor.base import Executor from dagster.core.log_manager import DagsterLogManager +from dagster.core.storage.asset_store import AssetStore from dagster.core.storage.file_manager import FileManager from dagster.core.storage.pipeline_run import PipelineRun from dagster.core.system_config.objects import EnvironmentConfig @@ -286,6 +287,22 @@ def for_hook(self, hook_def): return HookContext(self._execution_context_data, self.log, hook_def, self.step) + def get_asset_store_handle(self, step_output_handle): + # get AssetStoreHandle from static execution plan + from dagster.core.execution.plan.objects import StepOutputHandle + from dagster.core.execution.plan.objects import StepOutput + + check.inst_param(step_output_handle, "step_output_handle", StepOutputHandle) + step = self.execution_plan.get_step_by_key(step_output_handle.step_key) + step_output = step.step_output_named(step_output_handle.output_name) + check.inst(step_output, StepOutput) + return step_output.asset_store_handle + + def get_asset_store(self, asset_store_handle): + # get AssetStore from resources + asset_store = getattr(self.resources, asset_store_handle.asset_store_key) + return check.inst(asset_store, AssetStore) + class SystemComputeExecutionContext(SystemStepExecutionContext): @property diff --git a/python_modules/dagster/dagster/core/execution/plan/compute.py b/python_modules/dagster/dagster/core/execution/plan/compute.py --- a/python_modules/dagster/dagster/core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/core/execution/plan/compute.py @@ -39,6 +39,7 @@ dagster_type=output_def.dagster_type, optional=output_def.optional, should_materialize=name in config_output_names, + asset_store_handle=output_def.asset_store_handle, ) for name, output_def in solid.definition.output_dict.items() ], diff --git a/python_modules/dagster/dagster/core/execution/plan/execute_step.py b/python_modules/dagster/dagster/core/execution/plan/execute_step.py --- a/python_modules/dagster/dagster/core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/core/execution/plan/execute_step.py @@ -8,7 +8,11 @@ RetryRequested, TypeCheck, ) -from dagster.core.definitions.events import ObjectStoreOperation +from dagster.core.definitions.events import ( + AddressableAssetOperation, + AddressableAssetOperationType, + ObjectStoreOperation, +) from dagster.core.errors import ( DagsterExecutionStepExecutionError, DagsterInvariantViolationError, @@ -29,6 +33,7 @@ TypeCheckData, ) from dagster.core.execution.resolve_versions import resolve_step_output_versions +from dagster.core.storage.asset_store import AssetAddress, AssetStoreHandle from dagster.core.types.dagster_type import DagsterTypeKind from dagster.utils import iterate_with_context, raise_interrupts_immediately from dagster.utils.timing import time_execution_scope @@ -252,10 +257,16 @@ inputs[input_name] = input_value.obj elif isinstance(input_value, MultipleStepOutputsListWrapper): for op in input_value: - yield DagsterEvent.object_store_operation( - step_context, ObjectStoreOperation.serializable(op, value_name=input_name) - ) + if isinstance(input_value, ObjectStoreOperation): + yield DagsterEvent.object_store_operation( + step_context, ObjectStoreOperation.serializable(op, value_name=input_name) + ) + elif isinstance(input_value, AddressableAssetOperation): + yield DagsterEvent.addressable_asset_operation(step_context, input_value) inputs[input_name] = [op.obj for op in input_value] + elif isinstance(input_value, AddressableAssetOperation): + yield DagsterEvent.addressable_asset_operation(step_context, input_value) + inputs[input_name] = input_value.obj else: inputs[input_name] = input_value @@ -318,20 +329,56 @@ yield evt -def _set_intermediates(step_context, step_output, step_output_handle, output, version): - res = step_context.intermediate_storage.set_intermediate( - context=step_context, - dagster_type=step_output.dagster_type, - step_output_handle=step_output_handle, - value=output.value, - version=version, +def _get_addressable_asset(context, step_output_handle, asset_store_handle): + check.inst_param(asset_store_handle, "asset_store_handle", AssetStoreHandle) + + asset_store = context.get_asset_store(asset_store_handle) + obj = asset_store.get_asset(context, asset_store_handle.asset_metadata) + + return AddressableAssetOperation( + AddressableAssetOperationType.GET_ASSET, + None, + step_output_handle, + asset_store_handle, + obj=obj, + ) + + +def _set_addressable_asset(context, step_output_handle, asset_store_handle, value): + check.inst_param(asset_store_handle, "asset_store_handle", AssetStoreHandle) + + asset_store = context.get_asset_store(asset_store_handle) + address = asset_store.set_asset(context, value, asset_store_handle.asset_metadata) + check.inst(address, AssetAddress) + + return AddressableAssetOperation( + AddressableAssetOperationType.SET_ASSET, address, step_output_handle, asset_store_handle ) - if isinstance(res, ObjectStoreOperation): - yield DagsterEvent.object_store_operation( - step_context, ObjectStoreOperation.serializable(res, value_name=output.output_name), + +def _set_intermediates(step_context, step_output, step_output_handle, output, version): + if step_output.asset_store_handle: + # use asset_store if it's configured on provided by the user + res = _set_addressable_asset( + step_context, step_output_handle, step_output.asset_store_handle, output.value ) + if isinstance(res, AddressableAssetOperation): + yield DagsterEvent.addressable_asset_operation(step_context, res) + else: + res = step_context.intermediate_storage.set_intermediate( + context=step_context, + dagster_type=step_output.dagster_type, + step_output_handle=step_output_handle, + value=output.value, + version=version, + ) + + if isinstance(res, ObjectStoreOperation): + yield DagsterEvent.object_store_operation( + step_context, ObjectStoreOperation.serializable(res, value_name=output.output_name), + ) + def _create_output_materializations(step_context, output_name, value): step = step_context.step @@ -446,7 +493,12 @@ input_value = [] for source_handle in step_input.source_handles: - if ( + source_asset_store_handle = step_context.get_asset_store_handle(source_handle) + if source_asset_store_handle: + input_value = _get_addressable_asset( + step_context, source_handle, source_asset_store_handle + ) + elif ( source_handle in step_input.addresses and step_context.intermediate_storage.has_intermediate_at_address( step_input.addresses[source_handle] @@ -479,7 +531,12 @@ elif step_input.is_from_single_output: source_handle = step_input.source_handles[0] - if source_handle in step_input.addresses: + source_asset_store_handle = step_context.get_asset_store_handle(source_handle) + if source_asset_store_handle: + input_value = _get_addressable_asset( + step_context, source_handle, source_asset_store_handle + ) + elif source_handle in step_input.addresses: input_value = step_context.intermediate_storage.get_intermediate_from_address( step_context, dagster_type=step_input.dagster_type, diff --git a/python_modules/dagster/dagster/core/execution/plan/objects.py b/python_modules/dagster/dagster/core/execution/plan/objects.py --- a/python_modules/dagster/dagster/core/execution/plan/objects.py +++ b/python_modules/dagster/dagster/core/execution/plan/objects.py @@ -4,6 +4,7 @@ from dagster import check from dagster.core.definitions import AssetMaterialization, Materialization, Solid, SolidHandle from dagster.core.definitions.events import EventMetadataEntry +from dagster.core.storage.asset_store import AssetStoreHandle from dagster.core.types.dagster_type import DagsterType from dagster.serdes import whitelist_for_serdes from dagster.utils import merge_dicts @@ -197,9 +198,16 @@ return {handle.step_key for handle in self.source_handles} -class StepOutput(namedtuple("_StepOutput", "name dagster_type optional should_materialize")): +class StepOutput( + namedtuple("_StepOutput", "name dagster_type optional should_materialize asset_store_handle") +): def __new__( - cls, name, dagster_type=None, optional=None, should_materialize=None, + cls, + name, + dagster_type=None, + optional=None, + should_materialize=None, + asset_store_handle=None, ): return super(StepOutput, cls).__new__( cls, @@ -207,6 +215,9 @@ optional=check.bool_param(optional, "optional"), should_materialize=check.bool_param(should_materialize, "should_materialize"), dagster_type=check.inst_param(dagster_type, "dagster_type", DagsterType), + asset_store_handle=check.opt_inst_param( + asset_store_handle, "asset_store_handle", AssetStoreHandle + ), ) diff --git a/python_modules/dagster/dagster/core/execution/resources_init.py b/python_modules/dagster/dagster/core/execution/resources_init.py --- a/python_modules/dagster/dagster/core/execution/resources_init.py +++ b/python_modules/dagster/dagster/core/execution/resources_init.py @@ -236,6 +236,8 @@ resource_keys = resource_keys.union( step_output.dagster_type.materializer.required_resource_keys() ) + if step_output.asset_store_handle: + resource_keys = resource_keys.union({step_output.asset_store_handle.asset_store_key}) # add all the storage-compatible plugin resource keys for dagster_type in solid_def.all_dagster_types(): diff --git a/python_modules/dagster/dagster/core/storage/asset_store.py b/python_modules/dagster/dagster/core/storage/asset_store.py new file mode 100644 --- /dev/null +++ b/python_modules/dagster/dagster/core/storage/asset_store.py @@ -0,0 +1,144 @@ +import os +import pickle +import uuid +from abc import ABCMeta, abstractmethod +from collections import namedtuple + +import six + +from dagster import check +from dagster.config import Field +from dagster.config.source import StringSource +from dagster.core.definitions.resource import resource +from dagster.serdes import ( + deserialize_json_to_dagster_namedtuple, + serialize_dagster_namedtuple, + whitelist_for_serdes, +) +from dagster.utils import PICKLE_PROTOCOL + + +@whitelist_for_serdes +class AssetStoreHandle(namedtuple("_AssetStoreHandle", "asset_store_key asset_metadata")): + def __new__(cls, asset_store_key, asset_metadata=None): + return super(AssetStoreHandle, cls).__new__( + cls, + asset_store_key=check.str_param(asset_store_key, "asset_store_key"), + asset_metadata=check.opt_dict_param(asset_metadata, "asset_metadata", key_type=str), + ) + + +class AssetAddress: + """ + Base class for addressable asset pointer. + + Extend this class to represent the metadata of an addressable asset paried to a user-provided + subclass of :py:class:`~dagster.AssetStore`. + """ + + +class AssetStore(six.with_metaclass(ABCMeta)): # pylint: disable=no-init + """ + Base class for user-provided asset store. + + Extend this class to handle asset operations. Users should implement ``set_asset`` to write a + data object that can be tracked by the Dagster machinery and ``get_asset`` to read an existing + data object. + """ + + @abstractmethod + def set_asset(self, context, obj, asset_metadata): + """The user-definied write method that stores a data object. + + Note: this method should return a subclass of :py:class:`~dagster.AssetAddress` for the + Dagster machinery to track the stored data object, i.e. an asset. The scheme of the asset + address typically should line up with the user-provided asset store. + + Args: + context (SystemStepExecutionContext): The context that the corresponding step is in. + obj (Any): The data object to be stored. + asset_metadata (Dict[str, Any]): The metadata defined on the step that produced the + given data object. For example, users can provide a file path if the data object + will be stored in a filesystem, or provide information of a database table when it + is going to load the data into the table. + + + Returns: + AssetAddress: a pointer to the stored data. + """ + + @abstractmethod + def get_asset(self, context, asset_metadata): + """The user-defined read method that loads data given a tracked address. + + Args: + context (SystemStepExecutionContext): The context that the corresponding step is in. + asset_metadata (Dict[str, Any]): The metadata defined on the step that produced the + data object to get. + + Returns: + Any: The data object. + """ + + +@whitelist_for_serdes +class PickledObjectFileystemAssetAddress( + namedtuple("_PickledObjectFileystemAssetAddress", "asset_id filepath"), AssetAddress +): + def __new__( + cls, asset_id, filepath, + ): + return super(PickledObjectFileystemAssetAddress, cls).__new__( + cls, + asset_id=check.str_param(asset_id, "asset_id"), + filepath=check.str_param(filepath, "filepath"), + ) + + def to_string(self): + return serialize_dagster_namedtuple(self) + + @staticmethod + def from_string(json_str): + return deserialize_json_to_dagster_namedtuple(json_str) + + +class PickledObjectFileystemAssetStore(AssetStore): + def __init__(self, base_dir=None): + self.base_dir = check.opt_str_param(base_dir, "base_dir") + self.write_mode = "wb" + self.read_mode = "rb" + + def _get_path(self, path): + return os.path.join(self.base_dir, path) + + def _get_new_id(self): + return str(uuid.uuid4()) + + def set_asset(self, _context, obj, asset_metadata): + """ + store data object to file and track it as AddressablAsset + """ + check.dict_param(asset_metadata, "asset_metadata", key_type=str) + path = check.str_param(asset_metadata.get("path"), "asset_metadata.path") + filepath = self._get_path(path) + + with open(filepath, self.write_mode) as write_obj: + pickle.dump(obj, write_obj, PICKLE_PROTOCOL) + + return PickledObjectFileystemAssetAddress(asset_id=self._get_new_id(), filepath=filepath) + + def get_asset(self, _context, asset_metadata): + """ + load data object from file using AssetAddress + """ + check.dict_param(asset_metadata, "asset_metadata", key_type=str) + path = check.str_param(asset_metadata.get("path"), "asset_metadata.path") + filepath = self._get_path(path) + + with open(filepath, self.read_mode) as read_obj: + return pickle.load(read_obj) + + +@resource(config_schema={"base_dir": Field(StringSource, default_value=".", is_required=False)}) +def default_filesystem_asset_store(init_context): + return PickledObjectFileystemAssetStore(init_context.resource_config["base_dir"]) diff --git a/python_modules/dagster/dagster/core/storage/intermediate_storage.py b/python_modules/dagster/dagster/core/storage/intermediate_storage.py --- a/python_modules/dagster/dagster/core/storage/intermediate_storage.py +++ b/python_modules/dagster/dagster/core/storage/intermediate_storage.py @@ -53,6 +53,10 @@ if step_input.is_from_single_output: for source_handle in step_input.source_handles: + if context.get_asset_store_handle(source_handle) is not None: + # skip when the source output has asset store configured + continue + if ( source_handle in step_input.addresses and not self.has_intermediate_at_address( @@ -66,6 +70,10 @@ elif step_input.is_from_multiple_outputs: missing_source_handles = [] for source_handle in step_input.source_handles: + if context.get_asset_store_handle(source_handle) is not None: + # skip when the source output has asset store configured + continue + if ( source_handle in step_input.addresses and not self.has_intermediate_at_address( diff --git a/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_asset_store.py b/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_asset_store.py new file mode 100644 --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_asset_store.py @@ -0,0 +1,126 @@ +import os + +from dagster import ( + DagsterInstance, + ModeDefinition, + OutputDefinition, + execute_pipeline, + pipeline, + seven, + solid, +) +from dagster.core.definitions.events import AddressableAssetOperationType +from dagster.core.execution.api import create_execution_plan, execute_plan +from dagster.core.execution.plan.objects import StepOutputHandle +from dagster.core.storage.asset_store import default_filesystem_asset_store + + +def define_asset_pipeline(asset_store, asset_metadata_dict): + @solid( + output_defs=[ + OutputDefinition( + asset_store_key="default_fs_asset_store", + asset_metadata=asset_metadata_dict["solid_a"], + ) + ], + ) + def solid_a(_context): + return [1, 2, 3] + + @solid( + output_defs=[ + OutputDefinition( + asset_store_key="default_fs_asset_store", + asset_metadata=asset_metadata_dict["solid_b"], + ) + ], + ) + def solid_b(_context, _df): + return 1 + + @pipeline( + mode_defs=[ModeDefinition("local", resource_defs={"default_fs_asset_store": asset_store})] + ) + def asset_pipeline(): + solid_b(solid_a()) + + return asset_pipeline + + +def test_default_asset_store(): + with seven.TemporaryDirectory() as tmpdir_path: + default_asset_store = default_filesystem_asset_store + test_asset_metadata_dict = { + "solid_a": {"path": os.path.join(tmpdir_path, "a")}, + "solid_b": {"path": os.path.join(tmpdir_path, "b")}, + } + pipeline_def = define_asset_pipeline(default_asset_store, test_asset_metadata_dict) + + result = execute_pipeline(pipeline_def) + assert result.success + + addressable_asset_operation_events = list( + filter(lambda evt: evt.is_addressable_asset_operation, result.event_list) + ) + + assert len(addressable_asset_operation_events) == 3 + # SET ASSET for step "solid_a.compute" output "result" + assert ( + addressable_asset_operation_events[0].event_specific_data.op + == AddressableAssetOperationType.SET_ASSET + ) + assert ( + addressable_asset_operation_events[0].event_specific_data.address.filepath + == test_asset_metadata_dict["solid_a"]["path"] + ) + + # GET ASSET for step "solid_b.compute" input "_df" + assert ( + addressable_asset_operation_events[1].event_specific_data.op + == AddressableAssetOperationType.GET_ASSET + ) + assert ( + StepOutputHandle("solid_a.compute", "result") + == addressable_asset_operation_events[1].event_specific_data.step_output_handle + ) + + # SET ASSET for step "solid_b.compute" output "result" + assert ( + addressable_asset_operation_events[2].event_specific_data.op + == AddressableAssetOperationType.SET_ASSET + ) + assert ( + addressable_asset_operation_events[2].event_specific_data.address.filepath + == test_asset_metadata_dict["solid_b"]["path"] + ) + + +def execute_pipeline_with_steps(pipeline_def, step_keys_to_execute=None): + plan = create_execution_plan(pipeline_def, step_keys_to_execute=step_keys_to_execute) + with DagsterInstance.ephemeral() as instance: + pipeline_run = instance.create_run_for_pipeline( + pipeline_def=pipeline_def, step_keys_to_execute=step_keys_to_execute, + ) + return execute_plan(plan, instance, pipeline_run) + + +def test_step_subset(): + with seven.TemporaryDirectory() as tmpdir_path: + default_asset_store = default_filesystem_asset_store + test_asset_metadata_dict = { + "solid_a": {"path": os.path.join(tmpdir_path, "a")}, + "solid_b": {"path": os.path.join(tmpdir_path, "b")}, + } + + pipeline_def = define_asset_pipeline(default_asset_store, test_asset_metadata_dict) + events = execute_pipeline_with_steps(pipeline_def) + for evt in events: + assert not evt.is_failure + + step_subset_events = execute_pipeline_with_steps( + pipeline_def, step_keys_to_execute=["solid_b.compute"] + ) + for evt in step_subset_events: + assert not evt.is_failure + # only the selected step subset was executed + assert set([evt.step_key for evt in step_subset_events]) == {"solid_b.compute"}