diff --git a/python_modules/dagster/dagster/cli/sensor.py b/python_modules/dagster/dagster/cli/sensor.py
--- a/python_modules/dagster/dagster/cli/sensor.py
+++ b/python_modules/dagster/dagster/cli/sensor.py
@@ -286,7 +286,7 @@
                 error_info=sensor_runtime_data.error.to_string(),
             )
         )
-    elif not sensor_runtime_data.run_params:
+    elif not sensor_runtime_data.run_requests:
         if sensor_runtime_data.skip_message:
             print_fn(
                 "Sensor returned false for {sensor_name}, skipping: {skip_message}".format(
@@ -302,11 +302,11 @@
             )
         else:
             print_fn(
-                "Sensor returning run parameters for {num} run(s):\n\n{run_params}".format(
-                    num=len(sensor_runtime_data.run_params),
-                    run_params="\n".join(
-                        yaml.safe_dump(param.run_config, default_flow_style=False)
-                        for param in sensor_runtime_data.run_params
+                "Sensor returning run requests for {num} run(s):\n\n{run_requests}".format(
+                    num=len(sensor_runtime_data.run_requests),
+                    run_requests="\n".join(
+                        yaml.safe_dump(run_request.run_config, default_flow_style=False)
+                        for run_request in sensor_runtime_data.run_requests
                     ),
                 )
             )
diff --git a/python_modules/dagster/dagster/core/definitions/decorators/sensor.py b/python_modules/dagster/dagster/core/definitions/decorators/sensor.py
--- a/python_modules/dagster/dagster/core/definitions/decorators/sensor.py
+++ b/python_modules/dagster/dagster/core/definitions/decorators/sensor.py
@@ -1,7 +1,7 @@
 import inspect

 from dagster import check
-from dagster.core.definitions.sensor import SensorDefinition, SensorRunParams, SensorSkipData
+from dagster.core.definitions.sensor import RunRequest, SensorDefinition, SkipReason
 from dagster.core.errors import DagsterInvariantViolationError
 from dagster.utils.backcompat import experimental

@@ -12,10 +12,9 @@
     Creates a sensor where the decorated function is used as the sensor's evaluation function.

     The decorated function may:
-    1. Return a `SensorRunParams` object.
-    2. Yield a number of `SensorRunParams` objects.
-    3. Return a `SensorSkipData` object, providing a descriptive message of why no runs were
-       requested.
+    1. Return a `RunRequest` object.
+    2. Yield multiple `RunRequest` objects.
+    3. Return a `SkipReason` object, providing a descriptive message of why no runs were requested.
     4. Yield nothing (skipping without providing a reason)

     Takes a :py:class:`~dagster.SensorExecutionContext`.
@@ -40,15 +39,15 @@
         if inspect.isgenerator(result):
             for item in result:
                 yield item
-        elif isinstance(result, (SensorSkipData, SensorRunParams)):
+        elif isinstance(result, (SkipReason, RunRequest)):
             yield result
         elif result is not None:
             raise DagsterInvariantViolationError(
                 (
                     "Error in sensor {sensor_name}: Sensor unexpectedly returned output "
-                    "{result} of type {type_}. Should only return SensorSkipData or "
-                    "SensorRunParams objects."
+                    "{result} of type {type_}. Should only return SkipReason or "
+                    "RunRequest objects."
                 ).format(sensor_name=sensor_name, result=result, type_=type(result))
             )
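As a reading aid for the renamed contract above, a minimal sketch of a sensor written against the new names. The pipeline name, watch directory, and run_config shape are hypothetical, not part of this diff:

```python
import os

from dagster.core.definitions.decorators.sensor import sensor
from dagster.core.definitions.sensor import RunRequest, SkipReason


@sensor(pipeline_name="log_pipeline")  # hypothetical pipeline
def directory_sensor(_context):
    filenames = os.listdir("/tmp/incoming")  # hypothetical watch directory
    if not filenames:
        # Option 3 above: skip, with a human-readable reason.
        yield SkipReason("No files found in /tmp/incoming.")
        return
    for filename in filenames:
        # Option 2 above: yield any number of RunRequests.
        yield RunRequest(
            run_key=filename,
            run_config={"solids": {"process_file": {"config": {"filename": filename}}}},
        )
```

Mixing the two result types in one evaluation is rejected: per the validation in `core/definitions/sensor.py` below, a single-element result may be a `SkipReason`, but a multi-element result must be all `RunRequest`s.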
diff --git a/python_modules/dagster/dagster/core/definitions/job.py b/python_modules/dagster/dagster/core/definitions/job.py
--- a/python_modules/dagster/dagster/core/definitions/job.py
+++ b/python_modules/dagster/dagster/core/definitions/job.py
@@ -1,7 +1,6 @@
 from enum import Enum

 from dagster import check
-from dagster.core.instance import DagsterInstance
 from dagster.serdes import whitelist_for_serdes

 from .mode import DEFAULT_MODE_NAME
@@ -27,6 +26,8 @@
     __slots__ = ["_instance"]

     def __init__(self, instance):
+        from dagster.core.instance import DagsterInstance
+
         self._instance = check.inst_param(instance, "instance", DagsterInstance)

     @property
diff --git a/python_modules/dagster/dagster/core/definitions/sensor.py b/python_modules/dagster/dagster/core/definitions/sensor.py
--- a/python_modules/dagster/dagster/core/definitions/sensor.py
+++ b/python_modules/dagster/dagster/core/definitions/sensor.py
@@ -8,41 +8,6 @@
 from dagster.utils.backcompat import experimental_class_warning


-@whitelist_for_serdes
-class SensorSkipData(namedtuple("_SensorSkipData", "skip_message")):
-    def __new__(cls, skip_message=None):
-        return super(SensorSkipData, cls).__new__(
-            cls, skip_message=check.opt_str_param(skip_message, "skip_message")
-        )
-
-
-@whitelist_for_serdes
-class SensorRunParams(namedtuple("_SensorRunParams", "execution_key run_config tags")):
-    """
-    Represents all the information required to launch a single run instigated by a sensor body.
-    Must be returned by a SensorDefinition's evaluation function for a run to be launched.
-
-    Attributes:
-        execution_key (str | None): A string key to identify this launched run. The sensor will
-            ensure that exactly one run is created for each execution key, and will not create
-            another run if the same execution key is requested in a later evaluation. Passing in
-            a `None` value means that the sensor will attempt to create and launch every run
-            requested for every sensor evaluation.
-        run_config (Optional[Dict]): The environment config that parameterizes the run execution to
-            be launched, as a dict.
-        tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach
-            to the launched run.
-    """
-
-    def __new__(cls, execution_key, run_config=None, tags=None):
-        return super(SensorRunParams, cls).__new__(
-            cls,
-            run_config=check.opt_dict_param(run_config, "run_config"),
-            tags=check.opt_dict_param(tags, "tags"),
-            execution_key=check.opt_str_param(execution_key, "execution_key"),
-        )
-
-
 class SensorExecutionContext(JobContext):
     """Sensor execution context.
@@ -69,6 +34,41 @@
         return self._last_completion_time


+@whitelist_for_serdes
+class SkipReason(namedtuple("_SkipReason", "skip_message")):
+    def __new__(cls, skip_message=None):
+        return super(SkipReason, cls).__new__(
+            cls, skip_message=check.opt_str_param(skip_message, "skip_message")
+        )
+
+
+@whitelist_for_serdes
+class RunRequest(namedtuple("_RunRequest", "run_key run_config tags")):
+    """
+    Represents all the information required to launch a single run instigated by a sensor body.
+    Must be returned by a SensorDefinition's evaluation function for a run to be launched.
+
+    Attributes:
+        run_key (str | None): A string key to identify this launched run. The sensor will
+            ensure that exactly one run is created for each run key, and will not create
+            another run if the same run key is requested in a later evaluation. Passing in
+            a `None` value means that the sensor will attempt to create and launch every run
+            requested for every sensor evaluation.
+        run_config (Optional[Dict]): The environment config that parameterizes the run execution to
+            be launched, as a dict.
+        tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach
+            to the launched run.
+    """
+
+    def __new__(cls, run_key, run_config=None, tags=None):
+        return super(RunRequest, cls).__new__(
+            cls,
+            run_key=check.opt_str_param(run_key, "run_key"),
+            run_config=check.opt_dict_param(run_config, "run_config"),
+            tags=check.opt_dict_param(tags, "tags"),
+        )
+
+
 class SensorDefinition(JobDefinition):
     """Define a sensor that initiates a set of job runs
@@ -79,8 +79,8 @@
             sensor, which is run at an interval to determine whether a run should be launched or
             not. Takes a :py:class:`~dagster.SensorExecutionContext`.

-            This function must return a generator, which must yield either a single SensorSkipData
-            or one or more SensorRunParams objects.
+            This function must return a generator, which must yield either a single SkipReason
+            or one or more RunRequest objects.
         solid_selection (Optional[List[str]]): A list of solid subselection (including single
             solid names) to execute when the sensor runs. e.g. ``['*some_solid+', 'other_solid']``
         mode (Optional[str]): The mode to apply when executing this sensor. (default: 'default')
@@ -111,6 +111,6 @@
         return []

     if len(result) == 1:
-        return check.is_list(result, of_type=(SensorRunParams, SensorSkipData))
+        return check.is_list(result, of_type=(RunRequest, SkipReason))

-    return check.is_list(result, of_type=SensorRunParams)
+    return check.is_list(result, of_type=RunRequest)
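The `run_key` contract documented above, in miniature — the keying scheme here (path plus mtime) is a hypothetical illustration, not something this diff prescribes:

```python
import os

from dagster.core.definitions.sensor import RunRequest


def run_request_for_file(path):
    # At most one run is ever launched per distinct run_key. Keying on path
    # plus mtime means an unchanged file never re-triggers, while a rewritten
    # file produces a new key and therefore a new run.
    return RunRequest(
        run_key="{}:{}".format(path, os.path.getmtime(path)),
        run_config={},
    )
```

With `run_key=None`, every evaluation launches a fresh run, as `always_on_sensor` in the tests below does.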
diff --git a/python_modules/dagster/dagster/core/host_representation/external_data.py b/python_modules/dagster/dagster/core/host_representation/external_data.py
--- a/python_modules/dagster/dagster/core/host_representation/external_data.py
+++ b/python_modules/dagster/dagster/core/host_representation/external_data.py
@@ -18,7 +18,7 @@
     ScheduleDefinition,
 )
 from dagster.core.definitions.partition import PartitionScheduleDefinition
-from dagster.core.definitions.sensor import SensorRunParams
+from dagster.core.definitions.sensor import RunRequest
 from dagster.core.snap import PipelineSnapshot
 from dagster.serdes import whitelist_for_serdes
 from dagster.utils.error import SerializableErrorInfo
@@ -235,16 +235,16 @@
 @whitelist_for_serdes
 class ExternalSensorExecutionData(
-    namedtuple("_ExternalSensorExecutionData", "run_params skip_message")
+    namedtuple("_ExternalSensorExecutionData", "run_requests skip_message")
 ):
-    def __new__(cls, run_params=None, skip_message=None):
-        check.opt_list_param(run_params, "run_params", SensorRunParams)
+    def __new__(cls, run_requests=None, skip_message=None):
+        check.opt_list_param(run_requests, "run_requests", RunRequest)
         check.opt_str_param(skip_message, "skip_message")
         check.invariant(
-            not (run_params and skip_message), "Found both skip data and run request data"
+            not (run_requests and skip_message), "Found both skip data and run request data"
         )
         return super(ExternalSensorExecutionData, cls).__new__(
-            cls, run_params=run_params, skip_message=skip_message,
+            cls, run_requests=run_requests, skip_message=skip_message,
         )
diff --git a/python_modules/dagster/dagster/core/instance/__init__.py b/python_modules/dagster/dagster/core/instance/__init__.py
--- a/python_modules/dagster/dagster/core/instance/__init__.py
+++ b/python_modules/dagster/dagster/core/instance/__init__.py
@@ -1251,8 +1251,8 @@
     def get_latest_job_tick(self, job_origin_id):
         return self._schedule_storage.get_latest_job_tick(job_origin_id)

-    def has_job_tick(self, job_origin_id, execution_key, statuses=None):
-        return self._schedule_storage.has_job_tick(job_origin_id, execution_key, statuses)
+    def has_job_tick(self, job_origin_id, run_key, statuses=None):
+        return self._schedule_storage.has_job_tick(job_origin_id, run_key, statuses)

     def create_job_tick(self, job_tick_data):
         return self._schedule_storage.create_job_tick(job_tick_data)
diff --git a/python_modules/dagster/dagster/core/scheduler/job.py b/python_modules/dagster/dagster/core/scheduler/job.py
--- a/python_modules/dagster/dagster/core/scheduler/job.py
+++ b/python_modules/dagster/dagster/core/scheduler/job.py
@@ -134,8 +134,8 @@
         return self.job_tick_data.timestamp

     @property
-    def execution_key(self):
-        return self.job_tick_data.execution_key
+    def run_key(self):
+        return self.job_tick_data.run_key

     @property
     def status(self):
@@ -153,8 +153,7 @@
 @whitelist_for_serdes
 class JobTickData(
     namedtuple(
-        "_JobTickData",
-        "job_origin_id job_name job_type status timestamp run_id error execution_key",
+        "_JobTickData", "job_origin_id job_name job_type status timestamp run_id error run_key",
     )
 ):
     def __new__(
@@ -166,7 +165,7 @@
         timestamp,
         run_id=None,
         error=None,
-        execution_key=None,
+        run_key=None,
     ):
         """
         This class defines the data that is serialized and stored in ``JobStorage``. We depend
@@ -185,7 +184,7 @@
             run_id (str): The run created by the tick.
             error (SerializableErrorInfo): The error caught during job execution. This is set only
                 when the status is ``JobTickStatus.Failure``
-            execution_key (Optional[str]): A string that uniquely identifies a pipeline execution
+            run_key (Optional[str]): A string that uniquely identifies a pipeline execution
         """
         _validate_job_tick_args(job_type, status, run_id, error)
@@ -198,10 +197,10 @@
             check.float_param(timestamp, "timestamp"),
             run_id,  # validated in _validate_job_tick_args
             error,  # validated in _validate_job_tick_args
-            check.opt_str_param(execution_key, "execution_key"),
+            check.opt_str_param(run_key, "run_key"),
         )

-    def with_status(self, status, run_id=None, error=None, timestamp=None, execution_key=None):
+    def with_status(self, status, run_id=None, error=None, timestamp=None, run_key=None):
         check.inst_param(status, "status", JobTickStatus)
         return JobTickData(
             job_origin_id=self.job_origin_id,
@@ -211,7 +210,7 @@
             timestamp=timestamp if timestamp is not None else self.timestamp,
             run_id=run_id if run_id is not None else self.run_id,
             error=error if error is not None else self.error,
-            execution_key=execution_key if execution_key is not None else self.execution_key,
+            run_key=run_key if run_key is not None else self.run_key,
         )
diff --git a/python_modules/dagster/dagster/core/storage/schedules/base.py b/python_modules/dagster/dagster/core/storage/schedules/base.py
--- a/python_modules/dagster/dagster/core/storage/schedules/base.py
+++ b/python_modules/dagster/dagster/core/storage/schedules/base.py
@@ -62,12 +62,12 @@
         """

     @abc.abstractmethod
-    def has_job_tick(self, job_origin_id, execution_key, statuses=None):
-        """Checks if there is a job tick for a given job / execution_key in storage.
+    def has_job_tick(self, job_origin_id, run_key, statuses=None):
+        """Checks if there is a job tick for a given job / run_key in storage.

         Args:
             job_origin_id (str): The id of the ExternalJob target
-            execution_key (str): User-provided key that identifies a given job execution
+            run_key (str): User-provided key that identifies a given job execution
             statuses (Optional[List[JobTickStatus]]): List of statuses to filter ticks by
         """
diff --git a/python_modules/dagster/dagster/core/storage/schedules/schema.py b/python_modules/dagster/dagster/core/storage/schedules/schema.py
--- a/python_modules/dagster/dagster/core/storage/schedules/schema.py
+++ b/python_modules/dagster/dagster/core/storage/schedules/schema.py
@@ -50,13 +50,13 @@
     db.Column("job_origin_id", db.String(255), index=True),
     db.Column("status", db.String(63)),
     db.Column("type", db.String(63)),
-    db.Column("execution_key", db.String),
+    db.Column("run_key", db.String),
     db.Column("timestamp", db.types.TIMESTAMP),
     db.Column("tick_body", db.String),
     db.Column("create_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")),
     db.Column("update_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")),
 )

-db.Index("idx_job_tick_execution_key", JobTickTable.c.job_origin_id, JobTickTable.c.execution_key)
+db.Index("idx_job_tick_run_key", JobTickTable.c.job_origin_id, JobTickTable.c.run_key)
 db.Index("idx_job_tick_status", JobTickTable.c.job_origin_id, JobTickTable.c.status)
 db.Index("idx_job_tick_timestamp", JobTickTable.c.job_origin_id, JobTickTable.c.timestamp)
diff --git a/python_modules/dagster/dagster/core/storage/schedules/sql_schedule_storage.py b/python_modules/dagster/dagster/core/storage/schedules/sql_schedule_storage.py
--- a/python_modules/dagster/dagster/core/storage/schedules/sql_schedule_storage.py
+++ b/python_modules/dagster/dagster/core/storage/schedules/sql_schedule_storage.py
@@ -152,16 +152,16 @@
             map(lambda r: JobTick(r[0], deserialize_json_to_dagster_namedtuple(r[1])), rows)
         )

-    def has_job_tick(self, job_origin_id, execution_key, statuses=None):
+    def has_job_tick(self, job_origin_id, run_key, statuses=None):
         check.str_param(job_origin_id, "job_origin_id")
-        check.str_param(execution_key, "execution_key")
+        check.str_param(run_key, "run_key")
         check.opt_list_param(statuses, "statuses", of_type=JobTickStatus)

         query = (
             db.select([1])
             .select_from(JobTickTable)
             .where(JobTickTable.c.job_origin_id == job_origin_id)
-            .where(JobTickTable.c.execution_key == execution_key)
+            .where(JobTickTable.c.run_key == run_key)
         )
         if statuses:
             query = query.where(JobTickTable.c.status.in_([status.value for status in statuses]))
@@ -179,7 +179,7 @@
                 job_origin_id=job_tick_data.job_origin_id,
                 status=job_tick_data.status.value,
                 type=job_tick_data.job_type.value,
-                execution_key=job_tick_data.execution_key,
+                run_key=job_tick_data.run_key,
                 timestamp=utc_datetime_from_timestamp(job_tick_data.timestamp),
                 tick_body=serialize_dagster_namedtuple(job_tick_data),
             )
@@ -206,7 +206,7 @@
             .values(
                 status=tick.status.value,
                 type=tick.job_type.value,
-                execution_key=tick.execution_key,
+                run_key=tick.run_key,
                 timestamp=utc_datetime_from_timestamp(tick.timestamp),
                 tick_body=serialize_dagster_namedtuple(tick.job_tick_data),
             )
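Taken together, the storage changes above back a single question the daemon asks before launching. A minimal sketch, with a hypothetical helper name and `instance` assumed to be a `DagsterInstance` with schedule storage configured:

```python
from dagster.core.scheduler.job import JobTickStatus


def seen_run_key(instance, job_origin_id, run_key):
    # True iff a prior tick for this job origin recorded this run_key with a
    # SUCCESS status -- the tick-level half of run_key idempotence.
    return instance.has_job_tick(job_origin_id, run_key, [JobTickStatus.SUCCESS])
```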
"{prefix}run_key".format(prefix=HIDDEN_TAG_PREFIX) class TagType(Enum): diff --git a/python_modules/dagster/dagster/grpc/impl.py b/python_modules/dagster/dagster/grpc/impl.py --- a/python_modules/dagster/dagster/grpc/impl.py +++ b/python_modules/dagster/dagster/grpc/impl.py @@ -10,7 +10,7 @@ ReconstructablePipeline, ReconstructableRepository, ) -from dagster.core.definitions.sensor import SensorExecutionContext, SensorRunParams, SensorSkipData +from dagster.core.definitions.sensor import RunRequest, SensorExecutionContext, SkipReason from dagster.core.errors import ( DagsterInvalidSubsetError, DagsterRunNotFoundError, @@ -272,11 +272,9 @@ ): tick_data_list = sensor_def.get_tick_data(sensor_context) return ExternalSensorExecutionData( - run_params=[ - tick for tick in tick_data_list if isinstance(tick, SensorRunParams) - ], + run_requests=[tick for tick in tick_data_list if isinstance(tick, RunRequest)], skip_message=tick_data_list[0].skip_message - if tick_data_list and isinstance(tick_data_list[0], SensorSkipData) + if tick_data_list and isinstance(tick_data_list[0], SkipReason) else None, ) diff --git a/python_modules/dagster/dagster/scheduler/sensor.py b/python_modules/dagster/dagster/scheduler/sensor.py --- a/python_modules/dagster/dagster/scheduler/sensor.py +++ b/python_modules/dagster/dagster/scheduler/sensor.py @@ -20,7 +20,7 @@ from dagster.core.instance import DagsterInstance from dagster.core.scheduler.job import JobStatus, JobTickData, JobTickStatus, SensorJobData from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter -from dagster.core.storage.tags import EXECUTION_KEY_TAG, check_tags +from dagster.core.storage.tags import RUN_KEY_TAG, check_tags from dagster.utils import merge_dicts from dagster.utils.error import serializable_error_info_from_exc_info @@ -136,7 +136,7 @@ ) else: tick = latest_tick.with_status( - JobTickStatus.STARTED, timestamp=now.timestamp(), execution_key=None, + JobTickStatus.STARTED, timestamp=now.timestamp(), run_key=None, ) instance.update_job_tick(tick) @@ -190,7 +190,7 @@ return assert isinstance(sensor_runtime_data, ExternalSensorExecutionData) - if not sensor_runtime_data.run_params: + if not sensor_runtime_data.run_requests: if sensor_runtime_data.skip_message: context.logger.info( f"Sensor returned false for {external_sensor.name}, skipping: " @@ -212,22 +212,20 @@ subset_pipeline_result.external_pipeline_data, external_repo.handle, ) - for run_params in sensor_runtime_data.run_params: - if run_params.execution_key and instance.has_job_tick( - external_sensor.get_external_origin_id(), - run_params.execution_key, - [JobTickStatus.SUCCESS], + for run_request in sensor_runtime_data.run_requests: + if run_request.run_key and instance.has_job_tick( + external_sensor.get_external_origin_id(), run_request.run_key, [JobTickStatus.SUCCESS], ): context.logger.info( - "Found existing run for sensor {sensor_name} with execution_key `{execution_key}`, skipping.".format( - sensor_name=external_sensor.name, execution_key=run_params.execution_key + "Found existing run for sensor {sensor_name} with run_key `{run_key}`, skipping.".format( + sensor_name=external_sensor.name, run_key=run_request.run_key ) ) - context.add_state(JobTickStatus.SKIPPED, execution_key=run_params.execution_key) + context.add_state(JobTickStatus.SKIPPED, run_key=run_request.run_key) continue run = _get_or_create_sensor_run( - context, instance, repo_location, external_sensor, external_pipeline, run_params + context, instance, repo_location, 
external_sensor, external_pipeline, run_request ) if not run: @@ -254,24 +252,23 @@ _check_for_debug_crash(sensor_debug_crash_flags, "RUN_LAUNCHED") context.add_state( - JobTickStatus.SUCCESS, run_id=run.run_id, execution_key=run_params.execution_key, + JobTickStatus.SUCCESS, run_id=run.run_id, run_key=run_request.run_key, ) def _get_or_create_sensor_run( - context, instance, repo_location, external_sensor, external_pipeline, run_params + context, instance, repo_location, external_sensor, external_pipeline, run_request ): - if not run_params.execution_key: + if not run_request.run_key: return _create_sensor_run( - context, instance, repo_location, external_sensor, external_pipeline, run_params + context, instance, repo_location, external_sensor, external_pipeline, run_request ) existing_runs = instance.get_runs( PipelineRunsFilter( tags=merge_dicts( - PipelineRun.tags_for_sensor(external_sensor), - {EXECUTION_KEY_TAG: run_params.execution_key}, + PipelineRun.tags_for_sensor(external_sensor), {RUN_KEY_TAG: run_request.run_key}, ) ) ) @@ -285,37 +282,37 @@ # into a SUCCESS state context.logger.info( - f"Run {run.run_id} already completed with the execution key " - f"`{run_params.execution_key}` for {external_sensor.name}" + f"Run {run.run_id} already completed with the run key " + f"`{run_request.run_key}` for {external_sensor.name}" ) context.add_state( - JobTickStatus.SUCCESS, run_id=run.run_id, execution_key=run_params.execution_key, + JobTickStatus.SUCCESS, run_id=run.run_id, run_key=run_request.run_key, ) return None else: context.logger.info( - f"Run {run.run_id} already created with the execution key " - f"`{run_params.execution_key}` for {external_sensor.name}" + f"Run {run.run_id} already created with the run key " + f"`{run_request.run_key}` for {external_sensor.name}" ) return run context.logger.info(f"Creating new run for {external_sensor.name}") return _create_sensor_run( - context, instance, repo_location, external_sensor, external_pipeline, run_params + context, instance, repo_location, external_sensor, external_pipeline, run_request ) def _create_sensor_run( - context, instance, repo_location, external_sensor, external_pipeline, run_params + context, instance, repo_location, external_sensor, external_pipeline, run_request ): execution_plan_errors = [] execution_plan_snapshot = None try: external_execution_plan = repo_location.get_external_execution_plan( external_pipeline, - run_params.run_config, + run_request.run_config, external_sensor.mode, step_keys_to_execute=None, ) @@ -328,15 +325,15 @@ pipeline_tags = external_pipeline.tags or {} check_tags(pipeline_tags, "pipeline_tags") tags = merge_dicts( - merge_dicts(pipeline_tags, run_params.tags), PipelineRun.tags_for_sensor(external_sensor) + merge_dicts(pipeline_tags, run_request.tags), PipelineRun.tags_for_sensor(external_sensor) ) - if run_params.execution_key: - tags[EXECUTION_KEY_TAG] = run_params.execution_key + if run_request.run_key: + tags[RUN_KEY_TAG] = run_request.run_key run = instance.create_run( pipeline_name=external_sensor.pipeline_name, run_id=None, - run_config=run_params.run_config, + run_config=run_request.run_config, mode=external_sensor.mode, solids_to_execute=external_pipeline.solids_to_execute, step_keys_to_execute=None, diff --git a/python_modules/dagster/dagster/utils/test/schedule_storage.py b/python_modules/dagster/dagster/utils/test/schedule_storage.py --- a/python_modules/dagster/dagster/utils/test/schedule_storage.py +++ b/python_modules/dagster/dagster/utils/test/schedule_storage.py @@ -412,22 
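Condensed from `_get_or_create_sensor_run` above, the run-level half of the dedup: runs launched for a `run_key` are tagged with `RUN_KEY_TAG`, so later evaluations can find them. Only the helper name is hypothetical:

```python
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunsFilter
from dagster.core.storage.tags import RUN_KEY_TAG
from dagster.utils import merge_dicts


def existing_runs_for_run_key(instance, external_sensor, run_key):
    # Same filter the scheduler builds: the sensor's identifying tags plus
    # the hidden run_key tag written by _create_sensor_run.
    return instance.get_runs(
        PipelineRunsFilter(
            tags=merge_dicts(
                PipelineRun.tags_for_sensor(external_sensor),
                {RUN_KEY_TAG: run_key},
            )
        )
    )
```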
diff --git a/python_modules/dagster/dagster/utils/test/schedule_storage.py b/python_modules/dagster/dagster/utils/test/schedule_storage.py
--- a/python_modules/dagster/dagster/utils/test/schedule_storage.py
+++ b/python_modules/dagster/dagster/utils/test/schedule_storage.py
@@ -412,22 +412,10 @@
         storage.add_job_state(job)

     def build_job_tick_data(
-        self,
-        current_time,
-        status=JobTickStatus.STARTED,
-        run_id=None,
-        error=None,
-        execution_key=None,
+        self, current_time, status=JobTickStatus.STARTED, run_id=None, error=None, run_key=None,
     ):
         return JobTickData(
-            "my_sensor",
-            "my_sensor",
-            JobType.SENSOR,
-            status,
-            current_time,
-            run_id,
-            error,
-            execution_key,
+            "my_sensor", "my_sensor", JobType.SENSOR, status, current_time, run_id, error, run_key,
         )

     def test_create_job_tick(self, storage):
diff --git a/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py b/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py
--- a/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py
+++ b/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py
@@ -13,7 +13,7 @@
     usable_as_dagster_type,
 )
 from dagster.core.definitions.decorators.sensor import sensor
-from dagster.core.definitions.sensor import SensorRunParams
+from dagster.core.definitions.sensor import RunRequest


 @lambda_solid
@@ -135,8 +135,8 @@
 @sensor(pipeline_name="foo_pipeline")
 def sensor_foo(_):
-    yield SensorRunParams(execution_key=None, run_config={"foo": "FOO"}, tags={"foo": "foo_tag"})
-    yield SensorRunParams(execution_key=None, run_config={"foo": "FOO"})
+    yield RunRequest(run_key=None, run_config={"foo": "FOO"}, tags={"foo": "foo_tag"})
+    yield RunRequest(run_key=None, run_config={"foo": "FOO"})


 @sensor(pipeline_name="foo_pipeline")
diff --git a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py
--- a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py
+++ b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py
@@ -15,10 +15,10 @@
         instance, repository_handle, "sensor_foo", None
     )
     assert isinstance(result, ExternalSensorExecutionData)
-    assert len(result.run_params) == 2
-    run_params = result.run_params[0]
-    assert run_params.run_config == {"foo": "FOO"}
-    assert run_params.tags == {"foo": "foo_tag"}
+    assert len(result.run_requests) == 2
+    run_request = result.run_requests[0]
+    assert run_request.run_config == {"foo": "FOO"}
+    assert run_request.tags == {"foo": "foo_tag"}


 def test_external_sensor_error():
diff --git a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py
--- a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py
+++ b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py
@@ -23,7 +23,7 @@
 from dagster.cli.pipeline import pipeline_execute_command
 from dagster.cli.run import run_list_command, run_wipe_command
 from dagster.core.definitions.decorators.sensor import sensor
-from dagster.core.definitions.sensor import SensorRunParams
+from dagster.core.definitions.sensor import RunRequest
 from dagster.core.test_utils import instance_for_test, instance_for_test_tempdir
 from dagster.core.types.loadable_target_origin import LoadableTargetOrigin
 from dagster.grpc.server import GrpcServerProcess
@@ -120,7 +120,7 @@
         run_config = {"foo": "FOO"}
         if context.last_completion_time:
             run_config["since"] = context.last_completion_time
-        return SensorRunParams(execution_key=None, run_config=run_config)
+        return RunRequest(run_key=None, run_config=run_config)

     return {"foo_sensor": foo_sensor}
diff --git a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_sensor_commands.py b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_sensor_commands.py
--- a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_sensor_commands.py
+++ b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_sensor_commands.py
@@ -106,7 +106,7 @@
     result = runner.invoke(sensor_preview_command, cli_args + ["foo_sensor"],)

     assert result.exit_code == 0
-    assert result.output == "Sensor returning run parameters for 1 run(s):\n\nfoo: FOO\n\n"
+    assert result.output == "Sensor returning run requests for 1 run(s):\n\nfoo: FOO\n\n"


 @pytest.mark.parametrize("gen_sensor_args", sensor_command_contexts())
@@ -123,5 +123,5 @@
     assert result.exit_code == 0
     assert (
         result.output
-        == "Sensor returning run parameters for 1 run(s):\n\nfoo: FOO\nsince: 1.1\n\n"
+        == "Sensor returning run requests for 1 run(s):\n\nfoo: FOO\nsince: 1.1\n\n"
     )
diff --git a/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_failure_recovery.py b/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_failure_recovery.py
--- a/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_failure_recovery.py
+++ b/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_failure_recovery.py
@@ -4,7 +4,7 @@
 from dagster.core.instance import DagsterInstance
 from dagster.core.scheduler.job import JobState, JobStatus, JobTickStatus
 from dagster.core.storage.pipeline_run import PipelineRunStatus
-from dagster.core.storage.tags import EXECUTION_KEY_TAG, SENSOR_NAME_TAG
+from dagster.core.storage.tags import RUN_KEY_TAG, SENSOR_NAME_TAG
 from dagster.core.test_utils import cleanup_test_instance, get_crash_signals
 from dagster.daemon import get_default_daemon_logger
 from dagster.scheduler.sensor import execute_sensor_iteration
@@ -129,7 +129,7 @@
     ).in_tz("US/Central")
     with instance_with_sensors(external_repo_context) as (instance, external_repo):
         with pendulum.test(frozen_datetime):
-            external_sensor = external_repo.get_external_sensor("execution_key_sensor")
+            external_sensor = external_repo.get_external_sensor("run_key_sensor")
             instance.add_job_state(
                 JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING)
             )
@@ -154,8 +154,8 @@
             run = instance.get_runs()[0]
             # Run was created, but hasn't launched yet
             assert run.status == PipelineRunStatus.NOT_STARTED
-            assert run.tags.get(SENSOR_NAME_TAG) == "execution_key_sensor"
-            assert run.tags.get(EXECUTION_KEY_TAG) == "only_once"
+            assert run.tags.get(SENSOR_NAME_TAG) == "run_key_sensor"
+            assert run.tags.get(RUN_KEY_TAG) == "only_once"

             # clear output
             capfd.readouterr()
@@ -175,7 +175,7 @@
             captured = capfd.readouterr()
             assert (
-                f"Run {run.run_id} already created with the execution key `only_once` for execution_key_sensor"
+                f"Run {run.run_id} already created with the run key `only_once` for run_key_sensor"
                 in captured.out
             )
@@ -196,7 +196,7 @@
     ).in_tz("US/Central")
     with instance_with_sensors(external_repo_context) as (instance, external_repo):
         with pendulum.test(frozen_datetime):
-            external_sensor = external_repo.get_external_sensor("execution_key_sensor")
+            external_sensor = external_repo.get_external_sensor("run_key_sensor")
             instance.add_job_state(
                 JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING)
             )
@@ -220,8 +220,8 @@
             run = instance.get_runs()[0]
             wait_for_all_runs_to_start(instance)
-            assert run.tags.get(SENSOR_NAME_TAG) == "execution_key_sensor"
-            assert run.tags.get(EXECUTION_KEY_TAG) == "only_once"
+            assert run.tags.get(SENSOR_NAME_TAG) == "run_key_sensor"
+            assert run.tags.get(RUN_KEY_TAG) == "only_once"

             capfd.readouterr()

             launch_process = multiprocessing.Process(
@@ -239,7 +239,7 @@
             captured = capfd.readouterr()
             assert (
-                f"Run {run.run_id} already completed with the execution key `only_once` for execution_key_sensor"
+                f"Run {run.run_id} already completed with the run key `only_once` for run_key_sensor"
                 in captured.out
             )
diff --git a/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py b/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py
--- a/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py
+++ b/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py
@@ -8,7 +8,7 @@
 from dagster import pipeline, repository, solid
 from dagster.core.definitions.decorators.sensor import sensor
 from dagster.core.definitions.job import JobType
-from dagster.core.definitions.sensor import SensorRunParams, SensorSkipData
+from dagster.core.definitions.sensor import RunRequest, SkipReason
 from dagster.core.host_representation import (
     ManagedGrpcPythonEnvRepositoryLocationOrigin,
     RepositoryLocation,
@@ -35,19 +35,19 @@
 @sensor(pipeline_name="the_pipeline")
 def simple_sensor(context):
     if not context.last_completion_time or not int(context.last_completion_time) % 2:
-        return SensorSkipData()
+        return SkipReason()

-    return SensorRunParams(execution_key=None, run_config={}, tags={})
+    return RunRequest(run_key=None, run_config={}, tags={})


 @sensor(pipeline_name="the_pipeline")
 def always_on_sensor(_context):
-    return SensorRunParams(execution_key=None, run_config={}, tags={})
+    return RunRequest(run_key=None, run_config={}, tags={})


 @sensor(pipeline_name="the_pipeline")
-def execution_key_sensor(_context):
-    return SensorRunParams(execution_key="only_once", run_config={}, tags={})
+def run_key_sensor(_context):
+    return RunRequest(run_key="only_once", run_config={}, tags={})


 @sensor(pipeline_name="the_pipeline")
@@ -57,7 +57,7 @@

 @repository
 def the_repo():
-    return [the_pipeline, simple_sensor, error_sensor, always_on_sensor, execution_key_sensor]
+    return [the_pipeline, simple_sensor, error_sensor, always_on_sensor, run_key_sensor]


 @contextmanager
@@ -95,7 +95,7 @@
     expected_status,
     expected_run_id=None,
     expected_error=None,
-    expected_execution_key=None,
+    expected_run_key=None,
 ):
     tick_data = tick.job_tick_data
     assert tick_data.job_origin_id == external_sensor.get_external_origin_id()
@@ -104,7 +104,7 @@
     assert tick_data.status == expected_status
     assert tick_data.timestamp == expected_datetime.timestamp()
     assert tick_data.run_id == expected_run_id
-    assert tick_data.execution_key == expected_execution_key
+    assert tick_data.run_key == expected_run_key
     if expected_error:
         assert expected_error in tick_data.error.message
@@ -273,7 +273,7 @@

     with instance_with_sensors(external_repo_context) as (instance, external_repo):
         with pendulum.test(freeze_datetime):
-            external_sensor = external_repo.get_external_sensor("execution_key_sensor")
+            external_sensor = external_repo.get_external_sensor("run_key_sensor")
             instance.add_job_state(
                 JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING)
             )
@@ -294,7 +294,7 @@
                 freeze_datetime,
                 JobTickStatus.SUCCESS,
                 expected_run_id=run.run_id,
-                expected_execution_key="only_once",
+                expected_run_key="only_once",
             )

             # run again, ensure
@@ -307,10 +307,10 @@
                 external_sensor,
                 freeze_datetime,
                 JobTickStatus.SKIPPED,
-                expected_execution_key="only_once",
+                expected_run_key="only_once",
             )
             captured = capfd.readouterr()
             assert (
-                "Found existing run for sensor execution_key_sensor with execution_key `only_once`, skipping."
+                "Found existing run for sensor run_key_sensor with run_key `only_once`, skipping."
                 in captured.out
             )
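For user code tracking this rename, the migration is mechanical; a hedged before/after sketch (the sensor body itself is hypothetical):

```python
# Before this change (the names shown on the "-" lines throughout this diff):
#
#     from dagster.core.definitions.sensor import SensorRunParams
#
#     @sensor(pipeline_name="the_pipeline")
#     def my_sensor(_context):
#         return SensorRunParams(execution_key="only_once", run_config={})
#
# After:
from dagster.core.definitions.decorators.sensor import sensor
from dagster.core.definitions.sensor import RunRequest


@sensor(pipeline_name="the_pipeline")
def my_sensor(_context):
    return RunRequest(run_key="only_once", run_config={})
```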