diff --git a/python_modules/dagster/dagster/api/snapshot_sensor.py b/python_modules/dagster/dagster/api/snapshot_sensor.py new file mode 100644 --- /dev/null +++ b/python_modules/dagster/dagster/api/snapshot_sensor.py @@ -0,0 +1,43 @@ +from dagster import check +from dagster.core.host_representation.external_data import ( + ExternalSensorExecutionData, + ExternalSensorExecutionErrorData, +) +from dagster.core.host_representation.handle import RepositoryHandle +from dagster.grpc.types import SensorExecutionArgs + + +def sync_get_external_sensor_execution_data_ephemeral_grpc( + instance, repository_handle, sensor_name, last_evaluation_time +): + from dagster.grpc.client import ephemeral_grpc_api_client + + origin = repository_handle.get_external_origin() + with ephemeral_grpc_api_client( + origin.repository_location_origin.loadable_target_origin + ) as api_client: + return sync_get_external_sensor_execution_data_grpc( + api_client, instance, repository_handle, sensor_name, last_evaluation_time + ) + + +def sync_get_external_sensor_execution_data_grpc( + api_client, instance, repository_handle, sensor_name, last_evaluation_time +): + check.inst_param(repository_handle, "repository_handle", RepositoryHandle) + check.str_param(sensor_name, "sensor_name") + check.opt_float_param(last_evaluation_time, "last_evaluation_time") + + origin = repository_handle.get_external_origin() + + return check.inst( + api_client.external_sensor_execution( + sensor_execution_args=SensorExecutionArgs( + repository_origin=origin, + instance_ref=instance.get_ref(), + sensor_name=sensor_name, + last_evaluation_time=last_evaluation_time, + ) + ), + (ExternalSensorExecutionData, ExternalSensorExecutionErrorData), + ) diff --git a/python_modules/dagster/dagster/core/definitions/decorators/repository.py b/python_modules/dagster/dagster/core/definitions/decorators/repository.py --- a/python_modules/dagster/dagster/core/definitions/decorators/repository.py +++ b/python_modules/dagster/dagster/core/definitions/decorators/repository.py @@ -67,7 +67,7 @@ "{bad_keys}".format( bad_keys=", ".join( [ - "'{key}'" + "'{key}'".format(key=key) for key in repository_definitions.keys() if key not in VALID_REPOSITORY_DATA_DICT_KEYS ] diff --git a/python_modules/dagster/dagster/core/definitions/decorators/sensor.py b/python_modules/dagster/dagster/core/definitions/decorators/sensor.py new file mode 100644 --- /dev/null +++ b/python_modules/dagster/dagster/core/definitions/decorators/sensor.py @@ -0,0 +1,46 @@ +from dagster import check +from dagster.core.definitions.sensor import SensorDefinition +from dagster.utils.backcompat import experimental + + +@experimental +def sensor( + pipeline_name, name=None, run_config_fn=None, tags_fn=None, solid_selection=None, mode=None, +): + """ + The decorated function will be called to determine whether the provided job should execute, + taking a :py:class:`~dagster.core.definitions.sensor.SensorExecutionContext` + as its only argument, returning a boolean if the execution should fire + + Args: + name (str): The name of this sensor + run_config_fn (Optional[Callable[[SensorExecutionContext], Optional[Dict]]]): A function + that takes a SensorExecutionContext object and returns the environment configuration + that parameterizes this execution, as a dict. + tags_fn (Optional[Callable[[SensorExecutionContext], Optional[Dict[str, str]]]]): A function + that generates tags to attach to the sensor runs. Takes a + :py:class:`~dagster.SensorExecutionContext` and returns a dictionary of tags (string + key-value pairs). + solid_selection (Optional[List[str]]): A list of solid subselection (including single + solid names) to execute for runs for this sensor e.g. + ``['*some_solid+', 'other_solid']`` + mode (Optional[str]): The mode to apply when executing runs for this sensor. + (default: 'default') + """ + check.opt_str_param(name, "name") + + def inner(fn): + check.callable_param(fn, "fn") + sensor_name = name or fn.__name__ + + return SensorDefinition( + name=sensor_name, + pipeline_name=pipeline_name, + should_execute=fn, + run_config_fn=run_config_fn, + tags_fn=tags_fn, + solid_selection=solid_selection, + mode=mode, + ) + + return inner diff --git a/python_modules/dagster/dagster/core/definitions/job.py b/python_modules/dagster/dagster/core/definitions/job.py --- a/python_modules/dagster/dagster/core/definitions/job.py +++ b/python_modules/dagster/dagster/core/definitions/job.py @@ -11,6 +11,7 @@ @whitelist_for_serdes class JobType(Enum): SCHEDULE = "SCHEDULE" + SENSOR = "SENSOR" class JobContext(object): diff --git a/python_modules/dagster/dagster/core/definitions/repository.py b/python_modules/dagster/dagster/core/definitions/repository.py --- a/python_modules/dagster/dagster/core/definitions/repository.py +++ b/python_modules/dagster/dagster/core/definitions/repository.py @@ -2,7 +2,7 @@ from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError from dagster.utils import merge_dicts -from .job import JobDefinition +from .job import JobDefinition, JobType from .partition import PartitionScheduleDefinition, PartitionSetDefinition from .pipeline import PipelineDefinition from .schedule import ScheduleDefinition @@ -418,6 +418,19 @@ return self._schedules.has_definition(schedule_name) + def get_all_sensors(self): + return [ + definition + for definition in self._jobs.get_all_definitions() + if definition.job_type == JobType.SENSOR + ] + + def get_sensor(self, name): + return self._jobs.get_definition(name) + + def has_sensor(self, name): + return self._jobs.has_definition(name) + def get_all_jobs(self): return self._jobs.get_all_definitions() @@ -597,6 +610,16 @@ def has_schedule_def(self, name): return self._repository_data.has_schedule(name) + @property + def sensor_defs(self): + return self._repository_data.get_all_sensors() + + def get_sensor_def(self, name): + return self._repository_data.get_sensor(name) + + def has_sensor_def(self, name): + return self._repository_data.has_sensor(name) + @property def job_defs(self): return self._repository_data.get_all_jobs() diff --git a/python_modules/dagster/dagster/core/definitions/sensor.py b/python_modules/dagster/dagster/core/definitions/sensor.py new file mode 100644 --- /dev/null +++ b/python_modules/dagster/dagster/core/definitions/sensor.py @@ -0,0 +1,95 @@ +from dagster import check +from dagster.core.definitions.job import JobContext, JobDefinition, JobType +from dagster.core.instance import DagsterInstance +from dagster.utils.backcompat import experimental_class_warning + + +class SensorExecutionContext(JobContext): + """Sensor execution context. + + An instance of this class is made available as the first argument to the `should_execute` + function on SensorDefinition. + + Attributes: + instance (DagsterInstance): The instance configured to run the schedule + last_evaluation_time (float): The last time that the sensor was evaluated (UTC). + """ + + __slots__ = ["_last_evaluation_time"] + + def __init__(self, instance, last_evaluation_time): + super(SensorExecutionContext, self).__init__( + check.inst_param(instance, "instance", DagsterInstance), + ) + self._last_evaluation_time = check.opt_float_param( + last_evaluation_time, "last_evaluation_time" + ) + + @property + def last_evaluation_time(self): + return self._last_evaluation_time + + +class SensorDefinition(JobDefinition): + """Define a sensor that initiates a set of job runs + + Args: + name (str): The name of the sensor to create. + pipeline_name (str): The name of the pipeline to execute when the sensor fires. + should_execute (Callable[[SensorExecutionContext], bool]): A function that runs + at an interval to determine whether a run should be launched or not. Takes a + :py:class:`~dagster.SensorExecutionContext` and returns a boolean (``True`` if the + sensor should execute). + run_config_fn (Callable[[SensorExecutionContext], [Dict]]): A function that takes a + SensorExecutionContext object and returns the environment configuration that + parameterizes this execution, as a dict. + tags_fn (Optional[Callable[[SensorExecutionContext], Optional[Dict[str, str]]]]): A + function that generates tags to attach to the sensors runs. Takes a + :py:class:`~dagster.SensorExecutionContext` and returns a dictionary of tags (string + key-value pairs). + solid_selection (Optional[List[str]]): A list of solid subselection (including single + solid names) to execute when the sensor runs. e.g. ``['*some_solid+', 'other_solid']`` + mode (Optional[str]): The mode to apply when executing this sensor. (default: 'default') + """ + + __slots__ = [ + "_run_config_fn", + "_tags_fn", + "_should_execute", + ] + + def __init__( + self, + name, + pipeline_name, + should_execute, + run_config_fn=None, + tags_fn=None, + solid_selection=None, + mode=None, + ): + experimental_class_warning("SensorDefinition") + super(SensorDefinition, self).__init__( + name, + job_type=JobType.SENSOR, + pipeline_name=pipeline_name, + mode=mode, + solid_selection=solid_selection, + ) + self._should_execute = check.callable_param(should_execute, "should_execute") + self._run_config_fn = check.opt_callable_param( + run_config_fn, "run_config_fn", default=lambda _context: {} + ) + self._tags_fn = check.opt_callable_param(tags_fn, "tags_fn", default=lambda _context: {}) + + def get_run_config(self, context): + check.inst_param(context, "context", SensorExecutionContext) + return self._run_config_fn(context) + + def get_tags(self, context): + check.inst_param(context, "context", SensorExecutionContext) + return self._tags_fn(context) + + def should_execute(self, context): + check.inst_param(context, "context", SensorExecutionContext) + return self._should_execute(context) diff --git a/python_modules/dagster/dagster/core/errors.py b/python_modules/dagster/dagster/core/errors.py --- a/python_modules/dagster/dagster/core/errors.py +++ b/python_modules/dagster/dagster/core/errors.py @@ -450,6 +450,10 @@ """Errors raised in a user process during the execution of schedule.""" +class SensorExecutionError(DagsterUserCodeExecutionError): + """Errors raised in a user process during the execution of a sensor (or its job).""" + + class PartitionExecutionError(DagsterUserCodeExecutionError): """Errors raised during the execution of user-provided functions of a partition set schedule.""" diff --git a/python_modules/dagster/dagster/core/host_representation/external_data.py b/python_modules/dagster/dagster/core/host_representation/external_data.py --- a/python_modules/dagster/dagster/core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/core/host_representation/external_data.py @@ -232,6 +232,27 @@ ) +@whitelist_for_serdes +class ExternalSensorExecutionData( + namedtuple("_ExternalSensorExecutionData", "should_execute run_config tags") +): + def __new__(cls, should_execute=None, run_config=None, tags=None): + return super(ExternalSensorExecutionData, cls).__new__( + cls, + should_execute=check.opt_bool_param(should_execute, "should_execute"), + run_config=check.opt_dict_param(run_config, "run_config"), + tags=check.opt_dict_param(tags, "tags", key_type=str, value_type=str), + ) + + +@whitelist_for_serdes +class ExternalSensorExecutionErrorData(namedtuple("_ExternalSensorExecutionErrorData", "error")): + def __new__(cls, error): + return super(ExternalSensorExecutionErrorData, cls).__new__( + cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo), + ) + + @whitelist_for_serdes class ExternalExecutionParamsData(namedtuple("_ExternalExecutionParamsData", "run_config tags")): def __new__(cls, run_config=None, tags=None): diff --git a/python_modules/dagster/dagster/core/host_representation/repository_location.py b/python_modules/dagster/dagster/core/host_representation/repository_location.py --- a/python_modules/dagster/dagster/core/host_representation/repository_location.py +++ b/python_modules/dagster/dagster/core/host_representation/repository_location.py @@ -14,6 +14,7 @@ from dagster.api.snapshot_pipeline import sync_get_external_pipeline_subset_grpc from dagster.api.snapshot_repository import sync_get_streaming_external_repositories_grpc from dagster.api.snapshot_schedule import sync_get_external_schedule_execution_data_grpc +from dagster.api.snapshot_sensor import sync_get_external_sensor_execution_data_grpc from dagster.core.execution.api import create_execution_plan from dagster.core.host_representation import ( ExternalExecutionPlan, @@ -32,6 +33,7 @@ ) from dagster.grpc.impl import ( get_external_schedule_execution, + get_external_sensor_execution, get_partition_config, get_partition_names, get_partition_set_execution_param_data, @@ -125,6 +127,12 @@ ): pass + @abstractmethod + def get_external_sensor_execution_data( + self, instance, repository_handle, name, last_evaluation_time + ): + pass + @abstractproperty def is_reload_supported(self): pass @@ -289,6 +297,13 @@ else None, ) + def get_external_sensor_execution_data( + self, instance, repository_handle, name, last_evaluation_time + ): + return get_external_sensor_execution( + self._recon_repo, instance.get_ref(), name, last_evaluation_time + ) + def get_external_partition_set_execution_param_data( self, repository_handle, partition_set_name, partition_names ): @@ -435,6 +450,13 @@ scheduled_execution_time, ) + def get_external_sensor_execution_data( + self, instance, repository_handle, name, last_evaluation_time + ): + return sync_get_external_sensor_execution_data_grpc( + self._handle.client, instance, repository_handle, name, last_evaluation_time + ) + def get_external_partition_set_execution_param_data( self, repository_handle, partition_set_name, partition_names ): diff --git a/python_modules/dagster/dagster/grpc/__generated__/api_pb2.py b/python_modules/dagster/dagster/grpc/__generated__/api_pb2.py --- a/python_modules/dagster/dagster/grpc/__generated__/api_pb2.py +++ b/python_modules/dagster/dagster/grpc/__generated__/api_pb2.py @@ -26,7 +26,7 @@ syntax="proto3", serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\tapi.proto\x12\x03\x61pi"\x07\n\x05\x45mpty"\x1b\n\x0bPingRequest\x12\x0c\n\x04\x65\x63ho\x18\x01 \x01(\t"\x19\n\tPingReply\x12\x0c\n\x04\x65\x63ho\x18\x01 \x01(\t"=\n\x14StreamingPingRequest\x12\x17\n\x0fsequence_length\x18\x01 \x01(\x05\x12\x0c\n\x04\x65\x63ho\x18\x02 \x01(\t";\n\x12StreamingPingEvent\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x05\x12\x0c\n\x04\x65\x63ho\x18\x02 \x01(\t"O\n\x1c\x45xecutionPlanSnapshotRequest\x12/\n\'serialized_execution_plan_snapshot_args\x18\x01 \x01(\t"H\n\x1a\x45xecutionPlanSnapshotReply\x12*\n"serialized_execution_plan_snapshot\x18\x01 \x01(\t"H\n\x1d\x45xternalPartitionNamesRequest\x12\'\n\x1fserialized_partition_names_args\x18\x01 \x01(\t"p\n\x1b\x45xternalPartitionNamesReply\x12Q\nIserialized_external_partition_names_or_external_partition_execution_error\x18\x01 \x01(\t"C\n\x1e\x45xternalPartitionConfigRequest\x12!\n\x19serialized_partition_args\x18\x01 \x01(\t"r\n\x1c\x45xternalPartitionConfigReply\x12R\nJserialized_external_partition_config_or_external_partition_execution_error\x18\x01 \x01(\t"A\n\x1c\x45xternalPartitionTagsRequest\x12!\n\x19serialized_partition_args\x18\x01 \x01(\t"n\n\x1a\x45xternalPartitionTagsReply\x12P\nHserialized_external_partition_tags_or_external_partition_execution_error\x18\x01 \x01(\t"c\n*ExternalPartitionSetExecutionParamsRequest\x12\x35\n-serialized_partition_set_execution_param_args\x18\x01 \x01(\t"\x90\x01\n(ExternalPartitionSetExecutionParamsReply\x12\x64\n\\serialized_external_partition_set_execution_param_data_or_external_partition_execution_error\x18\x01 \x01(\t"\x19\n\x17ListRepositoriesRequest"O\n\x15ListRepositoriesReply\x12\x36\n.serialized_list_repositories_response_or_error\x18\x01 \x01(\t"Y\n%ExternalPipelineSubsetSnapshotRequest\x12\x30\n(serialized_pipeline_subset_snapshot_args\x18\x01 \x01(\t"Y\n#ExternalPipelineSubsetSnapshotReply\x12\x32\n*serialized_external_pipeline_subset_result\x18\x01 \x01(\t"H\n\x19\x45xternalRepositoryRequest\x12+\n#serialized_repository_python_origin\x18\x01 \x01(\t"F\n\x17\x45xternalRepositoryReply\x12+\n#serialized_external_repository_data\x18\x01 \x01(\t"i\n StreamingExternalRepositoryEvent\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x05\x12,\n$serialized_external_repository_chunk\x18\x02 \x01(\t"W\n ExternalScheduleExecutionRequest\x12\x33\n+serialized_external_schedule_execution_args\x18\x01 \x01(\t"z\n\x1e\x45xternalScheduleExecutionReply\x12X\nPserialized_external_schedule_execution_data_or_external_schedule_execution_error\x18\x01 \x01(\t"8\n\x11\x45xecuteRunRequest\x12#\n\x1bserialized_execute_run_args\x18\x01 \x01(\t"H\n\x0f\x45xecuteRunEvent\x12\x35\n-serialized_dagster_event_or_ipc_error_message\x18\x01 \x01(\t"@\n\x13ShutdownServerReply\x12)\n!serialized_shutdown_server_result\x18\x01 \x01(\t"E\n\x16\x43\x61ncelExecutionRequest\x12+\n#serialized_cancel_execution_request\x18\x01 \x01(\t"B\n\x14\x43\x61ncelExecutionReply\x12*\n"serialized_cancel_execution_result\x18\x01 \x01(\t"L\n\x19\x43\x61nCancelExecutionRequest\x12/\n\'serialized_can_cancel_execution_request\x18\x01 \x01(\t"I\n\x17\x43\x61nCancelExecutionReply\x12.\n&serialized_can_cancel_execution_result\x18\x01 \x01(\t"6\n\x0fStartRunRequest\x12#\n\x1bserialized_execute_run_args\x18\x01 \x01(\t"4\n\rStartRunReply\x12#\n\x1bserialized_start_run_result\x18\x01 \x01(\t"8\n\x14GetCurrentImageReply\x12 \n\x18serialized_current_image\x18\x01 \x01(\t2\xc7\x0c\n\nDagsterApi\x12*\n\x04Ping\x12\x10.api.PingRequest\x1a\x0e.api.PingReply"\x00\x12/\n\tHeartbeat\x12\x10.api.PingRequest\x1a\x0e.api.PingReply"\x00\x12G\n\rStreamingPing\x12\x19.api.StreamingPingRequest\x1a\x17.api.StreamingPingEvent"\x00\x30\x01\x12]\n\x15\x45xecutionPlanSnapshot\x12!.api.ExecutionPlanSnapshotRequest\x1a\x1f.api.ExecutionPlanSnapshotReply"\x00\x12N\n\x10ListRepositories\x12\x1c.api.ListRepositoriesRequest\x1a\x1a.api.ListRepositoriesReply"\x00\x12`\n\x16\x45xternalPartitionNames\x12".api.ExternalPartitionNamesRequest\x1a .api.ExternalPartitionNamesReply"\x00\x12\x63\n\x17\x45xternalPartitionConfig\x12#.api.ExternalPartitionConfigRequest\x1a!.api.ExternalPartitionConfigReply"\x00\x12]\n\x15\x45xternalPartitionTags\x12!.api.ExternalPartitionTagsRequest\x1a\x1f.api.ExternalPartitionTagsReply"\x00\x12\x87\x01\n#ExternalPartitionSetExecutionParams\x12/.api.ExternalPartitionSetExecutionParamsRequest\x1a-.api.ExternalPartitionSetExecutionParamsReply"\x00\x12x\n\x1e\x45xternalPipelineSubsetSnapshot\x12*.api.ExternalPipelineSubsetSnapshotRequest\x1a(.api.ExternalPipelineSubsetSnapshotReply"\x00\x12T\n\x12\x45xternalRepository\x12\x1e.api.ExternalRepositoryRequest\x1a\x1c.api.ExternalRepositoryReply"\x00\x12h\n\x1bStreamingExternalRepository\x12\x1e.api.ExternalRepositoryRequest\x1a%.api.StreamingExternalRepositoryEvent"\x00\x30\x01\x12i\n\x19\x45xternalScheduleExecution\x12%.api.ExternalScheduleExecutionRequest\x1a#.api.ExternalScheduleExecutionReply"\x00\x12\x38\n\x0eShutdownServer\x12\n.api.Empty\x1a\x18.api.ShutdownServerReply"\x00\x12>\n\nExecuteRun\x12\x16.api.ExecuteRunRequest\x1a\x14.api.ExecuteRunEvent"\x00\x30\x01\x12K\n\x0f\x43\x61ncelExecution\x12\x1b.api.CancelExecutionRequest\x1a\x19.api.CancelExecutionReply"\x00\x12T\n\x12\x43\x61nCancelExecution\x12\x1e.api.CanCancelExecutionRequest\x1a\x1c.api.CanCancelExecutionReply"\x00\x12\x36\n\x08StartRun\x12\x14.api.StartRunRequest\x1a\x12.api.StartRunReply"\x00\x12:\n\x0fGetCurrentImage\x12\n.api.Empty\x1a\x19.api.GetCurrentImageReply"\x00\x62\x06proto3', + serialized_pb=b'\n\tapi.proto\x12\x03\x61pi"\x07\n\x05\x45mpty"\x1b\n\x0bPingRequest\x12\x0c\n\x04\x65\x63ho\x18\x01 \x01(\t"\x19\n\tPingReply\x12\x0c\n\x04\x65\x63ho\x18\x01 \x01(\t"=\n\x14StreamingPingRequest\x12\x17\n\x0fsequence_length\x18\x01 \x01(\x05\x12\x0c\n\x04\x65\x63ho\x18\x02 \x01(\t";\n\x12StreamingPingEvent\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x05\x12\x0c\n\x04\x65\x63ho\x18\x02 \x01(\t"O\n\x1c\x45xecutionPlanSnapshotRequest\x12/\n\'serialized_execution_plan_snapshot_args\x18\x01 \x01(\t"H\n\x1a\x45xecutionPlanSnapshotReply\x12*\n"serialized_execution_plan_snapshot\x18\x01 \x01(\t"H\n\x1d\x45xternalPartitionNamesRequest\x12\'\n\x1fserialized_partition_names_args\x18\x01 \x01(\t"p\n\x1b\x45xternalPartitionNamesReply\x12Q\nIserialized_external_partition_names_or_external_partition_execution_error\x18\x01 \x01(\t"C\n\x1e\x45xternalPartitionConfigRequest\x12!\n\x19serialized_partition_args\x18\x01 \x01(\t"r\n\x1c\x45xternalPartitionConfigReply\x12R\nJserialized_external_partition_config_or_external_partition_execution_error\x18\x01 \x01(\t"A\n\x1c\x45xternalPartitionTagsRequest\x12!\n\x19serialized_partition_args\x18\x01 \x01(\t"n\n\x1a\x45xternalPartitionTagsReply\x12P\nHserialized_external_partition_tags_or_external_partition_execution_error\x18\x01 \x01(\t"c\n*ExternalPartitionSetExecutionParamsRequest\x12\x35\n-serialized_partition_set_execution_param_args\x18\x01 \x01(\t"\x90\x01\n(ExternalPartitionSetExecutionParamsReply\x12\x64\n\\serialized_external_partition_set_execution_param_data_or_external_partition_execution_error\x18\x01 \x01(\t"\x19\n\x17ListRepositoriesRequest"O\n\x15ListRepositoriesReply\x12\x36\n.serialized_list_repositories_response_or_error\x18\x01 \x01(\t"Y\n%ExternalPipelineSubsetSnapshotRequest\x12\x30\n(serialized_pipeline_subset_snapshot_args\x18\x01 \x01(\t"Y\n#ExternalPipelineSubsetSnapshotReply\x12\x32\n*serialized_external_pipeline_subset_result\x18\x01 \x01(\t"H\n\x19\x45xternalRepositoryRequest\x12+\n#serialized_repository_python_origin\x18\x01 \x01(\t"F\n\x17\x45xternalRepositoryReply\x12+\n#serialized_external_repository_data\x18\x01 \x01(\t"i\n StreamingExternalRepositoryEvent\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x05\x12,\n$serialized_external_repository_chunk\x18\x02 \x01(\t"W\n ExternalScheduleExecutionRequest\x12\x33\n+serialized_external_schedule_execution_args\x18\x01 \x01(\t"z\n\x1e\x45xternalScheduleExecutionReply\x12X\nPserialized_external_schedule_execution_data_or_external_schedule_execution_error\x18\x01 \x01(\t"S\n\x1e\x45xternalSensorExecutionRequest\x12\x31\n)serialized_external_sensor_execution_args\x18\x01 \x01(\t"t\n\x1c\x45xternalSensorExecutionReply\x12T\nLserialized_external_sensor_execution_data_or_external_sensor_execution_error\x18\x01 \x01(\t"8\n\x11\x45xecuteRunRequest\x12#\n\x1bserialized_execute_run_args\x18\x01 \x01(\t"H\n\x0f\x45xecuteRunEvent\x12\x35\n-serialized_dagster_event_or_ipc_error_message\x18\x01 \x01(\t"@\n\x13ShutdownServerReply\x12)\n!serialized_shutdown_server_result\x18\x01 \x01(\t"E\n\x16\x43\x61ncelExecutionRequest\x12+\n#serialized_cancel_execution_request\x18\x01 \x01(\t"B\n\x14\x43\x61ncelExecutionReply\x12*\n"serialized_cancel_execution_result\x18\x01 \x01(\t"L\n\x19\x43\x61nCancelExecutionRequest\x12/\n\'serialized_can_cancel_execution_request\x18\x01 \x01(\t"I\n\x17\x43\x61nCancelExecutionReply\x12.\n&serialized_can_cancel_execution_result\x18\x01 \x01(\t"6\n\x0fStartRunRequest\x12#\n\x1bserialized_execute_run_args\x18\x01 \x01(\t"4\n\rStartRunReply\x12#\n\x1bserialized_start_run_result\x18\x01 \x01(\t"8\n\x14GetCurrentImageReply\x12 \n\x18serialized_current_image\x18\x01 \x01(\t2\xac\r\n\nDagsterApi\x12*\n\x04Ping\x12\x10.api.PingRequest\x1a\x0e.api.PingReply"\x00\x12/\n\tHeartbeat\x12\x10.api.PingRequest\x1a\x0e.api.PingReply"\x00\x12G\n\rStreamingPing\x12\x19.api.StreamingPingRequest\x1a\x17.api.StreamingPingEvent"\x00\x30\x01\x12]\n\x15\x45xecutionPlanSnapshot\x12!.api.ExecutionPlanSnapshotRequest\x1a\x1f.api.ExecutionPlanSnapshotReply"\x00\x12N\n\x10ListRepositories\x12\x1c.api.ListRepositoriesRequest\x1a\x1a.api.ListRepositoriesReply"\x00\x12`\n\x16\x45xternalPartitionNames\x12".api.ExternalPartitionNamesRequest\x1a .api.ExternalPartitionNamesReply"\x00\x12\x63\n\x17\x45xternalPartitionConfig\x12#.api.ExternalPartitionConfigRequest\x1a!.api.ExternalPartitionConfigReply"\x00\x12]\n\x15\x45xternalPartitionTags\x12!.api.ExternalPartitionTagsRequest\x1a\x1f.api.ExternalPartitionTagsReply"\x00\x12\x87\x01\n#ExternalPartitionSetExecutionParams\x12/.api.ExternalPartitionSetExecutionParamsRequest\x1a-.api.ExternalPartitionSetExecutionParamsReply"\x00\x12x\n\x1e\x45xternalPipelineSubsetSnapshot\x12*.api.ExternalPipelineSubsetSnapshotRequest\x1a(.api.ExternalPipelineSubsetSnapshotReply"\x00\x12T\n\x12\x45xternalRepository\x12\x1e.api.ExternalRepositoryRequest\x1a\x1c.api.ExternalRepositoryReply"\x00\x12h\n\x1bStreamingExternalRepository\x12\x1e.api.ExternalRepositoryRequest\x1a%.api.StreamingExternalRepositoryEvent"\x00\x30\x01\x12i\n\x19\x45xternalScheduleExecution\x12%.api.ExternalScheduleExecutionRequest\x1a#.api.ExternalScheduleExecutionReply"\x00\x12\x63\n\x17\x45xternalSensorExecution\x12#.api.ExternalSensorExecutionRequest\x1a!.api.ExternalSensorExecutionReply"\x00\x12\x38\n\x0eShutdownServer\x12\n.api.Empty\x1a\x18.api.ShutdownServerReply"\x00\x12>\n\nExecuteRun\x12\x16.api.ExecuteRunRequest\x1a\x14.api.ExecuteRunEvent"\x00\x30\x01\x12K\n\x0f\x43\x61ncelExecution\x12\x1b.api.CancelExecutionRequest\x1a\x19.api.CancelExecutionReply"\x00\x12T\n\x12\x43\x61nCancelExecution\x12\x1e.api.CanCancelExecutionRequest\x1a\x1c.api.CanCancelExecutionReply"\x00\x12\x36\n\x08StartRun\x12\x14.api.StartRunRequest\x1a\x12.api.StartRunReply"\x00\x12:\n\x0fGetCurrentImage\x12\n.api.Empty\x1a\x19.api.GetCurrentImageReply"\x00\x62\x06proto3', ) @@ -1031,6 +1031,88 @@ ) +_EXTERNALSENSOREXECUTIONREQUEST = _descriptor.Descriptor( + name="ExternalSensorExecutionRequest", + full_name="api.ExternalSensorExecutionRequest", + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name="serialized_external_sensor_execution_args", + full_name="api.ExternalSensorExecutionRequest.serialized_external_sensor_execution_args", + index=0, + number=1, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + ], + extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto3", + extension_ranges=[], + oneofs=[], + serialized_start=1918, + serialized_end=2001, +) + + +_EXTERNALSENSOREXECUTIONREPLY = _descriptor.Descriptor( + name="ExternalSensorExecutionReply", + full_name="api.ExternalSensorExecutionReply", + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name="serialized_external_sensor_execution_data_or_external_sensor_execution_error", + full_name="api.ExternalSensorExecutionReply.serialized_external_sensor_execution_data_or_external_sensor_execution_error", + index=0, + number=1, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + ], + extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto3", + extension_ranges=[], + oneofs=[], + serialized_start=2003, + serialized_end=2119, +) + + _EXECUTERUNREQUEST = _descriptor.Descriptor( name="ExecuteRunRequest", full_name="api.ExecuteRunRequest", @@ -1067,8 +1149,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=1918, - serialized_end=1974, + serialized_start=2121, + serialized_end=2177, ) @@ -1108,8 +1190,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=1976, - serialized_end=2048, + serialized_start=2179, + serialized_end=2251, ) @@ -1149,8 +1231,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2050, - serialized_end=2114, + serialized_start=2253, + serialized_end=2317, ) @@ -1190,8 +1272,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2116, - serialized_end=2185, + serialized_start=2319, + serialized_end=2388, ) @@ -1231,8 +1313,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2187, - serialized_end=2253, + serialized_start=2390, + serialized_end=2456, ) @@ -1272,8 +1354,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2255, - serialized_end=2331, + serialized_start=2458, + serialized_end=2534, ) @@ -1313,8 +1395,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2333, - serialized_end=2406, + serialized_start=2536, + serialized_end=2609, ) @@ -1354,8 +1436,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2408, - serialized_end=2462, + serialized_start=2611, + serialized_end=2665, ) @@ -1395,8 +1477,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2464, - serialized_end=2516, + serialized_start=2667, + serialized_end=2719, ) @@ -1436,8 +1518,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2518, - serialized_end=2574, + serialized_start=2721, + serialized_end=2777, ) DESCRIPTOR.message_types_by_name["Empty"] = _EMPTY @@ -1476,6 +1558,8 @@ "ExternalScheduleExecutionRequest" ] = _EXTERNALSCHEDULEEXECUTIONREQUEST DESCRIPTOR.message_types_by_name["ExternalScheduleExecutionReply"] = _EXTERNALSCHEDULEEXECUTIONREPLY +DESCRIPTOR.message_types_by_name["ExternalSensorExecutionRequest"] = _EXTERNALSENSOREXECUTIONREQUEST +DESCRIPTOR.message_types_by_name["ExternalSensorExecutionReply"] = _EXTERNALSENSOREXECUTIONREPLY DESCRIPTOR.message_types_by_name["ExecuteRunRequest"] = _EXECUTERUNREQUEST DESCRIPTOR.message_types_by_name["ExecuteRunEvent"] = _EXECUTERUNEVENT DESCRIPTOR.message_types_by_name["ShutdownServerReply"] = _SHUTDOWNSERVERREPLY @@ -1752,6 +1836,28 @@ ) _sym_db.RegisterMessage(ExternalScheduleExecutionReply) +ExternalSensorExecutionRequest = _reflection.GeneratedProtocolMessageType( + "ExternalSensorExecutionRequest", + (_message.Message,), + { + "DESCRIPTOR": _EXTERNALSENSOREXECUTIONREQUEST, + "__module__": "api_pb2" + # @@protoc_insertion_point(class_scope:api.ExternalSensorExecutionRequest) + }, +) +_sym_db.RegisterMessage(ExternalSensorExecutionRequest) + +ExternalSensorExecutionReply = _reflection.GeneratedProtocolMessageType( + "ExternalSensorExecutionReply", + (_message.Message,), + { + "DESCRIPTOR": _EXTERNALSENSOREXECUTIONREPLY, + "__module__": "api_pb2" + # @@protoc_insertion_point(class_scope:api.ExternalSensorExecutionReply) + }, +) +_sym_db.RegisterMessage(ExternalSensorExecutionReply) + ExecuteRunRequest = _reflection.GeneratedProtocolMessageType( "ExecuteRunRequest", (_message.Message,), @@ -1870,8 +1976,8 @@ index=0, serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_start=2577, - serialized_end=4184, + serialized_start=2780, + serialized_end=4488, methods=[ _descriptor.MethodDescriptor( name="Ping", @@ -2003,10 +2109,20 @@ serialized_options=None, create_key=_descriptor._internal_create_key, ), + _descriptor.MethodDescriptor( + name="ExternalSensorExecution", + full_name="api.DagsterApi.ExternalSensorExecution", + index=13, + containing_service=None, + input_type=_EXTERNALSENSOREXECUTIONREQUEST, + output_type=_EXTERNALSENSOREXECUTIONREPLY, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), _descriptor.MethodDescriptor( name="ShutdownServer", full_name="api.DagsterApi.ShutdownServer", - index=13, + index=14, containing_service=None, input_type=_EMPTY, output_type=_SHUTDOWNSERVERREPLY, @@ -2016,7 +2132,7 @@ _descriptor.MethodDescriptor( name="ExecuteRun", full_name="api.DagsterApi.ExecuteRun", - index=14, + index=15, containing_service=None, input_type=_EXECUTERUNREQUEST, output_type=_EXECUTERUNEVENT, @@ -2026,7 +2142,7 @@ _descriptor.MethodDescriptor( name="CancelExecution", full_name="api.DagsterApi.CancelExecution", - index=15, + index=16, containing_service=None, input_type=_CANCELEXECUTIONREQUEST, output_type=_CANCELEXECUTIONREPLY, @@ -2036,7 +2152,7 @@ _descriptor.MethodDescriptor( name="CanCancelExecution", full_name="api.DagsterApi.CanCancelExecution", - index=16, + index=17, containing_service=None, input_type=_CANCANCELEXECUTIONREQUEST, output_type=_CANCANCELEXECUTIONREPLY, @@ -2046,7 +2162,7 @@ _descriptor.MethodDescriptor( name="StartRun", full_name="api.DagsterApi.StartRun", - index=17, + index=18, containing_service=None, input_type=_STARTRUNREQUEST, output_type=_STARTRUNREPLY, @@ -2056,7 +2172,7 @@ _descriptor.MethodDescriptor( name="GetCurrentImage", full_name="api.DagsterApi.GetCurrentImage", - index=18, + index=19, containing_service=None, input_type=_EMPTY, output_type=_GETCURRENTIMAGEREPLY, diff --git a/python_modules/dagster/dagster/grpc/__generated__/api_pb2_grpc.py b/python_modules/dagster/dagster/grpc/__generated__/api_pb2_grpc.py --- a/python_modules/dagster/dagster/grpc/__generated__/api_pb2_grpc.py +++ b/python_modules/dagster/dagster/grpc/__generated__/api_pb2_grpc.py @@ -87,6 +87,11 @@ request_serializer=api__pb2.ExternalScheduleExecutionRequest.SerializeToString, response_deserializer=api__pb2.ExternalScheduleExecutionReply.FromString, ) + self.ExternalSensorExecution = channel.unary_unary( + "/api.DagsterApi/ExternalSensorExecution", + request_serializer=api__pb2.ExternalSensorExecutionRequest.SerializeToString, + response_deserializer=api__pb2.ExternalSensorExecutionReply.FromString, + ) self.ShutdownServer = channel.unary_unary( "/api.DagsterApi/ShutdownServer", request_serializer=api__pb2.Empty.SerializeToString, @@ -200,6 +205,12 @@ context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") + def ExternalSensorExecution(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + def ShutdownServer(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) @@ -304,6 +315,11 @@ request_deserializer=api__pb2.ExternalScheduleExecutionRequest.FromString, response_serializer=api__pb2.ExternalScheduleExecutionReply.SerializeToString, ), + "ExternalSensorExecution": grpc.unary_unary_rpc_method_handler( + servicer.ExternalSensorExecution, + request_deserializer=api__pb2.ExternalSensorExecutionRequest.FromString, + response_serializer=api__pb2.ExternalSensorExecutionReply.SerializeToString, + ), "ShutdownServer": grpc.unary_unary_rpc_method_handler( servicer.ShutdownServer, request_deserializer=api__pb2.Empty.FromString, @@ -720,6 +736,35 @@ metadata, ) + @staticmethod + def ExternalSensorExecution( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/api.DagsterApi/ExternalSensorExecution", + api__pb2.ExternalSensorExecutionRequest.SerializeToString, + api__pb2.ExternalSensorExecutionReply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + @staticmethod def ShutdownServer( request, diff --git a/python_modules/dagster/dagster/grpc/client.py b/python_modules/dagster/dagster/grpc/client.py --- a/python_modules/dagster/dagster/grpc/client.py +++ b/python_modules/dagster/dagster/grpc/client.py @@ -27,6 +27,7 @@ PartitionNamesArgs, PartitionSetExecutionParamArgs, PipelineSubsetSnapshotArgs, + SensorExecutionArgs, ) CLIENT_HEARTBEAT_INTERVAL = 1 @@ -252,6 +253,23 @@ res.serialized_external_schedule_execution_data_or_external_schedule_execution_error ) + def external_sensor_execution(self, sensor_execution_args): + check.inst_param( + sensor_execution_args, "sensor_execution_args", SensorExecutionArgs, + ) + + res = self._query( + "ExternalSensorExecution", + api_pb2.ExternalSensorExecutionRequest, + serialized_external_sensor_execution_args=serialize_dagster_namedtuple( + sensor_execution_args + ), + ) + + return deserialize_json_to_dagster_namedtuple( + res.serialized_external_sensor_execution_data_or_external_sensor_execution_error + ) + def execute_run(self, execute_run_args): check.inst_param(execute_run_args, "execute_run_args", ExecuteExternalPipelineArgs) diff --git a/python_modules/dagster/dagster/grpc/impl.py b/python_modules/dagster/dagster/grpc/impl.py --- a/python_modules/dagster/dagster/grpc/impl.py +++ b/python_modules/dagster/dagster/grpc/impl.py @@ -10,12 +10,14 @@ ReconstructablePipeline, ReconstructableRepository, ) +from dagster.core.definitions.sensor import SensorExecutionContext from dagster.core.errors import ( DagsterInvalidSubsetError, DagsterRunNotFoundError, DagsterSubprocessError, PartitionExecutionError, ScheduleExecutionError, + SensorExecutionError, user_code_error_boundary, ) from dagster.core.events import EngineEventData @@ -31,6 +33,8 @@ ExternalPipelineSubsetResult, ExternalScheduleExecutionData, ExternalScheduleExecutionErrorData, + ExternalSensorExecutionData, + ExternalSensorExecutionErrorData, ) from dagster.core.instance import DagsterInstance from dagster.core.snap.execution_plan_snapshot import ( @@ -268,6 +272,53 @@ ) +def get_external_sensor_execution(recon_repo, instance_ref, sensor_name, last_evaluation_timestamp): + check.inst_param( + recon_repo, "recon_repo", ReconstructableRepository, + ) + + definition = recon_repo.get_definition() + sensor_def = definition.get_sensor_def(sensor_name) + + with DagsterInstance.from_ref(instance_ref) as instance: + sensor_context = SensorExecutionContext( + instance, last_evaluation_time=last_evaluation_timestamp + ) + + try: + with user_code_error_boundary( + SensorExecutionError, + lambda: "Error occurred during the execution of should_execute for sensor " + "{sensor_name}".format(sensor_name=sensor_def.name), + ): + if not sensor_def.should_execute(sensor_context): + return ExternalSensorExecutionData( + should_execute=False, run_config=None, tags=None + ) + + with user_code_error_boundary( + SensorExecutionError, + lambda: "Error occurred during the execution of run_config_fn for sensor " + "{sensor_name}".format(sensor_name=sensor_def.name), + ): + run_config = sensor_def.get_run_config(sensor_context) + + with user_code_error_boundary( + SensorExecutionError, + lambda: "Error occurred during the execution of tags_fn for sensor " + "{sensor_name}".format(sensor_name=sensor_def.name), + ): + tags = sensor_def.get_tags(sensor_context) + + return ExternalSensorExecutionData( + should_execute=True, run_config=run_config, tags=tags + ) + except SensorExecutionError: + return ExternalSensorExecutionErrorData( + serializable_error_info_from_exc_info(sys.exc_info()) + ) + + def get_partition_config(recon_repo, partition_set_name, partition_name): definition = recon_repo.get_definition() partition_set_def = definition.get_partition_set_def(partition_set_name) diff --git a/python_modules/dagster/dagster/grpc/protos/api.proto b/python_modules/dagster/dagster/grpc/protos/api.proto --- a/python_modules/dagster/dagster/grpc/protos/api.proto +++ b/python_modules/dagster/dagster/grpc/protos/api.proto @@ -18,6 +18,7 @@ rpc ExternalRepository (ExternalRepositoryRequest) returns (ExternalRepositoryReply) {} rpc StreamingExternalRepository (ExternalRepositoryRequest) returns (stream StreamingExternalRepositoryEvent) {} rpc ExternalScheduleExecution (ExternalScheduleExecutionRequest) returns (ExternalScheduleExecutionReply) {} + rpc ExternalSensorExecution (ExternalSensorExecutionRequest) returns (ExternalSensorExecutionReply) {} rpc ShutdownServer (Empty) returns (ShutdownServerReply) {} rpc ExecuteRun (ExecuteRunRequest) returns (stream ExecuteRunEvent) {} rpc CancelExecution (CancelExecutionRequest) returns (CancelExecutionReply) {} @@ -122,6 +123,14 @@ string serialized_external_schedule_execution_data_or_external_schedule_execution_error = 1; } +message ExternalSensorExecutionRequest { + string serialized_external_sensor_execution_args = 1; +} + +message ExternalSensorExecutionReply { + string serialized_external_sensor_execution_data_or_external_sensor_execution_error = 1; +} + message ExecuteRunRequest { string serialized_execute_run_args = 1; } diff --git a/python_modules/dagster/dagster/grpc/server.py b/python_modules/dagster/dagster/grpc/server.py --- a/python_modules/dagster/dagster/grpc/server.py +++ b/python_modules/dagster/dagster/grpc/server.py @@ -44,6 +44,7 @@ get_external_execution_plan_snapshot, get_external_pipeline_subset_result, get_external_schedule_execution, + get_external_sensor_execution, get_partition_config, get_partition_names, get_partition_set_execution_param_data, @@ -65,6 +66,7 @@ PartitionNamesArgs, PartitionSetExecutionParamArgs, PipelineSubsetSnapshotArgs, + SensorExecutionArgs, ShutdownServerResult, StartRunResult, ) @@ -495,6 +497,23 @@ ) ) + def ExternalSensorExecution(self, request, _context): + args = deserialize_json_to_dagster_namedtuple( + request.serialized_external_sensor_execution_args + ) + + check.inst_param(args, "args", SensorExecutionArgs) + + recon_repo = self._recon_repository_from_origin(args.repository_origin) + + return api_pb2.ExternalSensorExecutionReply( + serialized_external_sensor_execution_data_or_external_sensor_execution_error=serialize_dagster_namedtuple( + get_external_sensor_execution( + recon_repo, args.instance_ref, args.sensor_name, args.last_evaluation_time + ) + ) + ) + def ExecuteRun(self, request, _context): if self._shutdown_once_executions_finish_event.is_set(): yield api_pb2.ExecuteRunEvent( diff --git a/python_modules/dagster/dagster/grpc/types.py b/python_modules/dagster/dagster/grpc/types.py --- a/python_modules/dagster/dagster/grpc/types.py +++ b/python_modules/dagster/dagster/grpc/types.py @@ -264,6 +264,26 @@ ) +@whitelist_for_serdes +class SensorExecutionArgs( + namedtuple( + "_SensorExecutionArgs", "repository_origin instance_ref sensor_name last_evaluation_time" + ) +): + def __new__(cls, repository_origin, instance_ref, sensor_name, last_evaluation_time): + return super(SensorExecutionArgs, cls).__new__( + cls, + repository_origin=check.inst_param( + repository_origin, "repository_origin", ExternalRepositoryOrigin + ), + instance_ref=check.inst_param(instance_ref, "instance_ref", InstanceRef), + sensor_name=check.str_param(sensor_name, "sensor_name"), + last_evaluation_time=check.opt_float_param( + last_evaluation_time, "last_evaluation_time" + ), + ) + + @whitelist_for_serdes class ExternalJobArgs(namedtuple("_ExternalJobArgs", "repository_origin instance_ref name",)): def __new__(cls, repository_origin, instance_ref, name): diff --git a/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py b/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py --- a/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py +++ b/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py @@ -12,6 +12,7 @@ solid, usable_as_dagster_type, ) +from dagster.core.definitions.decorators.sensor import sensor @lambda_solid @@ -131,6 +132,16 @@ } +@sensor(pipeline_name="foo_pipeline", run_config_fn=lambda _: {"foo": "FOO"}) +def sensor_foo(_): + return True + + +@sensor(pipeline_name="foo_pipeline") +def sensor_error(_): + raise Exception("womp womp") + + @repository def bar_repo(): return { @@ -141,4 +152,5 @@ }, "schedules": define_bar_schedules(), "partition_sets": define_baz_partitions(), + "jobs": {"sensor_foo": sensor_foo, "sensor_error": lambda: sensor_error}, } diff --git a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py new file mode 100644 --- /dev/null +++ b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py @@ -0,0 +1,28 @@ +from dagster.api.snapshot_sensor import sync_get_external_sensor_execution_data_ephemeral_grpc +from dagster.core.host_representation.external_data import ( + ExternalSensorExecutionData, + ExternalSensorExecutionErrorData, +) +from dagster.core.test_utils import instance_for_test + +from .utils import get_bar_repo_handle + + +def test_external_sensor_grpc(): + with get_bar_repo_handle() as repository_handle: + with instance_for_test() as instance: + result = sync_get_external_sensor_execution_data_ephemeral_grpc( + instance, repository_handle, "sensor_foo", None + ) + assert isinstance(result, ExternalSensorExecutionData) + assert result.run_config == {"foo": "FOO"} + + +def test_external_sensor_error(): + with get_bar_repo_handle() as repository_handle: + with instance_for_test() as instance: + result = sync_get_external_sensor_execution_data_ephemeral_grpc( + instance, repository_handle, "sensor_error", None + ) + assert isinstance(result, ExternalSensorExecutionErrorData) + assert "womp womp" in result.error.to_string()