diff --git a/python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py b/python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py --- a/python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py +++ b/python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py @@ -221,7 +221,6 @@ DagsterEventType.STEP_SUCCESS.value, DagsterEventType.STEP_SKIPPED.value, DagsterEventType.STEP_FAILURE.value, - DagsterEventType.STEP_RESTARTED.value, ] by_step_query = ( @@ -289,6 +288,11 @@ .order_by(SqlEventLogStorageTable.c.id.asc()) ) + if step_keys: + raw_event_query = raw_event_query.where( + SqlEventLogStorageTable.c.step_key.in_(step_keys) + ) + with self.connect(run_id) as conn: results = conn.execute(raw_event_query).fetchall() @@ -299,7 +303,7 @@ ) if event.dagster_event.event_type == DagsterEventType.STEP_RESTARTED: by_step_key[event.step_key]["attempts"] = ( - by_step_key[event.step_key].get("attempts") + 1 + by_step_key[event.step_key].get("attempts", 0) + 1 ) elif event.dagster_event.event_type == DagsterEventType.STEP_MATERIALIZATION: materializations[event.step_key].append( diff --git a/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_local_instance.py b/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_local_instance.py --- a/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_local_instance.py +++ b/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_local_instance.py @@ -7,6 +7,7 @@ DagsterEventType, DagsterInvalidConfigError, InputDefinition, + Output, OutputDefinition, PipelineRun, check, @@ -197,13 +198,18 @@ def test_run_step_stats_with_retries(): _called = None + _count = 0 @pipeline def simple(): @solid - def should_succeed(context): - context.log.info("succeed") - return "yay" + def should_succeed(_): + # This is to have at least one other step that retried to properly test + # the step key filter on `get_run_step_stats` + if _count < 2: + raise RetryRequested(max_retries=3) + + yield Output("yay") @solid(input_defs=[InputDefinition("_input", str)], output_defs=[OutputDefinition(str)]) def should_retry(context, _input):