diff --git a/docs/next/src/pages/overview/graphql-api/index.mdx b/docs/next/src/pages/overview/graphql-api/index.mdx --- a/docs/next/src/pages/overview/graphql-api/index.mdx +++ b/docs/next/src/pages/overview/graphql-api/index.mdx @@ -121,7 +121,7 @@ - run ID - pipeline name - tags -- status +- statuses For example, the following query will return all failed runs: diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py @@ -85,7 +85,7 @@ if run: runs = [run] elif filters and ( - filters.pipeline_name or filters.tags or filters.status or filters.snapshot_id + filters.pipeline_name or filters.tags or filters.statuses or filters.snapshot_id ): runs = instance.get_runs(filters, cursor, limit) else: diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/partition_sets.py b/python_modules/dagster-graphql/dagster_graphql/schema/partition_sets.py --- a/python_modules/dagster-graphql/dagster_graphql/schema/partition_sets.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/partition_sets.py @@ -77,7 +77,7 @@ runs_filter = PipelineRunsFilter( run_ids=filters.run_ids, pipeline_name=filters.pipeline_name, - status=filters.status, + statuses=filters.statuses, tags=merge_dicts(filters.tags, partition_tags), ) else: diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots.py --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots.py @@ -838,9 +838,9 @@ def to_selector(self): if self.status: - status = PipelineRunStatus[self.status] + statuses = [PipelineRunStatus[self.status]] else: - status = None + statuses = [] if self.tags: # We are wrapping self.tags in a list because dauphin.List is not marked as iterable @@ -853,7 +853,7 @@ run_ids=run_ids, pipeline_name=self.pipeline_name, tags=tags, - status=status, + statuses=statuses, snapshot_id=self.snapshot_id, ) diff --git a/python_modules/dagster/dagster/cli/debug.py b/python_modules/dagster/dagster/cli/debug.py --- a/python_modules/dagster/dagster/cli/debug.py +++ b/python_modules/dagster/dagster/cli/debug.py @@ -8,7 +8,9 @@ def _recent_failed_runs_text(instance): lines = [] - runs = instance.get_runs(limit=5, filters=PipelineRunsFilter(status=PipelineRunStatus.FAILURE)) + runs = instance.get_runs( + limit=5, filters=PipelineRunsFilter(statuses=[PipelineRunStatus.FAILURE]) + ) if len(runs) <= 0: return "" for run in runs: diff --git a/python_modules/dagster/dagster/core/storage/pipeline_run.py b/python_modules/dagster/dagster/core/storage/pipeline_run.py --- a/python_modules/dagster/dagster/core/storage/pipeline_run.py +++ b/python_modules/dagster/dagster/core/storage/pipeline_run.py @@ -319,15 +319,14 @@ @whitelist_for_serdes class PipelineRunsFilter( - namedtuple("_PipelineRunsFilter", "run_ids pipeline_name status tags snapshot_id") + namedtuple("_PipelineRunsFilter", "run_ids pipeline_name statuses tags snapshot_id") ): - def __new__(cls, run_ids=None, pipeline_name=None, status=None, tags=None, snapshot_id=None): - run_ids = check.opt_list_param(run_ids, "run_ids", of_type=str) + def __new__(cls, run_ids=None, pipeline_name=None, statuses=None, tags=None, snapshot_id=None): return super(PipelineRunsFilter, cls).__new__( cls, - run_ids=run_ids, + run_ids=check.opt_list_param(run_ids, "run_ids", of_type=str), pipeline_name=check.opt_str_param(pipeline_name, "pipeline_name"), - status=status, + statuses=check.opt_list_param(statuses, "statuses", of_type=PipelineRunStatus), tags=check.opt_dict_param(tags, "tags", key_type=str, value_type=str), snapshot_id=check.opt_str_param(snapshot_id, "snapshot_id"), ) diff --git a/python_modules/dagster/dagster/core/storage/runs/in_memory.py b/python_modules/dagster/dagster/core/storage/runs/in_memory.py --- a/python_modules/dagster/dagster/core/storage/runs/in_memory.py +++ b/python_modules/dagster/dagster/core/storage/runs/in_memory.py @@ -83,7 +83,7 @@ if filters.run_ids and run.run_id not in filters.run_ids: return False - if filters.status and filters.status != run.status: + if filters.statuses and run.status not in filters.statuses: return False if filters.pipeline_name and filters.pipeline_name != run.pipeline_name: diff --git a/python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py --- a/python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py +++ b/python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py @@ -158,8 +158,10 @@ if filters.pipeline_name: query = query.where(RunsTable.c.pipeline_name == filters.pipeline_name) - if filters.status: - query = query.where(RunsTable.c.status == filters.status.value) + if filters.statuses: + query = query.where( + RunsTable.c.status.in_([status.value for status in filters.statuses]) + ) if filters.tags: query = query.where( diff --git a/python_modules/dagster/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py b/python_modules/dagster/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py --- a/python_modules/dagster/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py +++ b/python_modules/dagster/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py @@ -66,18 +66,13 @@ self._instance.launch_run(run.run_id, external_pipeline) def _get_queued_runs(self, limit=None): - queued_runs_filter = PipelineRunsFilter(status=PipelineRunStatus.QUEUED) + queued_runs_filter = PipelineRunsFilter(statuses=[PipelineRunStatus.QUEUED]) runs = self._instance.get_runs(filters=queued_runs_filter, limit=limit) assert len(runs) <= limit return runs def _count_in_progress_runs(self): - num_runs = 0 - - # NOTE: this can be reduced to a single query if PipelineRunsFilters can take multiple statuses - for status in IN_PROGRESS_STATUSES: - runs_filter = PipelineRunsFilter(status=status) - num_runs += self._instance.get_runs_count(filters=runs_filter) - - return num_runs + return self._instance.get_runs_count( + filters=PipelineRunsFilter(statuses=IN_PROGRESS_STATUSES) + ) diff --git a/python_modules/dagster/dagster/utils/test/run_storage.py b/python_modules/dagster/dagster/utils/test/run_storage.py --- a/python_modules/dagster/dagster/utils/test/run_storage.py +++ b/python_modules/dagster/dagster/utils/test/run_storage.py @@ -267,8 +267,8 @@ assert some_runs[0].run_id == two assert some_runs[1].run_id == one - some_runs = storage.get_runs(PipelineRunsFilter(status=PipelineRunStatus.SUCCESS)) - count = storage.get_runs_count(PipelineRunsFilter(status=PipelineRunStatus.SUCCESS)) + some_runs = storage.get_runs(PipelineRunsFilter(statuses=[PipelineRunStatus.SUCCESS])) + count = storage.get_runs_count(PipelineRunsFilter(statuses=[PipelineRunStatus.SUCCESS])) assert len(some_runs) == 2 assert count == 2 assert some_runs[0].run_id == three @@ -302,14 +302,14 @@ PipelineRunsFilter( pipeline_name="some_pipeline", tags={"tag": "hello"}, - status=PipelineRunStatus.SUCCESS, + statuses=[PipelineRunStatus.SUCCESS], ) ) count = storage.get_runs_count( PipelineRunsFilter( pipeline_name="some_pipeline", tags={"tag": "hello"}, - status=PipelineRunStatus.SUCCESS, + statuses=[PipelineRunStatus.SUCCESS], ) ) assert len(some_runs) == 1 @@ -322,7 +322,7 @@ run_ids=[one], pipeline_name="some_pipeline", tags={"tag": "hello"}, - status=PipelineRunStatus.SUCCESS, + statuses=[PipelineRunStatus.SUCCESS], ) ) count = storage.get_runs_count( @@ -330,7 +330,7 @@ run_ids=[one], pipeline_name="some_pipeline", tags={"tag": "hello"}, - status=PipelineRunStatus.SUCCESS, + statuses=[PipelineRunStatus.SUCCESS], ) ) assert len(some_runs) == 1 @@ -482,22 +482,24 @@ assert { run.run_id - for run in storage.get_runs(PipelineRunsFilter(status=PipelineRunStatus.NOT_STARTED)) + for run in storage.get_runs( + PipelineRunsFilter(statuses=[PipelineRunStatus.NOT_STARTED]) + ) } == {one} assert { run.run_id - for run in storage.get_runs(PipelineRunsFilter(status=PipelineRunStatus.STARTED)) + for run in storage.get_runs(PipelineRunsFilter(statuses=[PipelineRunStatus.STARTED])) } == {two, three,} assert { run.run_id - for run in storage.get_runs(PipelineRunsFilter(status=PipelineRunStatus.FAILURE)) + for run in storage.get_runs(PipelineRunsFilter(statuses=[PipelineRunStatus.FAILURE])) } == {four} assert { run.run_id - for run in storage.get_runs(PipelineRunsFilter(status=PipelineRunStatus.SUCCESS)) + for run in storage.get_runs(PipelineRunsFilter(statuses=[PipelineRunStatus.SUCCESS])) } == set() def test_fetch_by_status_cursored(self, storage): @@ -528,24 +530,24 @@ ) cursor_four_runs = storage.get_runs( - PipelineRunsFilter(status=PipelineRunStatus.STARTED), cursor=four + PipelineRunsFilter(statuses=[PipelineRunStatus.STARTED]), cursor=four ) assert len(cursor_four_runs) == 2 assert {run.run_id for run in cursor_four_runs} == {one, two} cursor_two_runs = storage.get_runs( - PipelineRunsFilter(status=PipelineRunStatus.STARTED), cursor=two + PipelineRunsFilter(statuses=[PipelineRunStatus.STARTED]), cursor=two ) assert len(cursor_two_runs) == 1 assert {run.run_id for run in cursor_two_runs} == {one} cursor_one_runs = storage.get_runs( - PipelineRunsFilter(status=PipelineRunStatus.STARTED), cursor=one + PipelineRunsFilter(statuses=[PipelineRunStatus.STARTED]), cursor=one ) assert not cursor_one_runs cursor_four_limit_one = storage.get_runs( - PipelineRunsFilter(status=PipelineRunStatus.STARTED), cursor=four, limit=1 + PipelineRunsFilter(statuses=[PipelineRunStatus.STARTED]), cursor=four, limit=1 ) assert len(cursor_four_limit_one) == 1 assert cursor_four_limit_one[0].run_id == two @@ -824,7 +826,7 @@ storage.add_run(run) run_groups = storage.get_run_groups( - limit=5, filters=PipelineRunsFilter(status=PipelineRunStatus.FAILURE) + limit=5, filters=PipelineRunsFilter(statuses=[PipelineRunStatus.FAILURE]) ) assert len(run_groups) == 3