diff --git a/python_modules/dagster/dagster/daemon/controller.py b/python_modules/dagster/dagster/daemon/controller.py --- a/python_modules/dagster/dagster/daemon/controller.py +++ b/python_modules/dagster/dagster/daemon/controller.py @@ -81,33 +81,54 @@ return list(self._daemons.values()) def run_iteration(self, curr_time): + first_controller_iteration = not self._last_heartbeat_times + daemon_generators = [] # list of daemon generator functions for daemon in self.daemons: if (not daemon.last_iteration_time) or ( (curr_time - daemon.last_iteration_time).total_seconds() >= daemon.interval_seconds ): daemon.last_iteration_time = curr_time - daemon.last_iteration_exception = None daemon_generators.append((daemon, daemon.run_iteration())) + # Build a list of any exceptions encountered during the iteration. + # Once the iteration completes, this is copied to last_iteration_exceptions + # which is used in the heartbeats. This guarantees that heartbeats contain the full + # list of errors raised. + daemon.current_iteration_exceptions = [] + # Call next on each daemon generator function, rotating through the daemons. 
while len(daemon_generators) > 0: daemon, generator = daemon_generators.pop(0) try: - next(generator) + error_info = next(generator) + if error_info: + daemon.current_iteration_exceptions.append(error_info) except StopIteration: - pass # don't add the generator back + # daemon has completed an iteration, don't add the generator back + # We've completed an iteration, so errors can be reported in heartbeat + daemon.last_iteration_exceptions = daemon.current_iteration_exceptions except Exception: # pylint: disable=broad-except # log errors in daemon error_info = serializable_error_info_from_exc_info(sys.exc_info()) - daemon.last_iteration_exception = error_info self._logger.error( - "Caught error in {}:\n{}".format(daemon.daemon_type(), error_info) + "Caught error in {}:\n{}".format(daemon.daemon_type(), error_info,) ) + daemon.current_iteration_exceptions.append(error_info) + # The iteration stopped short, so errors can be reported in heartbeat + daemon.last_iteration_exceptions = daemon.current_iteration_exceptions else: # append to the back, so other daemons will execute next daemon_generators.append((daemon, generator)) - self._check_add_heartbeat(daemon, curr_time) + + if not first_controller_iteration: + # wait until first iteration completes, otherwise heartbeats may be reported before + # errors occur + self._check_add_heartbeat(daemon, curr_time) + + if first_controller_iteration: + for daemon in self.daemons: + self._check_add_heartbeat(daemon, curr_time) def _check_add_heartbeat(self, daemon, curr_time): """ @@ -144,7 +165,7 @@ pendulum.now("UTC").float_timestamp, daemon.daemon_type(), self._daemon_uuid, - daemon.last_iteration_exception, + daemon.last_iteration_exceptions, ) ) @@ -203,7 +224,7 @@ has_recent_heartbeat = curr_time_seconds <= maximum_tolerated_time # check if daemon has an error - healthy = has_recent_heartbeat and not latest_heartbeat.error + healthy = has_recent_heartbeat and not latest_heartbeat.errors return DaemonStatus( 
daemon_type=daemon_type, diff --git a/python_modules/dagster/dagster/daemon/daemon.py b/python_modules/dagster/dagster/daemon/daemon.py --- a/python_modules/dagster/dagster/daemon/daemon.py +++ b/python_modules/dagster/dagster/daemon/daemon.py @@ -35,7 +35,8 @@ self._logger = get_default_daemon_logger(type(self).__name__) self.interval_seconds = check.int_param(interval_seconds, "interval_seconds") self.last_iteration_time = None - self.last_iteration_exception = None + self.last_iteration_exceptions = [] + self.current_iteration_exceptions = [] @abstractclassmethod def daemon_type(cls): @@ -47,9 +48,10 @@ def run_iteration(self): """ Execute the daemon. In order to avoid blocking the controller thread for extended periods, - daemons can yield control during this method. + daemons can yield control during this method. Yields can be either NoneType or a + non-fatal exception - returns: generator (NoneType) + returns: generator (Exception). """ diff --git a/python_modules/dagster/dagster/daemon/types.py b/python_modules/dagster/dagster/daemon/types.py --- a/python_modules/dagster/dagster/daemon/types.py +++ b/python_modules/dagster/dagster/daemon/types.py @@ -15,18 +15,18 @@ @whitelist_for_serdes -class DaemonHeartbeat(namedtuple("_DaemonHeartbeat", "timestamp daemon_type daemon_id error")): +class DaemonHeartbeat(namedtuple("_DaemonHeartbeat", "timestamp daemon_type daemon_id errors")): """ Heartbeats are placed in storage by the daemon to show liveness """ - def __new__(cls, timestamp, daemon_type, daemon_id, error): + def __new__(cls, timestamp, daemon_type, daemon_id, errors): return super(DaemonHeartbeat, cls).__new__( cls, timestamp=check.float_param(timestamp, "timestamp"), daemon_type=check.inst_param(daemon_type, "daemon_type", DaemonType), daemon_id=daemon_id, - error=check.opt_inst_param(error, "error", SerializableErrorInfo), + errors=check.list_param(errors, "errors", of_type=SerializableErrorInfo), ) diff --git 
a/python_modules/dagster/dagster/scheduler/scheduler.py b/python_modules/dagster/dagster/scheduler/scheduler.py --- a/python_modules/dagster/dagster/scheduler/scheduler.py +++ b/python_modules/dagster/dagster/scheduler/scheduler.py @@ -82,6 +82,7 @@ logger.info(f"Checking for new runs for the following schedules: {schedule_names}") for schedule_state in schedules: + error_info = None try: with RepositoryLocationHandle.create_from_repository_location_origin( schedule_state.origin.external_repository_origin.repository_location_origin @@ -98,9 +99,11 @@ (debug_crash_flags.get(schedule_state.job_name) if debug_crash_flags else None), ) except Exception: # pylint: disable=broad-except - error_info = serializable_error_info_from_exc_info(sys.exc_info()).to_string() - logger.error(f"Scheduler failed for {schedule_state.job_name} : {error_info}") - yield + error_info = serializable_error_info_from_exc_info(sys.exc_info()) + logger.error( + f"Scheduler failed for {schedule_state.job_name} : {error_info.to_string()}" + ) + yield error_info def launch_scheduled_runs_for_schedule( diff --git a/python_modules/dagster/dagster/scheduler/sensor.py b/python_modules/dagster/dagster/scheduler/sensor.py --- a/python_modules/dagster/dagster/scheduler/sensor.py +++ b/python_modules/dagster/dagster/scheduler/sensor.py @@ -122,6 +122,7 @@ sensor_debug_crash_flags = ( debug_crash_flags.get(job_state.job_name) if debug_crash_flags else None ) + error_info = None try: with RepositoryLocationHandle.create_from_repository_location_origin( job_state.origin.external_repository_origin.repository_location_origin @@ -168,13 +169,13 @@ before=now.subtract(days=7), # keep the last 7 days ) except Exception: # pylint: disable=broad-except + error_info = serializable_error_info_from_exc_info(sys.exc_info()) logger.error( "Sensor failed for {sensor_name} : {error_info}".format( - sensor_name=job_state.job_name, - error_info=serializable_error_info_from_exc_info(sys.exc_info()).to_string(), + 
sensor_name=job_state.job_name, error_info=error_info.to_string(), ) ) - yield + yield error_info def _evaluate_sensor( diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_dagster_daemon_health.py b/python_modules/dagster/dagster_tests/daemon_tests/test_dagster_daemon_health.py --- a/python_modules/dagster/dagster_tests/daemon_tests/test_dagster_daemon_health.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_dagster_daemon_health.py @@ -7,6 +7,7 @@ all_daemons_healthy, get_daemon_status, ) +from dagster.utils.error import SerializableErrorInfo def test_healthy(): @@ -63,12 +64,34 @@ status = get_daemon_status(instance, SensorDaemon.daemon_type(), init_time.float_timestamp) assert status.healthy == False + assert len(status.last_heartbeat.errors) == 1 assert ( - status.last_heartbeat.error.message.strip() + status.last_heartbeat.errors[0].message.strip() == "dagster.core.errors.DagsterInvariantViolationError: foobar" ) +def test_multiple_error_daemon(monkeypatch): + with instance_for_test(overrides={}) as instance: + from dagster.daemon.daemon import SensorDaemon + + def run_iteration_error(_): + # positional args: message, stack, cls_name, cause + yield SerializableErrorInfo("foobar", None, None, None) + yield SerializableErrorInfo("bizbuz", None, None, None) + + monkeypatch.setattr(SensorDaemon, "run_iteration", run_iteration_error) + controller = DagsterDaemonController(instance) + init_time = pendulum.now("UTC") + controller.run_iteration(init_time) + + status = get_daemon_status(instance, SensorDaemon.daemon_type(), init_time.float_timestamp) + assert status.healthy == False + assert len(status.last_heartbeat.errors) == 2 + assert status.last_heartbeat.errors[0].message.strip() == "foobar" + assert status.last_heartbeat.errors[1].message.strip() == "bizbuz" + + def test_warn_multiple_daemons(capsys): with instance_for_test() as instance: init_time = pendulum.now("UTC")