diff --git a/python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py b/python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py --- a/python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py +++ b/python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py @@ -230,6 +230,7 @@ SqlEventLogStorageTable.c.step_key, SqlEventLogStorageTable.c.dagster_event_type, db.func.max(SqlEventLogStorageTable.c.timestamp).label("timestamp"), + db.func.count(SqlEventLogStorageTable.c.id).label("count"), ] ) .where(SqlEventLogStorageTable.c.run_id == run_id) @@ -254,7 +255,15 @@ by_step_key[step_key]["start_time"] = ( datetime_as_float(result.timestamp) if result.timestamp else None ) - by_step_key[step_key]["attempts"] = 1 + by_step_key[step_key]["attempts"] = by_step_key[step_key].get("attempts", 0) + 1 + if result.dagster_event_type == DagsterEventType.STEP_RESTARTED.value: + by_step_key[step_key]["attempts"] = ( + # In case we see step retarted events but not a step started event, we want to + # only count the restarted events, since the attempt count represents + # the number of times we have successfully started runnning the step + by_step_key[step_key].get("attempts", 0) + + result.count + ) if result.dagster_event_type == DagsterEventType.STEP_FAILURE.value: by_step_key[step_key]["end_time"] = ( datetime_as_float(result.timestamp) if result.timestamp else None @@ -280,7 +289,6 @@ .where( SqlEventLogStorageTable.c.dagster_event_type.in_( [ - DagsterEventType.STEP_RESTARTED.value, DagsterEventType.STEP_MATERIALIZATION.value, DagsterEventType.STEP_EXPECTATION_RESULT.value, ] @@ -289,6 +297,11 @@ .order_by(SqlEventLogStorageTable.c.id.asc()) ) + if step_keys: + raw_event_query = raw_event_query.where( + SqlEventLogStorageTable.c.step_key.in_(step_keys) + ) + with self.connect(run_id) as conn: results = conn.execute(raw_event_query).fetchall() @@ -297,11 +310,7 @@ event = check.inst_param( deserialize_json_to_dagster_namedtuple(json_str), "event", EventRecord ) - if event.dagster_event.event_type == DagsterEventType.STEP_RESTARTED: - by_step_key[event.step_key]["attempts"] = ( - by_step_key[event.step_key].get("attempts") + 1 - ) - elif event.dagster_event.event_type == DagsterEventType.STEP_MATERIALIZATION: + if event.dagster_event.event_type == DagsterEventType.STEP_MATERIALIZATION: materializations[event.step_key].append( event.dagster_event.event_specific_data.materialization ) diff --git a/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_local_instance.py b/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_local_instance.py --- a/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_local_instance.py +++ b/python_modules/dagster/dagster_tests/core_tests/storage_tests/test_local_instance.py @@ -7,6 +7,7 @@ DagsterEventType, DagsterInvalidConfigError, InputDefinition, + Output, OutputDefinition, PipelineRun, check, @@ -197,13 +198,19 @@ def test_run_step_stats_with_retries(): _called = None + _count = {"total": 0} @pipeline def simple(): @solid - def should_succeed(context): - context.log.info("succeed") - return "yay" + def should_succeed(_): + # This is to have at least one other step that retried to properly test + # the step key filter on `get_run_step_stats` + if _count["total"] < 2: + _count["total"] += 1 + raise RetryRequested(max_retries=3) + + yield Output("yay") @solid(input_defs=[InputDefinition("_input", str)], output_defs=[OutputDefinition(str)]) def should_retry(context, _input):