diff --git a/js_modules/dagit/src/schema.graphql b/js_modules/dagit/src/schema.graphql --- a/js_modules/dagit/src/schema.graphql +++ b/js_modules/dagit/src/schema.graphql @@ -629,7 +629,7 @@ union JobSpecificData = SensorJobData | ScheduleJobData type SensorJobData { - lastCompletedTimestamp: Float + lastTickTimestamp: Float lastRunKey: String } diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/jobs.py b/python_modules/dagster-graphql/dagster_graphql/schema/jobs.py --- a/python_modules/dagster-graphql/dagster_graphql/schema/jobs.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/jobs.py @@ -116,13 +116,13 @@ class Meta: name = "SensorJobData" - lastCompletedTimestamp = dauphin.Float() + lastTickTimestamp = dauphin.Float() lastRunKey = dauphin.String() def __init__(self, _graphene_info, job_specific_data): check.inst_param(job_specific_data, "job_specific_data", SensorJobData) super(DauphinSensorJobData, self).__init__( - lastCompletedTimestamp=job_specific_data.last_completed_timestamp, + lastTickTimestamp=job_specific_data.last_tick_timestamp, lastRunKey=job_specific_data.last_run_key, ) diff --git a/python_modules/dagster/dagster/cli/sensor.py b/python_modules/dagster/dagster/cli/sensor.py --- a/python_modules/dagster/dagster/cli/sensor.py +++ b/python_modules/dagster/dagster/cli/sensor.py @@ -15,7 +15,7 @@ from dagster.core.host_representation import ExternalRepository from dagster.core.host_representation.external_data import ExternalSensorExecutionErrorData from dagster.core.instance import DagsterInstance -from dagster.core.scheduler.job import JobState, JobStatus +from dagster.core.scheduler.job import JobStatus def create_sensor_cli_group(): @@ -178,16 +178,6 @@ print_fn(job_title) -def _add_or_update_job_state(instance, external_sensor, status): - existing_job_state = instance.get_job_state(external_sensor.get_external_origin_id()) - if not existing_job_state: - instance.add_job_state( - JobState(external_sensor.get_external_origin(), JobType.SENSOR, status) - ) - else: - instance.update_job_state(existing_job_state.with_status(status)) - - @click.command(name="start", help="Start an existing sensor") @click.argument("sensor_name", nargs=-1) # , required=True) @click.option("--start-all", help="start all sensors", is_flag=True, default=False) @@ -212,7 +202,7 @@ if all_flag: try: for external_sensor in external_repo.get_external_sensors(): - _add_or_update_job_state(instance, external_sensor, JobStatus.RUNNING) + instance.start_sensor(external_sensor) print_fn( "Started all sensors for repository {repository_name}".format( repository_name=repository_name @@ -223,7 +213,7 @@ else: try: external_sensor = external_repo.get_external_sensor(sensor_name) - _add_or_update_job_state(instance, external_sensor, JobStatus.RUNNING) + instance.start_sensor(external_sensor) except DagsterInvariantViolationError as ex: raise click.UsageError(ex) @@ -244,7 +234,7 @@ check_repo_and_scheduler(external_repo, instance) try: external_sensor = external_repo.get_external_sensor(sensor_name) - _add_or_update_job_state(instance, external_sensor, JobStatus.STOPPED) + instance.stop_sensor(external_sensor.get_external_origin_id()) except DagsterInvariantViolationError as ex: raise click.UsageError(ex) diff --git a/python_modules/dagster/dagster/core/instance/__init__.py b/python_modules/dagster/dagster/core/instance/__init__.py --- a/python_modules/dagster/dagster/core/instance/__init__.py +++ b/python_modules/dagster/dagster/core/instance/__init__.py @@ -1196,16 +1196,27 @@ # Schedule Storage def start_sensor(self, external_sensor): - from dagster.core.scheduler.job import JobState, JobStatus + from dagster.core.scheduler.job import JobState, JobStatus, SensorJobData from dagster.core.definitions.job import JobType job_state = self.get_job_state(external_sensor.get_external_origin_id()) + if not job_state: self.add_job_state( - JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING) + JobState( + external_sensor.get_external_origin(), + JobType.SENSOR, + JobStatus.RUNNING, + SensorJobData(datetime.utcnow().timestamp()), + ) + ) + elif job_state.status != JobStatus.RUNNING: + # set the last completed time to the modified state time + self.update_job_state( + job_state.with_status(JobStatus.RUNNING).with_data( + SensorJobData(datetime.utcnow().timestamp()) + ) ) - else: - self.update_job_state(job_state.with_status(JobStatus.RUNNING)) def stop_sensor(self, job_origin_id): job_state = self.get_job_state(job_origin_id) diff --git a/python_modules/dagster/dagster/core/scheduler/job.py b/python_modules/dagster/dagster/core/scheduler/job.py --- a/python_modules/dagster/dagster/core/scheduler/job.py +++ b/python_modules/dagster/dagster/core/scheduler/job.py @@ -15,11 +15,11 @@ @whitelist_for_serdes -class SensorJobData(namedtuple("_SensorJobData", "last_completed_timestamp last_run_key")): - def __new__(cls, last_completed_timestamp=None, last_run_key=None): +class SensorJobData(namedtuple("_SensorJobData", "last_tick_timestamp last_run_key")): + def __new__(cls, last_tick_timestamp=None, last_run_key=None): return super(SensorJobData, cls).__new__( cls, - check.opt_float_param(last_completed_timestamp, "last_completed_timestamp"), + check.opt_float_param(last_tick_timestamp, "last_tick_timestamp"), check.opt_str_param(last_run_key, "last_run_key"), ) diff --git a/python_modules/dagster/dagster/scheduler/sensor.py b/python_modules/dagster/dagster/scheduler/sensor.py --- a/python_modules/dagster/dagster/scheduler/sensor.py +++ b/python_modules/dagster/dagster/scheduler/sensor.py @@ -182,9 +182,7 @@ instance, external_repo.handle, external_sensor.name, - job_state.job_specific_data.last_completed_timestamp - if job_state.job_specific_data - else None, + job_state.job_specific_data.last_tick_timestamp if job_state.job_specific_data else None, job_state.job_specific_data.last_run_key if job_state.job_specific_data else None, ) if isinstance(sensor_runtime_data, ExternalSensorExecutionErrorData):