diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py @@ -1,10 +1,6 @@ from dagster import check from dagster.core.definitions.job import JobType -from dagster.core.host_representation import ( - PipelineSelector, - RepositorySelector, - ScheduleSelector, -) +from dagster.core.host_representation import PipelineSelector, RepositorySelector, ScheduleSelector from dagster.core.scheduler.job import JobStatus from graphql.execution.base import ResolveInfo diff --git a/python_modules/dagster/dagster/core/definitions/decorators/sensor.py b/python_modules/dagster/dagster/core/definitions/decorators/sensor.py --- a/python_modules/dagster/dagster/core/definitions/decorators/sensor.py +++ b/python_modules/dagster/dagster/core/definitions/decorators/sensor.py @@ -1,7 +1,7 @@ import inspect from dagster import check -from dagster.core.definitions.sensor import SensorDefinition, SensorRunParams, SensorSkipData +from dagster.core.definitions.sensor import RunParams, RunSkippedData, SensorDefinition from dagster.core.errors import DagsterInvariantViolationError from dagster.utils.backcompat import experimental @@ -13,9 +13,9 @@ signature of the decorated function is more flexible than that of the evaluation_fn in the core API in that it may: - 1. Return a `SensorSkipData` object. - 2. Return a `SensorRunParams` object. - 3. Yield a number of `SensorRunParams` objects. + 1. Return a `RunSkippedData` object. + 2. Return a `RunParams` object. + 3. Yield a number of `RunParams` objects. Takes a :py:class:`~dagster.SensorExecutionContext`. @@ -39,15 +39,15 @@ if inspect.isgenerator(result): for item in result: yield item - elif isinstance(result, (SensorSkipData, SensorRunParams)): + elif isinstance(result, (RunSkippedData, RunParams)): yield result elif result is not None: raise DagsterInvariantViolationError( ( "Error in sensor {sensor_name}: Sensor unexpectedly returned output " - "{result} of type {type_}. Should only return SensorSkipData or " - "SensorRunParams objects." + "{result} of type {type_}. Should only return RunSkippedData or " + "RunParams objects." ).format(sensor_name=sensor_name, result=result, type_=type(result)) ) diff --git a/python_modules/dagster/dagster/core/definitions/job.py b/python_modules/dagster/dagster/core/definitions/job.py --- a/python_modules/dagster/dagster/core/definitions/job.py +++ b/python_modules/dagster/dagster/core/definitions/job.py @@ -1,7 +1,6 @@ from enum import Enum from dagster import check -from dagster.core.instance import DagsterInstance from dagster.serdes import whitelist_for_serdes from .mode import DEFAULT_MODE_NAME @@ -27,6 +26,8 @@ __slots__ = ["_instance"] def __init__(self, instance): + from dagster.core.instance import DagsterInstance + self._instance = check.inst_param(instance, "instance", DagsterInstance) @property diff --git a/python_modules/dagster/dagster/core/definitions/sensor.py b/python_modules/dagster/dagster/core/definitions/sensor.py --- a/python_modules/dagster/dagster/core/definitions/sensor.py +++ b/python_modules/dagster/dagster/core/definitions/sensor.py @@ -8,38 +8,6 @@ from dagster.utils.backcompat import experimental_class_warning -@whitelist_for_serdes -class SensorSkipData(namedtuple("_SensorSkipData", "skip_message")): - def __new__(cls, skip_message=None): - return super(SensorSkipData, cls).__new__( - cls, skip_message=check.opt_str_param(skip_message, "skip_message") - ) - - -@whitelist_for_serdes -class SensorRunParams(namedtuple("_SensorRunParams", "run_config tags execution_key")): - """ - Represents all the information required to launch a single run instigated by a sensor body. - Must be returned by a SensorDefinition's evaluation function for a run to be launched. - - Attributes: - run_config (Optional[Dict]): The environment config that parameterizes the run execution to - be launched, as a dict. - tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach - to the launched run. - execution_key (Optional[str]): A string key to identify this launched run, to be used for - deduplication across sensor evaluations. - """ - - def __new__(cls, run_config=None, tags=None, execution_key=None): - return super(SensorRunParams, cls).__new__( - cls, - run_config=check.opt_dict_param(run_config, "run_config"), - tags=check.opt_dict_param(tags, "tags"), - execution_key=check.opt_str_param(execution_key, "execution_key"), - ) - - class SensorExecutionContext(JobContext): """Sensor execution context. @@ -66,6 +34,38 @@ return self._last_evaluation_time +@whitelist_for_serdes +class RunSkippedData(namedtuple("_RunSkippedData", "skip_message")): + def __new__(cls, skip_message=None): + return super(RunSkippedData, cls).__new__( + cls, skip_message=check.opt_str_param(skip_message, "skip_message") + ) + + +@whitelist_for_serdes +class RunParams(namedtuple("_RunParams", "run_config tags execution_key")): + """ + Represents all the information required to launch a single run instigated by a sensor body. + Must be returned by a ExecutionDefinition's evaluation function for a run to be launched. + + Attributes: + run_config (Optional[Dict]): The environment config that parameterizes the run execution to + be launched, as a dict. + tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach + to the launched run. + execution_key (Optional[str]): A string key to identify this launched run, to be used for + deduplication across evaluations. + """ + + def __new__(cls, run_config=None, tags=None, execution_key=None): + return super(RunParams, cls).__new__( + cls, + run_config=check.opt_dict_param(run_config, "run_config"), + tags=check.opt_dict_param(tags, "tags"), + execution_key=check.opt_str_param(execution_key, "execution_key"), + ) + + class SensorDefinition(JobDefinition): """Define a sensor that initiates a set of job runs @@ -76,8 +76,8 @@ sensor, which is run at an interval to determine whether a run should be launched or not. Takes a :py:class:`~dagster.SensorExecutionContext`. - This function must return a generator, which must yield either a single SensorSkipData - or one or more SensorRunParams objects. + This function must return a generator, which must yield either a single RunSkippedData + or one or more RunParams objects. solid_selection (Optional[List[str]]): A list of solid subselection (including single solid names) to execute when the sensor runs. e.g. ``['*some_solid+', 'other_solid']`` mode (Optional[str]): The mode to apply when executing this sensor. (default: 'default') @@ -105,9 +105,9 @@ result = list(ensure_gen(self._evaluation_fn(context))) if not result: - return [SensorSkipData()] + return [RunSkippedData()] if len(result) == 1: - return check.is_list(result, of_type=(SensorRunParams, SensorSkipData)) + return check.is_list(result, of_type=(RunParams, RunSkippedData)) - return check.is_list(result, of_type=SensorRunParams) + return check.is_list(result, of_type=RunParams) diff --git a/python_modules/dagster/dagster/core/host_representation/external_data.py b/python_modules/dagster/dagster/core/host_representation/external_data.py --- a/python_modules/dagster/dagster/core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/core/host_representation/external_data.py @@ -18,7 +18,7 @@ ScheduleDefinition, ) from dagster.core.definitions.partition import PartitionScheduleDefinition -from dagster.core.definitions.sensor import SensorRunParams +from dagster.core.definitions.sensor import RunParams from dagster.core.snap import PipelineSnapshot from dagster.serdes import whitelist_for_serdes from dagster.utils.error import SerializableErrorInfo @@ -238,7 +238,7 @@ namedtuple("_ExternalSensorExecutionData", "run_params skip_message") ): def __new__(cls, run_params=None, skip_message=None): - check.opt_list_param(run_params, "run_params", SensorRunParams) + check.opt_list_param(run_params, "run_params", RunParams) check.opt_str_param(skip_message, "skip_message") return super(ExternalSensorExecutionData, cls).__new__( cls, run_params=run_params, skip_message=skip_message, diff --git a/python_modules/dagster/dagster/grpc/impl.py b/python_modules/dagster/dagster/grpc/impl.py --- a/python_modules/dagster/dagster/grpc/impl.py +++ b/python_modules/dagster/dagster/grpc/impl.py @@ -10,7 +10,7 @@ ReconstructablePipeline, ReconstructableRepository, ) -from dagster.core.definitions.sensor import SensorExecutionContext, SensorRunParams, SensorSkipData +from dagster.core.definitions.sensor import RunParams, RunSkippedData, SensorExecutionContext from dagster.core.errors import ( DagsterInvalidSubsetError, DagsterRunNotFoundError, @@ -279,11 +279,9 @@ ): tick_data_list = sensor_def.get_tick_data(sensor_context) return ExternalSensorExecutionData( - run_params=[ - tick for tick in tick_data_list if isinstance(tick, SensorRunParams) - ], + run_params=[tick for tick in tick_data_list if isinstance(tick, RunParams)], skip_message=tick_data_list[0].skip_message - if tick_data_list and isinstance(tick_data_list[0], SensorSkipData) + if tick_data_list and isinstance(tick_data_list[0], RunSkippedData) else None, ) diff --git a/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py b/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py --- a/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py +++ b/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py @@ -13,7 +13,7 @@ usable_as_dagster_type, ) from dagster.core.definitions.decorators.sensor import sensor -from dagster.core.definitions.sensor import SensorRunParams +from dagster.core.definitions.sensor import RunParams @lambda_solid @@ -135,8 +135,8 @@ @sensor(pipeline_name="foo_pipeline") def sensor_foo(_): - yield SensorRunParams(run_config={"foo": "FOO"}, tags={"foo": "foo_tag"}) - yield SensorRunParams(run_config={"foo": "FOO"}) + yield RunParams(run_config={"foo": "FOO"}, tags={"foo": "foo_tag"}) + yield RunParams(run_config={"foo": "FOO"}) @sensor(pipeline_name="foo_pipeline") diff --git a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py --- a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py +++ b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py @@ -23,7 +23,7 @@ from dagster.cli.pipeline import pipeline_execute_command from dagster.cli.run import run_list_command, run_wipe_command from dagster.core.definitions.decorators.sensor import sensor -from dagster.core.definitions.sensor import SensorRunParams +from dagster.core.definitions.sensor import RunParams from dagster.core.test_utils import instance_for_test, instance_for_test_tempdir from dagster.core.types.loadable_target_origin import LoadableTargetOrigin from dagster.grpc.server import GrpcServerProcess @@ -117,7 +117,7 @@ def define_bar_sensors(): @sensor(pipeline_name="baz") def foo_sensor(_): - return SensorRunParams(run_config={"foo": "FOO"}) + return RunParams(run_config={"foo": "FOO"}) return {"foo_sensor": foo_sensor} diff --git a/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py b/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py --- a/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py +++ b/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py @@ -8,7 +8,7 @@ from dagster import pipeline, repository, solid from dagster.core.definitions.decorators.sensor import sensor from dagster.core.definitions.job import JobType -from dagster.core.definitions.sensor import SensorRunParams, SensorSkipData +from dagster.core.definitions.sensor import RunParams, RunSkippedData from dagster.core.host_representation import ( ManagedGrpcPythonEnvRepositoryLocationOrigin, RepositoryLocation, @@ -35,19 +35,19 @@ @sensor(pipeline_name="the_pipeline") def simple_sensor(context): if not context.last_evaluation_time or not int(context.last_evaluation_time) % 2: - return SensorSkipData() + return RunSkippedData() - return SensorRunParams(run_config={}, tags={}) + return RunParams(run_config={}, tags={}) @sensor(pipeline_name="the_pipeline") def always_on_sensor(_context): - return SensorRunParams(run_config={}, tags={}) + return RunParams(run_config={}, tags={}) @sensor(pipeline_name="the_pipeline") def execution_key_sensor(_context): - return SensorRunParams(run_config={}, tags={}, execution_key="only_once") + return RunParams(run_config={}, tags={}, execution_key="only_once") @sensor(pipeline_name="the_pipeline")