diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py
--- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py
+++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py
@@ -44,7 +44,7 @@
     events = instance.events_for_asset_key(asset_key, partitions, cursor, limit)
     return [
         event
-        for event in events
+        for record_id, event in events
         if event.is_dagster_event
         and event.dagster_event.event_type_value == DagsterEventType.STEP_MATERIALIZATION.value
     ]
diff --git a/python_modules/dagster-test/dagster_test/toys/log_asset.py b/python_modules/dagster-test/dagster_test/toys/log_asset.py
new file mode 100644
--- /dev/null
+++ b/python_modules/dagster-test/dagster_test/toys/log_asset.py
@@ -0,0 +1,14 @@
+from dagster import Array, Output, pipeline, solid
+
+
+@solid(config_schema={"asset_key": Array(str), "pipeline": str})
+def read_materialization(context):
+    asset_key = context.solid_config["asset_key"]
+    from_pipeline = context.solid_config["pipeline"]
+    context.log.info(f"Found materialization for asset key {asset_key} in {from_pipeline}")
+    yield Output(asset_key)
+
+
+@pipeline(description="Demo pipeline that logs asset materializations from other pipelines")
+def log_asset_pipeline():
+    read_materialization()
diff --git a/python_modules/dagster-test/dagster_test/toys/repo.py b/python_modules/dagster-test/dagster_test/toys/repo.py
--- a/python_modules/dagster-test/dagster_test/toys/repo.py
+++ b/python_modules/dagster-test/dagster_test/toys/repo.py
@@ -3,6 +3,7 @@
 from dagster_test.toys.composition import composition
 from dagster_test.toys.error_monster import error_monster
 from dagster_test.toys.hammer import hammer_pipeline
+from dagster_test.toys.log_asset import log_asset_pipeline
 from dagster_test.toys.log_file import log_file_pipeline
 from dagster_test.toys.log_spew import log_spew
 from dagster_test.toys.longitudinal import longitudinal_pipeline
@@ -22,6 +23,7 @@
         composition,
         error_monster,
         hammer_pipeline,
+        log_asset_pipeline,
         log_file_pipeline,
         log_spew,
         longitudinal_pipeline,
diff --git a/python_modules/dagster-test/dagster_test/toys/sensors.py b/python_modules/dagster-test/dagster_test/toys/sensors.py
--- a/python_modules/dagster-test/dagster_test/toys/sensors.py
+++ b/python_modules/dagster-test/dagster_test/toys/sensors.py
@@ -1,6 +1,6 @@
 import os
 
-from dagster import RunRequest, SkipReason, check, sensor
+from dagster import AssetKey, RunRequest, SkipReason, check, sensor
 
 
 def get_directory_files(directory_name):
@@ -50,4 +50,27 @@
             },
         )
 
-    return [toy_file_sensor]
+    @sensor(pipeline_name="log_asset_pipeline")
+    def toy_asset_sensor(context):
+        events = context.instance.events_for_asset_key(
+            AssetKey(["model"]), cursor=context.last_run_key, ascending=True
+        )
+
+        if not events:
+            return
+
+        record_id, event = events[-1]  # take the most recent materialization
+        from_pipeline = event.pipeline_name
+
+        yield RunRequest(
+            run_key=str(record_id),
+            run_config={
+                "solids": {
+                    "read_materialization": {
+                        "config": {"asset_key": ["model"], "pipeline": from_pipeline}
+                    }
+                }
+            },
+        )
+
+    return [toy_file_sensor, toy_asset_sensor]
diff --git a/python_modules/dagster/dagster/core/definitions/sensor.py b/python_modules/dagster/dagster/core/definitions/sensor.py
--- a/python_modules/dagster/dagster/core/definitions/sensor.py
+++ b/python_modules/dagster/dagster/core/definitions/sensor.py
@@ -1,4 +1,7 @@
+import inspect
+
 from dagster import check
+from dagster.core.errors import DagsterInvariantViolationError
 from dagster.core.instance import DagsterInstance
 from dagster.utils import ensure_gen
 
@@ -80,3 +83,19 @@
             return check.is_list(result, of_type=(RunRequest, SkipReason))
 
         return check.is_list(result, of_type=RunRequest)
+
+
+def wrap_sensor_evaluation(sensor_name, result):
+    if inspect.isgenerator(result):
+        for item in result:
+            yield item
+
+    elif isinstance(result, (SkipReason, RunRequest)):
+        yield result
+
+    elif result is not None:
+        raise DagsterInvariantViolationError(
+            f"Error in sensor {sensor_name}: Sensor unexpectedly returned output "
+            f"{result} of type {type(result)}. Should only return SkipReason or "
+            "RunRequest objects."
+        )
diff --git a/python_modules/dagster/dagster/core/instance/__init__.py b/python_modules/dagster/dagster/core/instance/__init__.py
--- a/python_modules/dagster/dagster/core/instance/__init__.py
+++ b/python_modules/dagster/dagster/core/instance/__init__.py
@@ -933,10 +933,14 @@
         self.check_asset_aware()
         return self._event_storage.has_asset_key(asset_key)
 
-    def events_for_asset_key(self, asset_key, partitions=None, cursor=None, limit=None):
+    def events_for_asset_key(
+        self, asset_key, partitions=None, cursor=None, limit=None, ascending=False
+    ):
         check.inst_param(asset_key, "asset_key", AssetKey)
         self.check_asset_aware()
-        return self._event_storage.get_asset_events(asset_key, partitions, cursor, limit)
+        return self._event_storage.get_asset_events(
+            asset_key, partitions, cursor, limit, ascending=ascending, include_cursor=True
+        )
 
     def run_ids_for_asset_key(self, asset_key):
         check.inst_param(asset_key, "asset_key", AssetKey)
diff --git a/python_modules/dagster/dagster/core/storage/event_log/base.py b/python_modules/dagster/dagster/core/storage/event_log/base.py
--- a/python_modules/dagster/dagster/core/storage/event_log/base.py
+++ b/python_modules/dagster/dagster/core/storage/event_log/base.py
@@ -118,7 +118,15 @@
         pass
 
     @abstractmethod
-    def get_asset_events(self, asset_key, partitions=None, cursor=None, limit=None):
+    def get_asset_events(
+        self,
+        asset_key,
+        partitions=None,
+        cursor=None,
+        limit=None,
+        ascending=False,
+        include_cursor=False,
+    ):
         pass
 
     @abstractmethod
diff --git a/python_modules/dagster/dagster/core/storage/event_log/in_memory.py b/python_modules/dagster/dagster/core/storage/event_log/in_memory.py
--- a/python_modules/dagster/dagster/core/storage/event_log/in_memory.py
+++ b/python_modules/dagster/dagster/core/storage/event_log/in_memory.py
@@ -112,7 +112,15 @@
             asset_keys["/".join(event.asset_key.path)] = event.asset_key
         return list(asset_keys.values())
 
-    def get_asset_events(self, asset_key, partitions=None, cursor=None, limit=None):
+    def get_asset_events(
+        self,
+        asset_key,
+        partitions=None,
+        cursor=None,
+        limit=None,
+        ascending=False,
+        include_cursor=False,
+    ):
         asset_events = []
         for records in self._logs.values():
             asset_events += [
@@ -124,7 +132,12 @@
         if partitions:
             asset_events = filter(lambda x: x.dagster_event.partition in partitions, asset_events)
 
-        asset_events = sorted(asset_events, key=lambda x: x.timestamp, reverse=True)
+        asset_events = sorted(asset_events, key=lambda x: x.timestamp, reverse=not ascending)
+
+        try:
+            cursor = int(cursor) if cursor else 0
+        except ValueError:
+            cursor = 0
 
         if cursor:
             asset_events = asset_events[cursor:]
@@ -132,6 +145,11 @@
         if limit:
             asset_events = asset_events[:limit]
 
+        if include_cursor:
+            asset_events = [
+                tuple([index + cursor, event]) for index, event in enumerate(asset_events)
+            ]
+
         return asset_events
 
     def get_asset_run_ids(self, asset_key):
diff --git a/python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py b/python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
--- a/python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
+++ b/python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
@@ -470,19 +470,30 @@
     def upgrade(self):
         pass
 
-    def _add_cursor_limit_to_query(self, query, cursor, limit):
+    def _add_cursor_limit_to_query(self, query, cursor, limit, ascending=False):
         """ Helper function to deal with cursor/limit pagination args """
+        try:
+            cursor = int(cursor) if cursor else None
+        except ValueError:
+            cursor = None
 
         if cursor:
             cursor_query = db.select([SqlEventLogStorageTable.c.id]).where(
                 SqlEventLogStorageTable.c.id == cursor
             )
-            query = query.where(SqlEventLogStorageTable.c.id < cursor_query)
+            if ascending:
+                query = query.where(SqlEventLogStorageTable.c.id > cursor_query)
+            else:
+                query = query.where(SqlEventLogStorageTable.c.id < cursor_query)
 
         if limit:
             query = query.limit(limit)
 
-        query = query.order_by(SqlEventLogStorageTable.c.timestamp.desc())
+        if ascending:
+            query = query.order_by(SqlEventLogStorageTable.c.timestamp.asc())
+        else:
+            query = query.order_by(SqlEventLogStorageTable.c.timestamp.desc())
+
         return query
 
     def has_asset_key(self, asset_key):
@@ -556,7 +567,15 @@
             set([AssetKey.from_db_string(asset_key) for (asset_key,) in results if asset_key])
         )
 
-    def get_asset_events(self, asset_key, partitions=None, cursor=None, limit=None):
+    def get_asset_events(
+        self,
+        asset_key,
+        partitions=None,
+        cursor=None,
+        limit=None,
+        ascending=False,
+        include_cursor=False,
+    ):
         check.inst_param(asset_key, "asset_key", AssetKey)
         check.opt_list_param(partitions, "partitions", of_type=str)
         query = db.select([SqlEventLogStorageTable.c.id, SqlEventLogStorageTable.c.event]).where(
@@ -568,7 +587,7 @@
         if partitions:
             query = query.where(SqlEventLogStorageTable.c.partition.in_(partitions))
 
-        query = self._add_cursor_limit_to_query(query, cursor, limit)
+        query = self._add_cursor_limit_to_query(query, cursor, limit, ascending=ascending)
 
         with self.connect() as conn:
             results = conn.execute(query).fetchall()
@@ -583,7 +602,10 @@
                     )
                 )
                 continue
-            events.append(event_record)
+                if include_cursor:
+                    events.append(tuple([row_id, event_record]))
+                else:
+                    events.append(event_record)
             except seven.JSONDecodeError:
                 logging.warning("Could not parse asset event record id `{}`.".format(row_id))
         return events
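Not part of the patch: a minimal usage sketch of the updated events_for_asset_key API, assuming a configured DAGSTER_HOME and at least one materialization of an asset with key ["model"]. Because the instance method now calls storage with include_cursor=True, each element is a (record_id, event) tuple, and the record id can be passed back as the cursor to resume from the last event seen, which is how toy_asset_sensor above uses context.last_run_key.

from dagster import AssetKey, DagsterInstance

# Assumes DAGSTER_HOME points at an instance whose event log contains
# materializations for the ["model"] asset key.
instance = DagsterInstance.get()

# Fetch materialization events in ascending record order; each element is a
# (record_id, event) tuple because the instance passes include_cursor=True.
events = instance.events_for_asset_key(AssetKey(["model"]), ascending=True)
for record_id, event in events:
    print(record_id, event.pipeline_name)

# Resume from the last record id seen, mirroring the sensor's cursor handling.
if events:
    last_record_id, _ = events[-1]
    newer_events = instance.events_for_asset_key(
        AssetKey(["model"]), cursor=str(last_record_id), ascending=True
    )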