diff --git a/python_modules/dagster/dagster/api/snapshot_sensor.py b/python_modules/dagster/dagster/api/snapshot_sensor.py new file mode 100644 index 000000000..4740f05c6 --- /dev/null +++ b/python_modules/dagster/dagster/api/snapshot_sensor.py @@ -0,0 +1,43 @@ +from dagster import check +from dagster.core.host_representation.external_data import ( + ExternalSensorExecutionData, + ExternalSensorExecutionErrorData, +) +from dagster.core.host_representation.handle import RepositoryHandle +from dagster.grpc.types import SensorExecutionArgs + + +def sync_get_external_sensor_execution_data_ephemeral_grpc( + instance, repository_handle, sensor_name, last_evaluation_time +): + from dagster.grpc.client import ephemeral_grpc_api_client + + origin = repository_handle.get_external_origin() + with ephemeral_grpc_api_client( + origin.repository_location_origin.loadable_target_origin + ) as api_client: + return sync_get_external_sensor_execution_data_grpc( + api_client, instance, repository_handle, sensor_name, last_evaluation_time + ) + + +def sync_get_external_sensor_execution_data_grpc( + api_client, instance, repository_handle, sensor_name, last_evaluation_time +): + check.inst_param(repository_handle, "repository_handle", RepositoryHandle) + check.str_param(sensor_name, "sensor_name") + check.opt_float_param(last_evaluation_time, "last_evaluation_time") + + origin = repository_handle.get_external_origin() + + return check.inst( + api_client.external_sensor_execution( + sensor_execution_args=SensorExecutionArgs( + repository_origin=origin, + instance_ref=instance.get_ref(), + sensor_name=sensor_name, + last_evaluation_time=last_evaluation_time, + ) + ), + (ExternalSensorExecutionData, ExternalSensorExecutionErrorData), + ) diff --git a/python_modules/dagster/dagster/core/definitions/decorators/repository.py b/python_modules/dagster/dagster/core/definitions/decorators/repository.py index 2c115b60d..6a65e4941 100644 --- a/python_modules/dagster/dagster/core/definitions/decorators/repository.py +++ b/python_modules/dagster/dagster/core/definitions/decorators/repository.py @@ -1,227 +1,227 @@ from functools import update_wrapper from dagster import check from dagster.core.errors import DagsterInvalidDefinitionError from ..job import JobDefinition from ..partition import PartitionSetDefinition from ..pipeline import PipelineDefinition from ..repository import VALID_REPOSITORY_DATA_DICT_KEYS, RepositoryData, RepositoryDefinition from ..schedule import ScheduleDefinition class _Repository(object): def __init__(self, name=None, description=None): self.name = check.opt_str_param(name, "name") self.description = check.opt_str_param(description, "description") def __call__(self, fn): check.callable_param(fn, "fn") if not self.name: self.name = fn.__name__ repository_definitions = fn() if not ( isinstance(repository_definitions, list) or isinstance(repository_definitions, dict) or isinstance(repository_definitions, RepositoryData) ): raise DagsterInvalidDefinitionError( "Bad return value of type {type_} from repository construction function: must " "return list, dict, or RepositoryData. 
See the @repository decorator docstring for " "details and examples".format(type_=type(repository_definitions)), ) if isinstance(repository_definitions, list): bad_definitions = [] for i, definition in enumerate(repository_definitions): if not ( isinstance(definition, PipelineDefinition) or isinstance(definition, PartitionSetDefinition) or isinstance(definition, ScheduleDefinition) or isinstance(definition, JobDefinition) ): bad_definitions.append((i, type(definition))) if bad_definitions: raise DagsterInvalidDefinitionError( "Bad return value from repository construction function: all elements of list " "must be of type PipelineDefinition, PartitionSetDefinition, " "ScheduleDefinition, or JobDefinition. Got {bad_definitions_formatted}.".format( bad_definitions_formatted=", ".join( [ "value of type {type_} at index {i}".format(type_=type_, i=i) for i, type_ in bad_definitions ] ) ) ) repository_data = RepositoryData.from_list(repository_definitions) elif isinstance(repository_definitions, dict): if not set(repository_definitions.keys()).issubset(VALID_REPOSITORY_DATA_DICT_KEYS): raise DagsterInvalidDefinitionError( "Bad return value from repository construction function: dict must not contain " "keys other than {{'pipelines', 'partition_sets', 'schedules', 'jobs'}}: found " "{bad_keys}".format( bad_keys=", ".join( [ - "'{key}'" + "'{key}'".format(key=key) for key in repository_definitions.keys() if key not in VALID_REPOSITORY_DATA_DICT_KEYS ] ) ) ) repository_data = RepositoryData.from_dict(repository_definitions) elif isinstance(repository_definitions, RepositoryData): repository_data = repository_definitions repository_def = RepositoryDefinition( name=self.name, description=self.description, repository_data=repository_data ) update_wrapper(repository_def, fn) return repository_def def repository(name=None, description=None): """Create a repository from the decorated function. The decorated function should take no arguments and its return value should one of: 1. ``List[Union[PipelineDefinition, PartitionSetDefinition, ScheduleDefinition]]``. Use this form when you have no need to lazy load pipelines or other definitions. This is the typical use case. 2. A dict of the form: .. code-block:: python { 'pipelines': Dict[str, Callable[[], PipelineDefinition]], 'partition_sets': Dict[str, Callable[[], PartitionSetDefinition]], 'schedules': Dict[str, Callable[[], ScheduleDefinition]] } This form is intended to allow definitions to be created lazily when accessed by name, which can be helpful for performance when there are many definitions in a repository, or when constructing the definitions is costly. 3. An object of type :py:class:`RepositoryData`. Return this object if you need fine-grained control over the construction and indexing of definitions within the repository, e.g., to create definitions dynamically from .yaml files in a directory. Args: name (Optional[str]): The name of the repository. Defaults to the name of the decorated function. description (Optional[str]): A string description of the repository. Example: .. 
code-block:: python ###################################################################### # A simple repository using the first form of the decorated function ###################################################################### @solid(config_schema={n: Field(Int)}) def return_n(context): return context.solid_config['n'] @pipeline(name='simple_pipeline') def simple_pipeline(): return_n() simple_partition_set = PartitionSetDefinition( name='simple_partition_set', pipeline_name='simple_pipeline', partition_fn=lambda: range(10), run_config_fn_for_partition=( lambda partition: { 'solids': {'return_n': {'config': {'n': partition}}} } ), ) simple_schedule = simple_partition_set.create_schedule_definition( schedule_name='simple_daily_10_pm_schedule', cron_schedule='0 22 * * *', ) @repository def simple_repository(): return [simple_pipeline, simple_partition_set, simple_schedule] ###################################################################### # A lazy-loaded repository ###################################################################### def make_expensive_pipeline(): @pipeline(name='expensive_pipeline') def expensive_pipeline(): for i in range(10000): return_n.alias('return_n_{i}'.format(i=i))() return expensive_pipeline expensive_partition_set = PartitionSetDefinition( name='expensive_partition_set', pipeline_name='expensive_pipeline', partition_fn=lambda: range(10), run_config_fn_for_partition=( lambda partition: { 'solids': { 'return_n_{i}'.format(i=i): {'config': {'n': partition}} for i in range(10000) } } ), ) def make_expensive_schedule(): expensive_partition_set.create_schedule_definition( schedule_name='expensive_schedule', cron_schedule='0 22 * * *', ) @repository def lazy_loaded_repository(): return { 'pipelines': {'expensive_pipeline': make_expensive_pipeline}, 'partition_sets': { 'expensive_partition_set': expensive_partition_set }, 'schedules': {'expensive_schedule: make_expensive_schedule} } ###################################################################### # A complex repository that lazily construct pipelines from a directory # of files in a bespoke YAML format ###################################################################### class ComplexRepositoryData(RepositoryData): def __init__(self, yaml_directory): self._yaml_directory = yaml_directory def get_pipeline(self, pipeline_name): return self._construct_pipeline_def_from_yaml_file( self._yaml_file_for_pipeline_name(pipeline_name) ) ... 
@repository def complex_repository(): return ComplexRepositoryData('some_directory') """ if callable(name): check.invariant(description is None) return _Repository()(name) return _Repository(name=name, description=description) diff --git a/python_modules/dagster/dagster/core/definitions/decorators/sensor.py b/python_modules/dagster/dagster/core/definitions/decorators/sensor.py new file mode 100644 index 000000000..abed2d90d --- /dev/null +++ b/python_modules/dagster/dagster/core/definitions/decorators/sensor.py @@ -0,0 +1,46 @@ +from dagster import check +from dagster.core.definitions.sensor import SensorDefinition +from dagster.utils.backcompat import experimental + + +@experimental +def sensor( + pipeline_name, name=None, run_config_fn=None, tags_fn=None, solid_selection=None, mode=None, +): + """ + The decorated function will be called to determine whether the provided job should execute, + taking a :py:class:`~dagster.core.definitions.sensor.SensorExecutionContext` + as its only argument, returning a boolean if the execution should fire + + Args: + name (str): The name of this sensor + run_config_fn (Optional[Callable[[SensorExecutionContext], Optional[Dict]]]): A function + that takes a SensorExecutionContext object and returns the environment configuration + that parameterizes this execution, as a dict. + tags_fn (Optional[Callable[[SensorExecutionContext], Optional[Dict[str, str]]]]): A function + that generates tags to attach to the sensor runs. Takes a + :py:class:`~dagster.SensorExecutionContext` and returns a dictionary of tags (string + key-value pairs). + solid_selection (Optional[List[str]]): A list of solid subselection (including single + solid names) to execute for runs for this sensor e.g. + ``['*some_solid+', 'other_solid']`` + mode (Optional[str]): The mode to apply when executing runs for this sensor. + (default: 'default') + """ + check.opt_str_param(name, "name") + + def inner(fn): + check.callable_param(fn, "fn") + sensor_name = name or fn.__name__ + + return SensorDefinition( + name=sensor_name, + pipeline_name=pipeline_name, + should_execute=fn, + run_config_fn=run_config_fn, + tags_fn=tags_fn, + solid_selection=solid_selection, + mode=mode, + ) + + return inner diff --git a/python_modules/dagster/dagster/core/definitions/job.py b/python_modules/dagster/dagster/core/definitions/job.py index a55962dd2..0a1146729 100644 --- a/python_modules/dagster/dagster/core/definitions/job.py +++ b/python_modules/dagster/dagster/core/definitions/job.py @@ -1,87 +1,88 @@ from enum import Enum from dagster import check from dagster.core.instance import DagsterInstance from dagster.serdes import whitelist_for_serdes from .mode import DEFAULT_MODE_NAME from .utils import check_valid_name @whitelist_for_serdes class JobType(Enum): SCHEDULE = "SCHEDULE" + SENSOR = "SENSOR" class JobContext(object): """Context for generating the execution parameters for an JobDefinition at runtime. An instance of this class is made available as the first argument to the JobDefinition functions: run_config_fn, tags_fn Attributes: instance (DagsterInstance): The instance configured to launch the job """ __slots__ = ["_instance"] def __init__(self, instance): self._instance = check.inst_param(instance, "instance", DagsterInstance) @property def instance(self): return self._instance class JobDefinition(object): """Defines a job, which describes a series of runs for a particular pipeline. These runs are grouped by job_name, using tags. Args: name (str): The name of this job. 
pipeline_name (str): The name of the pipeline to execute. mode (Optional[str]): The mode to apply when executing this pipeline. (default: 'default') solid_selection (Optional[List[str]]): A list of solid subselection (including single solid names) to execute. e.g. ``['*some_solid+', 'other_solid']`` """ __slots__ = [ "_name", "_job_type", "_pipeline_name", "_tags_fn", "_run_config_fn", "_mode", "_solid_selection", ] def __init__( self, name, job_type, pipeline_name, mode="default", solid_selection=None, ): self._name = check_valid_name(name) self._job_type = check.inst_param(job_type, "job_type", JobType) self._pipeline_name = check.str_param(pipeline_name, "pipeline_name") self._mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME) self._solid_selection = check.opt_nullable_list_param( solid_selection, "solid_selection", of_type=str ) @property def name(self): return self._name @property def pipeline_name(self): return self._pipeline_name @property def job_type(self): return self._job_type @property def solid_selection(self): return self._solid_selection @property def mode(self): return self._mode diff --git a/python_modules/dagster/dagster/core/definitions/repository.py b/python_modules/dagster/dagster/core/definitions/repository.py index 73fe3f93d..9988e436b 100644 --- a/python_modules/dagster/dagster/core/definitions/repository.py +++ b/python_modules/dagster/dagster/core/definitions/repository.py @@ -1,608 +1,631 @@ from dagster import check from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError from dagster.utils import merge_dicts -from .job import JobDefinition +from .job import JobDefinition, JobType from .partition import PartitionScheduleDefinition, PartitionSetDefinition from .pipeline import PipelineDefinition from .schedule import ScheduleDefinition from .utils import check_valid_name VALID_REPOSITORY_DATA_DICT_KEYS = { "pipelines", "partition_sets", "schedules", "jobs", } class _CacheingDefinitionIndex(object): def __init__(self, definition_class, definition_class_name, definition_kind, definitions): for key, definition in definitions.items(): check.invariant( isinstance(definition, definition_class) or callable(definition), "Bad definition for {definition_kind} {key}: must be {definition_class_name} or " "callable, got {type_}".format( definition_kind=definition_kind, key=key, definition_class_name=definition_class_name, type_=type(definition), ), ) self._definition_class = definition_class self._definition_class_name = definition_class_name self._definition_kind = definition_kind self._definitions = definitions self._definition_cache = {} self._definition_names = None self._all_definitions = None def get_definition_names(self): if self._definition_names: return self._definition_names self._definition_names = list(self._definitions.keys()) return self._definition_names def has_definition(self, definition_name): check.str_param(definition_name, "definition_name") return definition_name in self.get_definition_names() def get_all_definitions(self): if self._all_definitions is not None: return self._all_definitions self._all_definitions = list( sorted( map(self.get_definition, self.get_definition_names()), key=lambda definition: definition.name, ) ) return self._all_definitions def get_definition(self, definition_name): check.str_param(definition_name, "definition_name") if definition_name in self._definition_cache: return self._definition_cache[definition_name] if definition_name not in self._definitions: raise 
DagsterInvariantViolationError( "Could not find {definition_kind} '{definition_name}'. Found: " "{found_names}.".format( definition_kind=self._definition_kind, definition_name=definition_name, found_names=", ".join( [ "'{found_name}'".format(found_name=found_name) for found_name in self.get_definition_names() ] ), ) ) definition_source = self._definitions[definition_name] if isinstance(definition_source, self._definition_class): self._definition_cache[definition_name] = definition_source return definition_source else: definition = definition_source() check.invariant( isinstance(definition, self._definition_class), "Bad constructor for {definition_kind} {definition_name}: must return " "{definition_class_name}, got value of type {type_}".format( definition_kind=self._definition_kind, definition_name=definition_name, definition_class_name=self._definition_class_name, type_=type(definition), ), ) check.invariant( definition.name == definition_name, "Bad constructor for {definition_kind} '{definition_name}': name in " "{definition_class_name} does not match: got '{definition_def_name}'".format( definition_kind=self._definition_kind, definition_name=definition_name, definition_class_name=self._definition_class_name, definition_def_name=definition.name, ), ) self._definition_cache[definition_name] = definition return definition class RepositoryData(object): """Contains definitions belonging to a repository. Users should usually rely on the :py:func:`@repository ` decorator to create new repositories, which will in turn call the static constructors on this class. However, users may subclass RepositoryData for fine-grained control over access to and lazy creation of repository members. """ def __init__(self, pipelines, partition_sets, schedules, jobs): """Constructs a new RepositoryData object. You may pass pipeline, partition_set, and schedule definitions directly, or you may pass callables with no arguments that will be invoked to lazily construct definitions when accessed by name. This can be helpful for performance when there are many definitions in a repository, or when constructing the definitions is costly. Note that when lazily constructing a definition, the name of the definition must match its key in its dictionary index, or a :py:class:`DagsterInvariantViolationError` will be thrown at retrieval time. Args: pipelines (Dict[str, Union[PipelineDefinition, Callable[[], PipelineDefinition]]]): The pipeline definitions belonging to the repository. partition_sets (Dict[str, Union[PartitionSetDefinition, Callable[[], PartitionSetDefinition]]]): The partition sets belonging to the repository. schedules (Dict[str, Union[ScheduleDefinition, Callable[[], ScheduleDefinition]]]): The schedules belonging to the repository. jobs (Dict[str, Union[JobDefinition, Callable[[], JobDefinition]]]): The predefined jobs for a repository. 
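A minimal sketch of the lazy construction described in the constructor docstring above — the solid and pipeline names are hypothetical and not part of this diff; the key point is that a lazily constructed definition's name must match its dictionary key:

.. code-block:: python

    from dagster import pipeline, solid
    from dagster.core.definitions.repository import RepositoryData

    @solid
    def noop(_context):
        pass

    def make_my_pipeline():
        # The constructed pipeline's name must equal the dict key below,
        # or a DagsterInvariantViolationError is raised on retrieval.
        @pipeline(name="my_pipeline")
        def my_pipeline():
            noop()
        return my_pipeline

    repository_data = RepositoryData(
        pipelines={"my_pipeline": make_my_pipeline},  # callable => constructed on first access
        partition_sets={},
        schedules={},
        jobs={},
    )
    assert repository_data.get_pipeline("my_pipeline").name == "my_pipeline"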
""" check.dict_param(pipelines, "pipelines", key_type=str) check.dict_param(partition_sets, "partition_sets", key_type=str) check.dict_param(schedules, "schedules", key_type=str) check.dict_param(jobs, "jobs", key_type=str) self._pipelines = _CacheingDefinitionIndex( PipelineDefinition, "PipelineDefinition", "pipeline", pipelines ) self._schedules = _CacheingDefinitionIndex( ScheduleDefinition, "ScheduleDefinition", "schedule", schedules ) schedule_partition_sets = [ schedule.get_partition_set() for schedule in self._schedules.get_all_definitions() if isinstance(schedule, PartitionScheduleDefinition) ] self._partition_sets = _CacheingDefinitionIndex( PartitionSetDefinition, "PartitionSetDefinition", "partition set", merge_dicts( {partition_set.name: partition_set for partition_set in schedule_partition_sets}, partition_sets, ), ) self._jobs = _CacheingDefinitionIndex(JobDefinition, "JobDefinition", "job", jobs,) self._all_pipelines = None self._solids = None self._all_solids = None @staticmethod def from_dict(repository_definitions): """Static constructor. Args: repository_definition (Dict[str, Dict[str, ...]]): A dict of the form: { 'pipelines': Dict[str, Callable[[], PipelineDefinition]], 'partition_sets': Dict[str, Callable[[], PartitionSetDefinition]], 'schedules': Dict[str, Callable[[], ScheduleDefinition]] } This form is intended to allow definitions to be created lazily when accessed by name, which can be helpful for performance when there are many definitions in a repository, or when constructing the definitions is costly. """ check.dict_param(repository_definitions, "repository_definitions", key_type=str) check.invariant( set(repository_definitions.keys()).issubset(VALID_REPOSITORY_DATA_DICT_KEYS), "Bad dict: must not contain keys other than {{{valid_keys}}}: found {bad_keys}.".format( valid_keys=", ".join( ["'{key}'".format(key=key) for key in VALID_REPOSITORY_DATA_DICT_KEYS] ), bad_keys=", ".join( [ "'{key}'" for key in repository_definitions.keys() if key not in VALID_REPOSITORY_DATA_DICT_KEYS ] ), ), ) for key in VALID_REPOSITORY_DATA_DICT_KEYS: if key not in repository_definitions: repository_definitions[key] = {} return RepositoryData(**repository_definitions) @classmethod def from_list(cls, repository_definitions): """Static constructor. Args: repository_definition (List[Union[PipelineDefinition, PartitionSetDefinition, ScheduleDefinition]]): Use this constructor when you have no need to lazy load pipelines or other definitions. 
""" pipelines = {} partition_sets = {} schedules = {} jobs = {} for definition in repository_definitions: if isinstance(definition, PipelineDefinition): if definition.name in pipelines: raise DagsterInvalidDefinitionError( "Duplicate pipeline definition found for pipeline {pipeline_name}".format( pipeline_name=definition.name ) ) pipelines[definition.name] = definition elif isinstance(definition, PartitionSetDefinition): if definition.name in partition_sets: raise DagsterInvalidDefinitionError( "Duplicate partition set definition found for partition set " "{partition_set_name}".format(partition_set_name=definition.name) ) partition_sets[definition.name] = definition elif isinstance(definition, JobDefinition): if isinstance(definition, ScheduleDefinition): if definition.name in schedules: raise DagsterInvalidDefinitionError( "Duplicate schedule definition found for schedule {schedule_name}".format( schedule_name=definition.name ) ) schedules[definition.name] = definition if isinstance(definition, PartitionScheduleDefinition): partition_set_def = definition.get_partition_set() if ( partition_set_def.name in partition_sets and partition_set_def != partition_sets[partition_set_def.name] ): raise DagsterInvalidDefinitionError( "Duplicate partition set definition found for partition set " "{partition_set_name}".format( partition_set_name=partition_set_def.name ) ) partition_sets[partition_set_def.name] = partition_set_def if definition.name in jobs: raise DagsterInvalidDefinitionError( "Duplicate job definition found for job {name}".format(name=definition.name) ) jobs[definition.name] = definition return RepositoryData( pipelines=pipelines, partition_sets=partition_sets, schedules=schedules, jobs=jobs, ) def get_pipeline_names(self): """Get the names of all pipelines in the repository. Returns: List[str] """ return self._pipelines.get_definition_names() def has_pipeline(self, pipeline_name): """Check if a pipeline with a given name is present in the repository. Args: pipeline_name (str): The name of the pipeline. Returns: bool """ check.str_param(pipeline_name, "pipeline_name") return self._pipelines.has_definition(pipeline_name) def get_all_pipelines(self): """Return all pipelines in the repository as a list. Note that this will construct any pipeline that has not yet been constructed. Returns: List[PipelineDefinition]: All pipelines in the repository. """ if self._all_pipelines is not None: return self._all_pipelines self._all_pipelines = self._pipelines.get_all_definitions() self.get_all_solid_defs() return self._all_pipelines def get_pipeline(self, pipeline_name): """Get a pipeline by name. If this pipeline has not yet been constructed, only this pipeline is constructed, and will be cached for future calls. Args: pipeline_name (str): Name of the pipeline to retrieve. Returns: PipelineDefinition: The pipeline definition corresponding to the given name. """ check.str_param(pipeline_name, "pipeline_name") return self._pipelines.get_definition(pipeline_name) def get_partition_set_names(self): """Get the names of all partition sets in the repository. Returns: List[str] """ return self._partition_sets.get_definition_names() def has_partition_set(self, partition_set_name): """Check if a partition set with a given name is present in the repository. Args: partition_set_name (str): The name of the partition set. 
Returns: bool """ check.str_param(partition_set_name, "partition_set_name") return self._partition_sets.has_definition(partition_set_name) def get_all_partition_sets(self): """Return all partition sets in the repository as a list. Note that this will construct any partition set that has not yet been constructed. Returns: List[PartitionSetDefinition]: All partition sets in the repository. """ return self._partition_sets.get_all_definitions() def get_partition_set(self, partition_set_name): """Get a partition set by name. If this partition set has not yet been constructed, only this partition set is constructed, and will be cached for future calls. Args: partition_set_name (str): Name of the partition set to retrieve. Returns: PartitionSetDefinition: The partition set definition corresponding to the given name. """ check.str_param(partition_set_name, "partition_set_name") return self._partition_sets.get_definition(partition_set_name) def get_schedule_names(self): """Get the names of all schedules in the repository. Returns: List[str] """ return self._schedules.get_definition_names() def get_all_schedules(self): """Return all schedules in the repository as a list. Note that this will construct any schedule that has not yet been constructed. Returns: List[ScheduleDefinition]: All pipelines in the repository. """ return self._schedules.get_all_definitions() def get_schedule(self, schedule_name): """Get a schedule by name. if this schedule has not yet been constructed, only this schedule is constructed, and will be cached for future calls. args: schedule_name (str): name of the schedule to retrieve. Returns: ScheduleDefinition: The schedule definition corresponding to the given name. """ check.str_param(schedule_name, "schedule_name") return self._schedules.get_definition(schedule_name) def has_schedule(self, schedule_name): check.str_param(schedule_name, "schedule_name") return self._schedules.has_definition(schedule_name) + def get_all_sensors(self): + return [ + definition + for definition in self._jobs.get_all_definitions() + if definition.job_type == JobType.SENSOR + ] + + def get_sensor(self, name): + return self._jobs.get_definition(name) + + def has_sensor(self, name): + return self._jobs.has_definition(name) + def get_all_jobs(self): return self._jobs.get_all_definitions() def get_job(self, name): check.str_param(name, "name") return self._jobs.get_definition(name) def has_job(self, name): check.str_param(name, "name") return self._jobs.has_definition(name) def get_all_solid_defs(self): if self._all_solids is not None: return self._all_solids self._all_solids = self._construct_solid_defs() return list(self._all_solids.values()) def has_solid(self, solid_name): if self._all_solids is not None: return solid_name in self._all_solids self._all_solids = self._construct_solid_defs() return solid_name in self._all_solids def _construct_solid_defs(self): solid_defs = {} solid_to_pipeline = {} # This looks like it should infinitely loop but the # memoization of _all_pipelines and _all_solids short # circuits that for pipeline in self.get_all_pipelines(): for solid_def in pipeline.all_solid_defs: if solid_def.name not in solid_defs: solid_defs[solid_def.name] = solid_def solid_to_pipeline[solid_def.name] = pipeline.name if not solid_defs[solid_def.name] is solid_def: first_name, second_name = sorted( [solid_to_pipeline[solid_def.name], pipeline.name] ) raise DagsterInvalidDefinitionError( ( "Duplicate solids found in repository with name '{solid_def_name}'. 
" "Solid definition names must be unique within a repository. Solid is " "defined in pipeline '{first_pipeline_name}' and in pipeline " "'{second_pipeline_name}'." ).format( solid_def_name=solid_def.name, first_pipeline_name=first_name, second_pipeline_name=second_name, ) ) return solid_defs def solid_def_named(self, name): """Get the solid with the given name in the repository. Args: name (str): The name of the solid for which to retrieve the solid definition. Returns: SolidDefinition: The solid with the given name. """ check.str_param(name, "name") if not self.has_solid(name): check.failed("could not find solid_def for solid {name}".format(name=name)) return self._all_solids[name] class RepositoryDefinition(object): """Define a repository that contains a collection of definitions. Users should typically not create objects of this class directly. Instead, use the :py:func:`@repository` decorator. Args: name (str): The name of the repository. repository_data (RepositoryData): Contains the definitions making up the repository. description (Optional[str]): A string description of the repository. """ def __init__( self, name, repository_data, description=None, ): self._name = check_valid_name(name) self._description = check.opt_str_param(description, "description") self._repository_data = check.inst_param(repository_data, "repository_data", RepositoryData) @property def name(self): return self._name @property def description(self): return self._description @property def pipeline_names(self): """List[str]: Names of all pipelines in the repository""" return self._repository_data.get_pipeline_names() def has_pipeline(self, name): """Check if a pipeline with a given name is present in the repository. Args: name (str): The name of the pipeline. Returns: bool """ return self._repository_data.has_pipeline(name) def get_pipeline(self, name): """Get a pipeline by name. If this pipeline is present in the lazily evaluated ``pipeline_dict`` passed to the constructor, but has not yet been constructed, only this pipeline is constructed, and will be cached for future calls. Args: name (str): Name of the pipeline to retrieve. Returns: PipelineDefinition: The pipeline definition corresponding to the given name. """ return self._repository_data.get_pipeline(name) def get_all_pipelines(self): """Return all pipelines in the repository as a list. Note that this will construct any pipeline in the lazily evaluated ``pipeline_dict`` that has not yet been constructed. Returns: List[PipelineDefinition]: All pipelines in the repository. """ return self._repository_data.get_all_pipelines() def get_all_solid_defs(self): """Get all the solid definitions in a repository. Returns: List[SolidDefinition]: All solid definitions in the repository. """ return self._repository_data.get_all_solid_defs() def solid_def_named(self, name): """Get the solid with the given name in the repository. Args: name (str): The name of the solid for which to retrieve the solid definition. Returns: SolidDefinition: The solid with the given name. 
""" check.str_param(name, "name") return self._repository_data.solid_def_named(name) @property def partition_set_defs(self): return self._repository_data.get_all_partition_sets() def get_partition_set_def(self, name): return self._repository_data.get_partition_set(name) @property def schedule_defs(self): return self._repository_data.get_all_schedules() def get_schedule_def(self, name): return self._repository_data.get_schedule(name) def has_schedule_def(self, name): return self._repository_data.has_schedule(name) + @property + def sensor_defs(self): + return self._repository_data.get_all_sensors() + + def get_sensor_def(self, name): + return self._repository_data.get_sensor(name) + + def has_sensor_def(self, name): + return self._repository_data.has_sensor(name) + @property def job_defs(self): return self._repository_data.get_all_jobs() def get_job_def(self, name): return self._repository_data.get_job(name) def has_job_def(self, name): return self._repository_data.has_job(name) diff --git a/python_modules/dagster/dagster/core/definitions/sensor.py b/python_modules/dagster/dagster/core/definitions/sensor.py new file mode 100644 index 000000000..fa747ca1f --- /dev/null +++ b/python_modules/dagster/dagster/core/definitions/sensor.py @@ -0,0 +1,95 @@ +from dagster import check +from dagster.core.definitions.job import JobContext, JobDefinition, JobType +from dagster.core.instance import DagsterInstance +from dagster.utils.backcompat import experimental_class_warning + + +class SensorExecutionContext(JobContext): + """Sensor execution context. + + An instance of this class is made available as the first argument to the `should_execute` + function on SensorDefinition. + + Attributes: + instance (DagsterInstance): The instance configured to run the schedule + last_evaluation_time (float): The last time that the sensor was evaluated (UTC). + """ + + __slots__ = ["_last_evaluation_time"] + + def __init__(self, instance, last_evaluation_time): + super(SensorExecutionContext, self).__init__( + check.inst_param(instance, "instance", DagsterInstance), + ) + self._last_evaluation_time = check.opt_float_param( + last_evaluation_time, "last_evaluation_time" + ) + + @property + def last_evaluation_time(self): + return self._last_evaluation_time + + +class SensorDefinition(JobDefinition): + """Define a sensor that initiates a set of job runs + + Args: + name (str): The name of the sensor to create. + pipeline_name (str): The name of the pipeline to execute when the sensor fires. + should_execute (Callable[[SensorExecutionContext], bool]): A function that runs + at an interval to determine whether a run should be launched or not. Takes a + :py:class:`~dagster.SensorExecutionContext` and returns a boolean (``True`` if the + sensor should execute). + run_config_fn (Callable[[SensorExecutionContext], [Dict]]): A function that takes a + SensorExecutionContext object and returns the environment configuration that + parameterizes this execution, as a dict. + tags_fn (Optional[Callable[[SensorExecutionContext], Optional[Dict[str, str]]]]): A + function that generates tags to attach to the sensors runs. Takes a + :py:class:`~dagster.SensorExecutionContext` and returns a dictionary of tags (string + key-value pairs). + solid_selection (Optional[List[str]]): A list of solid subselection (including single + solid names) to execute when the sensor runs. e.g. ``['*some_solid+', 'other_solid']`` + mode (Optional[str]): The mode to apply when executing this sensor. 
(default: 'default') + """ + + __slots__ = [ + "_run_config_fn", + "_tags_fn", + "_should_execute", + ] + + def __init__( + self, + name, + pipeline_name, + should_execute, + run_config_fn=None, + tags_fn=None, + solid_selection=None, + mode=None, + ): + experimental_class_warning("SensorDefinition") + super(SensorDefinition, self).__init__( + name, + job_type=JobType.SENSOR, + pipeline_name=pipeline_name, + mode=mode, + solid_selection=solid_selection, + ) + self._should_execute = check.callable_param(should_execute, "should_execute") + self._run_config_fn = check.opt_callable_param( + run_config_fn, "run_config_fn", default=lambda _context: {} + ) + self._tags_fn = check.opt_callable_param(tags_fn, "tags_fn", default=lambda _context: {}) + + def get_run_config(self, context): + check.inst_param(context, "context", SensorExecutionContext) + return self._run_config_fn(context) + + def get_tags(self, context): + check.inst_param(context, "context", SensorExecutionContext) + return self._tags_fn(context) + + def should_execute(self, context): + check.inst_param(context, "context", SensorExecutionContext) + return self._should_execute(context) diff --git a/python_modules/dagster/dagster/core/errors.py b/python_modules/dagster/dagster/core/errors.py index 93c5754bb..7df4c2d39 100644 --- a/python_modules/dagster/dagster/core/errors.py +++ b/python_modules/dagster/dagster/core/errors.py @@ -1,489 +1,493 @@ """Core Dagster error classes. All errors thrown by the Dagster framework inherit from :py:class:`~dagster.DagsterError`. Users should not subclass this base class for their own exceptions. There is another exception base class, :py:class:`~dagster.DagsterUserCodeExecutionError`, which is used by the framework in concert with the :py:func:`~dagster.core.errors.user_code_error_boundary`. Dagster uses this construct to wrap user code into which it calls. User code can perform arbitrary computations and may itself throw exceptions. The error boundary catches these user code-generated exceptions, and then reraises them wrapped in a subclass of :py:class:`~dagster.DagsterUserCodeExecutionError`. The wrapped exceptions include additional context for the original exceptions, injected by the Dagster runtime. """ import sys import traceback from contextlib import contextmanager from dagster import check from future.utils import raise_from class DagsterError(Exception): """Base class for all errors thrown by the Dagster framework. Users should not subclass this base class for their own exceptions.""" @property def is_user_code_error(self): """Returns true if this error is attributable to user code.""" return False class DagsterInvalidDefinitionError(DagsterError): """Indicates that the rules for a definition have been violated by the user.""" class DagsterInvalidSubsetError(DagsterError): """Indicates that a subset of a pipeline is invalid because either: - One or more solids in the specified subset do not exist on the pipeline.' - The subset produces an invalid pipeline. """ CONFIG_ERROR_VERBIAGE = """ This value can be a: - Field - Python primitive types that resolve to dagster config types - int, float, bool, str, list. - A dagster config type: Int, Float, Bool, List, Optional, Selector, Shape, Permissive - A bare python dictionary, which is wrapped in Field(Shape(...)). Any values in the dictionary get resolved by the same rules, recursively. - A python list with a single entry that can resolve to a type, e.g. 
[int] """ class DagsterInvalidConfigDefinitionError(DagsterError): """Indicates that you have attempted to construct a config with an invalid value Acceptable values for config types are any of: 1. A Python primitive type that resolves to a Dagster config type (:py:class:`~python:int`, :py:class:`~python:float`, :py:class:`~python:bool`, :py:class:`~python:str`, or :py:class:`~python:list`). 2. A Dagster config type: :py:data:`~dagster.Int`, :py:data:`~dagster.Float`, :py:data:`~dagster.Bool`, :py:data:`~dagster.String`, :py:data:`~dagster.StringSource`, :py:data:`~dagster.Any`, :py:class:`~dagster.Array`, :py:data:`~dagster.Noneable`, :py:data:`~dagster.Enum`, :py:class:`~dagster.Selector`, :py:class:`~dagster.Shape`, or :py:class:`~dagster.Permissive`. 3. A bare python dictionary, which will be automatically wrapped in :py:class:`~dagster.Shape`. Values of the dictionary are resolved recursively according to the same rules. 4. A bare python list of length one which itself is config type. Becomes :py:class:`Array` with list element as an argument. 5. An instance of :py:class:`~dagster.Field`. """ def __init__(self, original_root, current_value, stack, reason=None, **kwargs): self.original_root = original_root self.current_value = current_value self.stack = stack super(DagsterInvalidConfigDefinitionError, self).__init__( ( "Error defining config. Original value passed: {original_root}. " "{stack_str}{current_value} " "cannot be resolved.{reason_str}" + CONFIG_ERROR_VERBIAGE ).format( original_root=repr(original_root), stack_str="Error at stack path :" + ":".join(stack) + ". " if stack else "", current_value=repr(current_value), reason_str=" Reason: {reason}.".format(reason=reason) if reason else "", ), **kwargs, ) class DagsterInvariantViolationError(DagsterError): """Indicates the user has violated a well-defined invariant that can only be enforced at runtime.""" class DagsterExecutionStepNotFoundError(DagsterError): """Thrown when the user specifies execution step keys that do not exist.""" def __init__(self, *args, **kwargs): self.step_keys = check.list_param(kwargs.pop("step_keys"), "step_keys", str) super(DagsterExecutionStepNotFoundError, self).__init__(*args, **kwargs) class DagsterRunNotFoundError(DagsterError): """Thrown when a run cannot be found in run storage.""" def __init__(self, *args, **kwargs): self.invalid_run_id = check.str_param(kwargs.pop("invalid_run_id"), "invalid_run_id") super(DagsterRunNotFoundError, self).__init__(*args, **kwargs) class DagsterStepOutputNotFoundError(DagsterError): """Indicates that previous step outputs required for an execution step to proceed are not available.""" def __init__(self, *args, **kwargs): self.step_key = check.str_param(kwargs.pop("step_key"), "step_key") self.output_name = check.str_param(kwargs.pop("output_name"), "output_name") super(DagsterStepOutputNotFoundError, self).__init__(*args, **kwargs) def _add_inner_exception_for_py2(msg, exc_info): if sys.version_info[0] == 2: return ( msg + "\n\nThe above exception was the direct cause of the following exception:\n\n" + "".join(traceback.format_exception(*exc_info)) ) return msg @contextmanager def user_code_error_boundary(error_cls, msg_fn, control_flow_exceptions=None, **kwargs): """ Wraps the execution of user-space code in an error boundary. This places a uniform policy around an user code invoked by the framework. 
This ensures that all user errors are wrapped in an exception derived from DagsterUserCodeExecutionError, and that the original stack trace of the user error is preserved, so that it can be reported without confusing framework code in the stack trace, if a tool author wishes to do so. This has been especially help in a notebooking context. Examples: .. code-block:: python with user_code_error_boundary( # Pass a class that inherits from DagsterUserCodeExecutionError DagstermillExecutionError, # Pass a function that produces a message lambda: 'Error occurred during the execution of Dagstermill solid ' '{solid_name}: {notebook_path}'.format( solid_name=name, notebook_path=notebook_path ), ): call_user_provided_function() """ check.callable_param(msg_fn, "msg_fn") check.subclass_param(error_cls, "error_cls", DagsterUserCodeExecutionError) control_flow_exceptions = tuple( check.opt_list_param(control_flow_exceptions, "control_flow_exceptions") ) try: yield except control_flow_exceptions as cf: # A control flow exception has occurred and should be propagated raise cf except DagsterError as de: # The system has thrown an error that is part of the user-framework contract raise de except Exception as e: # pylint: disable=W0703 # An exception has been thrown by user code and computation should cease # with the error reported further up the stack raise_from( error_cls(msg_fn(), user_exception=e, original_exc_info=sys.exc_info(), **kwargs), e ) class DagsterUserCodeExecutionError(DagsterError): """ This is the base class for any exception that is meant to wrap an :py:class:`~python:Exception` thrown by user code. It wraps that existing user code. The ``original_exc_info`` argument to the constructor is meant to be a tuple of the type returned by :py:func:`sys.exc_info ` at the call site of the constructor. Users should not subclass this base class for their own exceptions and should instead throw freely from user code. User exceptions will be automatically wrapped and rethrown. """ def __init__(self, *args, **kwargs): # original_exc_info should be gotten from a sys.exc_info() call at the # callsite inside of the exception handler. this will allow consuming # code to *re-raise* the user error in it's original format # for cleaner error reporting that does not have framework code in it user_exception = check.inst_param(kwargs.pop("user_exception"), "user_exception", Exception) original_exc_info = check.tuple_param(kwargs.pop("original_exc_info"), "original_exc_info") check.invariant(original_exc_info[0] is not None) msg = _add_inner_exception_for_py2(args[0], original_exc_info) super(DagsterUserCodeExecutionError, self).__init__(msg, *args[1:], **kwargs) self.user_exception = check.opt_inst_param(user_exception, "user_exception", Exception) self.original_exc_info = original_exc_info @property def is_user_code_error(self): return True class DagsterTypeCheckError(DagsterUserCodeExecutionError): """Indicates an error in the solid type system at runtime. E.g. a solid receives an unexpected input, or produces an output that does not match the type of the output definition. 
""" class DagsterExecutionStepExecutionError(DagsterUserCodeExecutionError): """Indicates an error occurred while executing the body of an execution step.""" def __init__(self, *args, **kwargs): self.step_key = check.str_param(kwargs.pop("step_key"), "step_key") self.solid_name = check.str_param(kwargs.pop("solid_name"), "solid_name") self.solid_def_name = check.str_param(kwargs.pop("solid_def_name"), "solid_def_name") super(DagsterExecutionStepExecutionError, self).__init__(*args, **kwargs) class DagsterResourceFunctionError(DagsterUserCodeExecutionError): """ Indicates an error occurred while executing the body of the ``resource_fn`` in a :py:class:`~dagster.ResourceDefinition` during resource initialization. """ class DagsterConfigMappingFunctionError(DagsterUserCodeExecutionError): """ Indicates that an unexpected error occurred while executing the body of a config mapping function defined in a :py:class:`~dagster.CompositeSolidDefinition` during config parsing. """ class DagsterTypeLoadingError(DagsterUserCodeExecutionError): """ Indicates that an unexpected error occurred while executing the body of an type load function defined in a :py:class:`~dagster.DagsterTypeLoader` during loading of a custom type. """ class DagsterTypeMaterializationError(DagsterUserCodeExecutionError): """ Indicates that an unexpected error occurred while executing the body of an output materialization function defined in a :py:class:`~dagster.DagsterTypeMaterializer` during materialization of a custom type. """ class DagsterUnknownResourceError(DagsterError, AttributeError): # inherits from AttributeError as it is raised within a __getattr__ call... used to support # object hasattr method """ Indicates that an unknown resource was accessed in the body of an execution step. May often happen by accessing a resource in the compute function of a solid without first supplying the solid with the correct `required_resource_keys` argument. """ def __init__(self, resource_name, *args, **kwargs): self.resource_name = check.str_param(resource_name, "resource_name") msg = ( "Unknown resource `{resource_name}`. Specify `{resource_name}` as a required resource " "on the compute / config function that accessed it." ).format(resource_name=resource_name) super(DagsterUnknownResourceError, self).__init__(msg, *args, **kwargs) class DagsterInvalidConfigError(DagsterError): """Thrown when provided config is invalid (does not type check against the relevant config schema).""" def __init__(self, preamble, errors, config_value, *args, **kwargs): from dagster.config.errors import EvaluationError check.str_param(preamble, "preamble") self.errors = check.list_param(errors, "errors", of_type=EvaluationError) self.config_value = config_value error_msg = preamble error_messages = [] for i_error, error in enumerate(self.errors): error_messages.append(error.message) error_msg += "\n Error {i_error}: {error_message}".format( i_error=i_error + 1, error_message=error.message ) self.message = error_msg self.error_messages = error_messages super(DagsterInvalidConfigError, self).__init__(error_msg, *args, **kwargs) class DagsterUnmetExecutorRequirementsError(DagsterError): """Indicates the resolved executor is incompatible with the state of other systems such as the :py:class:`~dagster.core.instance.DagsterInstance` or system storage configuration. """ class DagsterSubprocessError(DagsterError): """An exception has occurred in one or more of the child processes dagster manages. 
This error forwards the message and stack trace for all of the collected errors. """ def __init__(self, *args, **kwargs): from dagster.utils.error import SerializableErrorInfo self.subprocess_error_infos = check.list_param( kwargs.pop("subprocess_error_infos"), "subprocess_error_infos", SerializableErrorInfo ) super(DagsterSubprocessError, self).__init__(*args, **kwargs) class DagsterUserCodeProcessError(DagsterError): """An exception has occurred in a user code process that the host process raising this error was communicating with.""" def __init__(self, *args, **kwargs): from dagster.utils.error import SerializableErrorInfo self.user_code_process_error_infos = check.list_param( kwargs.pop("user_code_process_error_infos"), "user_code_process_error_infos", SerializableErrorInfo, ) super(DagsterUserCodeProcessError, self).__init__(*args, **kwargs) class DagsterLaunchFailedError(DagsterError): """Indicates an error while attempting to launch a pipeline run. """ def __init__(self, *args, **kwargs): from dagster.utils.error import SerializableErrorInfo self.serializable_error_info = check.opt_inst_param( kwargs.pop("serializable_error_info", None), "serializable_error_info", SerializableErrorInfo, ) super(DagsterLaunchFailedError, self).__init__(*args, **kwargs) class DagsterBackfillFailedError(DagsterError): """Indicates an error while attempting to launch a backfill. """ def __init__(self, *args, **kwargs): from dagster.utils.error import SerializableErrorInfo self.serializable_error_info = check.opt_inst_param( kwargs.pop("serializable_error_info", None), "serializable_error_info", SerializableErrorInfo, ) super(DagsterBackfillFailedError, self).__init__(*args, **kwargs) class DagsterInstanceMigrationRequired(DagsterError): """Indicates that the dagster instance must be migrated.""" def __init__(self, msg=None, db_revision=None, head_revision=None, original_exc_info=None): super(DagsterInstanceMigrationRequired, self).__init__( "Instance is out of date and must be migrated{additional_msg}." "{revision_clause} Please run `dagster instance migrate`.{original_exception_clause}".format( additional_msg=" ({msg})".format(msg=msg) if msg else "", revision_clause=( " Database is at revision {db_revision}, head is " "{head_revision}.".format(db_revision=db_revision, head_revision=head_revision) if db_revision or head_revision else "" ), original_exception_clause=( "\n\nOriginal exception:\n\n{original_exception}".format( original_exception="".join(traceback.format_exception(*original_exc_info)) ) if original_exc_info else "" ), ) ) class DagsterRunAlreadyExists(DagsterError): """Indicates that a pipeline run already exists in a run storage.""" class DagsterSnapshotDoesNotExist(DagsterError): """Indicates you attempted to create a pipeline run with a nonexistent snapshot id""" class DagsterRunConflict(DagsterError): """Indicates that a conflicting pipeline run exists in a run storage.""" class DagsterTypeCheckDidNotPass(DagsterError): """Indicates that a type check failed. This is raised when ``raise_on_error`` is ``True`` in calls to the synchronous pipeline and solid execution APIs (:py:func:`~dagster.execute_pipeline`, :py:func:`~dagster.execute_solid`, etc.), that is, typically in test, and a :py:class:`~dagster.DagsterType`'s type check fails by returning either ``False`` or an instance of :py:class:`~dagster.TypeCheck` whose ``success`` member is ``False``. 
""" def __init__(self, description=None, metadata_entries=None, dagster_type=None): from dagster import EventMetadataEntry, DagsterType super(DagsterTypeCheckDidNotPass, self).__init__(description) self.description = check.opt_str_param(description, "description") self.metadata_entries = check.opt_list_param( metadata_entries, "metadata_entries", of_type=EventMetadataEntry ) self.dagster_type = check.opt_inst_param(dagster_type, "dagster_type", DagsterType) class DagsterEventLogInvalidForRun(DagsterError): """Raised when the event logs for a historical run are malformed or invalid.""" def __init__(self, run_id): self.run_id = check.str_param(run_id, "run_id") super(DagsterEventLogInvalidForRun, self).__init__( "Event logs invalid for run id {}".format(run_id) ) class ScheduleExecutionError(DagsterUserCodeExecutionError): """Errors raised in a user process during the execution of schedule.""" +class SensorExecutionError(DagsterUserCodeExecutionError): + """Errors raised in a user process during the execution of a sensor (or its job).""" + + class PartitionExecutionError(DagsterUserCodeExecutionError): """Errors raised during the execution of user-provided functions of a partition set schedule.""" class DagsterInvalidAssetKey(DagsterError): """ Error raised by invalid asset key """ class HookExecutionError(DagsterUserCodeExecutionError): """ Error raised during the execution of a user-defined hook. """ class DagsterImportError(DagsterError): """ Import error raised while importing user-code. """ class JobError(DagsterUserCodeExecutionError): """Errors raised during the execution of user-provided functions for a defined Job.""" class DagsterAddressIOError(DagsterError): """ IO error raised while operating data assets. """ def __init__(self, msg): super(DagsterAddressIOError, self).__init__(msg if msg else "") class DagsterUnknownStepStateError(DagsterError): """When pipeline execution complete with steps in an unknown state""" class DagsterIncompleteExecutionPlanError(DagsterError): """When pipeline execution completes with an active execution in an incomplete state""" class DagsterObjectStoreError(DagsterError): """Errors during an object store operation.""" diff --git a/python_modules/dagster/dagster/core/host_representation/external_data.py b/python_modules/dagster/dagster/core/host_representation/external_data.py index 395c94bc3..38822fb5d 100644 --- a/python_modules/dagster/dagster/core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/core/host_representation/external_data.py @@ -1,410 +1,431 @@ """ This module contains data objects meant to be serialized between host processes and user processes. They should contain no business logic or clever indexing. Use the classes in external.py for that. 
""" from collections import namedtuple from dagster import check from dagster.core.definitions import ( JobDefinition, JobType, PartitionSetDefinition, PipelineDefinition, PresetDefinition, RepositoryDefinition, ScheduleDefinition, ) from dagster.core.definitions.partition import PartitionScheduleDefinition from dagster.core.snap import PipelineSnapshot from dagster.serdes import whitelist_for_serdes from dagster.utils.error import SerializableErrorInfo @whitelist_for_serdes class ExternalRepositoryData( namedtuple( "_ExternalRepositoryData", "name external_pipeline_datas external_schedule_datas external_partition_set_datas external_executable_datas external_job_datas", ) ): def __new__( cls, name, external_pipeline_datas, external_schedule_datas, external_partition_set_datas, external_executable_datas=None, external_job_datas=None, ): return super(ExternalRepositoryData, cls).__new__( cls, name=check.str_param(name, "name"), external_pipeline_datas=check.list_param( external_pipeline_datas, "external_pipeline_datas", of_type=ExternalPipelineData ), external_schedule_datas=check.list_param( external_schedule_datas, "external_schedule_datas", of_type=ExternalScheduleData ), external_partition_set_datas=check.list_param( external_partition_set_datas, "external_partition_set_datas", of_type=ExternalPartitionSetData, ), external_executable_datas=check.opt_list_param( external_executable_datas, "external_executable_datas" ), external_job_datas=check.opt_list_param( external_job_datas, "external_job_datas", of_type=ExternalJobData, ), ) def get_pipeline_snapshot(self, name): check.str_param(name, "name") for external_pipeline_data in self.external_pipeline_datas: if external_pipeline_data.name == name: return external_pipeline_data.pipeline_snapshot check.failed("Could not find pipeline snapshot named " + name) def get_external_pipeline_data(self, name): check.str_param(name, "name") for external_pipeline_data in self.external_pipeline_datas: if external_pipeline_data.name == name: return external_pipeline_data check.failed("Could not find external pipeline data named " + name) def get_external_schedule_data(self, name): check.str_param(name, "name") for external_schedule_data in self.external_schedule_datas: if external_schedule_data.name == name: return external_schedule_data check.failed("Could not find external schedule data named " + name) def get_external_partition_set_data(self, name): check.str_param(name, "name") for external_partition_set_data in self.external_partition_set_datas: if external_partition_set_data.name == name: return external_partition_set_data check.failed("Could not find external partition set data named " + name) def get_external_job_data(self, name): check.str_param(name, "name") for external_job_data in self.external_job_datas: if external_job_data.name == name: return external_job_data check.failed("Could not find job data named " + name) @whitelist_for_serdes class ExternalPipelineSubsetResult( namedtuple("_ExternalPipelineSubsetResult", "success error external_pipeline_data") ): def __new__(cls, success, error=None, external_pipeline_data=None): return super(ExternalPipelineSubsetResult, cls).__new__( cls, success=check.bool_param(success, "success"), error=check.opt_inst_param(error, "error", SerializableErrorInfo), external_pipeline_data=check.opt_inst_param( external_pipeline_data, "external_pipeline_data", ExternalPipelineData ), ) @whitelist_for_serdes class ExternalPipelineData( namedtuple( "_ExternalPipelineData", "name pipeline_snapshot active_presets 
parent_pipeline_snapshot" ) ): def __new__(cls, name, pipeline_snapshot, active_presets, parent_pipeline_snapshot): return super(ExternalPipelineData, cls).__new__( cls, name=check.str_param(name, "name"), pipeline_snapshot=check.inst_param( pipeline_snapshot, "pipeline_snapshot", PipelineSnapshot ), parent_pipeline_snapshot=check.opt_inst_param( parent_pipeline_snapshot, "parent_pipeline_snapshot", PipelineSnapshot ), active_presets=check.list_param( active_presets, "active_presets", of_type=ExternalPresetData ), ) @whitelist_for_serdes class ExternalPresetData( namedtuple("_ExternalPresetData", "name run_config solid_selection mode tags") ): def __new__(cls, name, run_config, solid_selection, mode, tags): return super(ExternalPresetData, cls).__new__( cls, name=check.str_param(name, "name"), run_config=check.opt_dict_param(run_config, "run_config"), solid_selection=check.opt_nullable_list_param( solid_selection, "solid_selection", of_type=str ), mode=check.str_param(mode, "mode"), tags=check.opt_dict_param(tags, "tags"), ) @whitelist_for_serdes class ExternalScheduleData( namedtuple( "_ExternalScheduleData", "name cron_schedule pipeline_name solid_selection mode environment_vars partition_set_name execution_timezone", ) ): def __new__( cls, name, cron_schedule, pipeline_name, solid_selection, mode, environment_vars, partition_set_name, execution_timezone, ): return super(ExternalScheduleData, cls).__new__( cls, name=check.str_param(name, "name"), cron_schedule=check.str_param(cron_schedule, "cron_schedule"), pipeline_name=check.str_param(pipeline_name, "pipeline_name"), solid_selection=check.opt_nullable_list_param(solid_selection, "solid_selection", str), mode=check.opt_str_param(mode, "mode"), environment_vars=check.opt_dict_param(environment_vars, "environment_vars"), partition_set_name=check.opt_str_param(partition_set_name, "partition_set_name"), execution_timezone=check.opt_str_param(execution_timezone, "execution_timezone"), ) @whitelist_for_serdes class ExternalScheduleExecutionData( namedtuple("_ExternalScheduleExecutionData", "run_config tags should_execute") ): def __new__(cls, run_config=None, tags=None, should_execute=None): return super(ExternalScheduleExecutionData, cls).__new__( cls, run_config=check.opt_dict_param(run_config, "run_config"), tags=check.opt_dict_param(tags, "tags", key_type=str, value_type=str), should_execute=check.opt_bool_param(should_execute, "should_execute"), ) @whitelist_for_serdes class ExternalScheduleExecutionErrorData( namedtuple("_ExternalScheduleExecutionErrorData", "error") ): def __new__(cls, error): return super(ExternalScheduleExecutionErrorData, cls).__new__( cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo), ) @whitelist_for_serdes class ExternalJobData( namedtuple("_ExternalJobData", "name job_type pipeline_name solid_selection mode") ): def __new__( cls, name, job_type, pipeline_name, solid_selection, mode, ): return super(ExternalJobData, cls).__new__( cls, name=check.str_param(name, "name"), job_type=check.inst_param(job_type, "job_type", JobType), pipeline_name=check.str_param(pipeline_name, "pipeline_name"), solid_selection=check.opt_nullable_list_param(solid_selection, "solid_selection", str), mode=check.opt_str_param(mode, "mode"), ) +@whitelist_for_serdes +class ExternalSensorExecutionData( + namedtuple("_ExternalSensorExecutionData", "should_execute run_config tags") +): + def __new__(cls, should_execute=None, run_config=None, tags=None): + return super(ExternalSensorExecutionData, cls).__new__( + cls, + 
should_execute=check.opt_bool_param(should_execute, "should_execute"), + run_config=check.opt_dict_param(run_config, "run_config"), + tags=check.opt_dict_param(tags, "tags", key_type=str, value_type=str), + ) + + +@whitelist_for_serdes +class ExternalSensorExecutionErrorData(namedtuple("_ExternalSensorExecutionErrorData", "error")): + def __new__(cls, error): + return super(ExternalSensorExecutionErrorData, cls).__new__( + cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo), + ) + + @whitelist_for_serdes class ExternalExecutionParamsData(namedtuple("_ExternalExecutionParamsData", "run_config tags")): def __new__(cls, run_config=None, tags=None): return super(ExternalExecutionParamsData, cls).__new__( cls, run_config=check.opt_dict_param(run_config, "run_config"), tags=check.opt_dict_param(tags, "tags", key_type=str, value_type=str), ) @whitelist_for_serdes class ExternalExecutionParamsErrorData(namedtuple("_ExternalExecutionParamsErrorData", "error")): def __new__(cls, error): return super(ExternalExecutionParamsErrorData, cls).__new__( cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo), ) @whitelist_for_serdes class ExternalPartitionSetData( namedtuple("_ExternalPartitionSetData", "name pipeline_name solid_selection mode") ): def __new__(cls, name, pipeline_name, solid_selection, mode): return super(ExternalPartitionSetData, cls).__new__( cls, name=check.str_param(name, "name"), pipeline_name=check.str_param(pipeline_name, "pipeline_name"), solid_selection=check.opt_nullable_list_param(solid_selection, "solid_selection", str), mode=check.opt_str_param(mode, "mode"), ) @whitelist_for_serdes class ExternalPartitionNamesData(namedtuple("_ExternalPartitionNamesData", "partition_names")): def __new__(cls, partition_names=None): return super(ExternalPartitionNamesData, cls).__new__( cls, partition_names=check.opt_list_param(partition_names, "partition_names", str), ) @whitelist_for_serdes class ExternalPartitionConfigData(namedtuple("_ExternalPartitionConfigData", "name run_config")): def __new__(cls, name, run_config=None): return super(ExternalPartitionConfigData, cls).__new__( cls, name=check.str_param(name, "name"), run_config=check.opt_dict_param(run_config, "run_config"), ) @whitelist_for_serdes class ExternalPartitionTagsData(namedtuple("_ExternalPartitionTagsData", "name tags")): def __new__(cls, name, tags=None): return super(ExternalPartitionTagsData, cls).__new__( cls, name=check.str_param(name, "name"), tags=check.opt_dict_param(tags, "tags"), ) @whitelist_for_serdes class ExternalPartitionExecutionParamData( namedtuple("_ExternalPartitionExecutionParamData", "name tags run_config") ): def __new__(cls, name, tags, run_config): return super(ExternalPartitionExecutionParamData, cls).__new__( cls, name=check.str_param(name, "name"), tags=check.dict_param(tags, "tags"), run_config=check.opt_dict_param(run_config, "run_config"), ) @whitelist_for_serdes class ExternalPartitionSetExecutionParamData( namedtuple("_ExternalPartitionSetExecutionParamData", "partition_data") ): def __new__(cls, partition_data): return super(ExternalPartitionSetExecutionParamData, cls).__new__( cls, partition_data=check.list_param( partition_data, "partition_data", of_type=ExternalPartitionExecutionParamData ), ) @whitelist_for_serdes class ExternalPartitionExecutionErrorData( namedtuple("_ExternalPartitionExecutionErrorData", "error") ): def __new__(cls, error): return super(ExternalPartitionExecutionErrorData, cls).__new__( cls, error=check.opt_inst_param(error, "error", 
SerializableErrorInfo), ) def external_repository_data_from_def(repository_def): check.inst_param(repository_def, "repository_def", RepositoryDefinition) return ExternalRepositoryData( name=repository_def.name, external_pipeline_datas=sorted( list(map(external_pipeline_data_from_def, repository_def.get_all_pipelines())), key=lambda pd: pd.name, ), external_schedule_datas=sorted( list(map(external_schedule_data_from_def, repository_def.schedule_defs)), key=lambda sd: sd.name, ), external_partition_set_datas=sorted( list(map(external_partition_set_data_from_def, repository_def.partition_set_defs)), key=lambda psd: psd.name, ), external_job_datas=sorted( list(map(external_job_from_def, repository_def.job_defs)), key=lambda ted: ted.name, ), ) def external_pipeline_data_from_def(pipeline_def): check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) return ExternalPipelineData( name=pipeline_def.name, pipeline_snapshot=pipeline_def.get_pipeline_snapshot(), parent_pipeline_snapshot=pipeline_def.get_parent_pipeline_snapshot(), active_presets=sorted( list(map(external_preset_data_from_def, pipeline_def.preset_defs)), key=lambda pd: pd.name, ), ) def external_schedule_data_from_def(schedule_def): check.inst_param(schedule_def, "schedule_def", ScheduleDefinition) return ExternalScheduleData( name=schedule_def.name, cron_schedule=schedule_def.cron_schedule, pipeline_name=schedule_def.pipeline_name, solid_selection=schedule_def.solid_selection, mode=schedule_def.mode, environment_vars=schedule_def.environment_vars, partition_set_name=schedule_def.get_partition_set().name if isinstance(schedule_def, PartitionScheduleDefinition) else None, execution_timezone=schedule_def.execution_timezone, ) def external_partition_set_data_from_def(partition_set_def): check.inst_param(partition_set_def, "partition_set_def", PartitionSetDefinition) return ExternalPartitionSetData( name=partition_set_def.name, pipeline_name=partition_set_def.pipeline_name, solid_selection=partition_set_def.solid_selection, mode=partition_set_def.mode, ) def external_job_from_def(job_def): check.inst_param(job_def, "job_def", JobDefinition) return ExternalJobData( name=job_def.name, job_type=job_def.job_type, pipeline_name=job_def.pipeline_name, solid_selection=job_def.solid_selection, mode=job_def.mode, ) def external_preset_data_from_def(preset_def): check.inst_param(preset_def, "preset_def", PresetDefinition) return ExternalPresetData( name=preset_def.name, run_config=preset_def.run_config, solid_selection=preset_def.solid_selection, mode=preset_def.mode, tags=preset_def.tags, ) diff --git a/python_modules/dagster/dagster/core/host_representation/repository_location.py b/python_modules/dagster/dagster/core/host_representation/repository_location.py index d101a9190..9f80a9517 100644 --- a/python_modules/dagster/dagster/core/host_representation/repository_location.py +++ b/python_modules/dagster/dagster/core/host_representation/repository_location.py @@ -1,423 +1,445 @@ import datetime from abc import ABCMeta, abstractmethod, abstractproperty import pendulum import six from dagster import check from dagster.api.snapshot_execution_plan import sync_get_external_execution_plan_grpc from dagster.api.snapshot_partition import ( sync_get_external_partition_config_grpc, sync_get_external_partition_names_grpc, sync_get_external_partition_set_execution_param_data_grpc, sync_get_external_partition_tags_grpc, ) from dagster.api.snapshot_pipeline import sync_get_external_pipeline_subset_grpc from dagster.api.snapshot_repository import 
sync_get_streaming_external_repositories_grpc from dagster.api.snapshot_schedule import sync_get_external_schedule_execution_data_grpc +from dagster.api.snapshot_sensor import sync_get_external_sensor_execution_data_grpc from dagster.core.execution.api import create_execution_plan from dagster.core.host_representation import ( ExternalExecutionPlan, ExternalPipeline, GrpcServerRepositoryLocationHandle, InProcessRepositoryLocationHandle, ManagedGrpcPythonEnvRepositoryLocationHandle, PipelineHandle, RepositoryHandle, RepositoryLocationHandle, ) from dagster.core.instance import DagsterInstance from dagster.core.snap.execution_plan_snapshot import ( ExecutionPlanSnapshotErrorData, snapshot_from_execution_plan, ) from dagster.grpc.impl import ( get_external_schedule_execution, + get_external_sensor_execution, get_partition_config, get_partition_names, get_partition_set_execution_param_data, get_partition_tags, ) from dagster.utils.hosted_user_process import external_repo_from_def from .selector import PipelineSelector class RepositoryLocation(six.with_metaclass(ABCMeta)): """ A RepositoryLocation represents a target containing user code which has a set of Dagster definition objects. A given location will contain some number of uniquely named RepositoryDefinitions, each of which in turn contains Pipeline, Solid, and other definitions. Dagster tools are typically "host" processes, meaning they load a RepositoryLocation and communicate with it over an IPC/RPC layer. Currently this IPC layer is implemented by invoking the dagster CLI in a target python interpreter (e.g. a virtual environment) in either a) the current node or b) a container. In the near future, we may also make this communication channel available over an RPC layer, in which case the information needed to load a RepositoryLocation will be a URL that abides by some RPC contract. We also allow for an InProcessRepositoryLocation, which actually loads the user-defined artifacts in-process with the host tool. This is mostly for test scenarios.
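As a loose usage sketch (``my_repo``, ``my_sensor``, ``instance``, and ``repository_location_handle`` below are placeholders rather than part of any contract), a host tool resolves a handle into a location and then asks the user-code process for data:

.. code-block:: python

    # Illustrative only: resolve a handle into a concrete location, then ask
    # the user-code process to evaluate a sensor. The result is either
    # ExternalSensorExecutionData or ExternalSensorExecutionErrorData.
    location = RepositoryLocation.from_handle(repository_location_handle)
    external_repo = location.get_repository("my_repo")
    result = location.get_external_sensor_execution_data(
        instance,              # a DagsterInstance
        external_repo.handle,  # the RepositoryHandle for this repository
        "my_sensor",           # sensor name
        None,                  # last_evaluation_time (float timestamp or None)
    )
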
""" @abstractmethod def get_repository(self, name): pass @abstractmethod def has_repository(self, name): pass @abstractmethod def get_repositories(self): pass def get_repository_names(self): return list(self.get_repositories().keys()) @abstractproperty def name(self): pass @abstractproperty def location_handle(self): pass @abstractmethod def get_external_execution_plan( self, external_pipeline, run_config, mode, step_keys_to_execute ): pass @abstractmethod def get_subset_external_pipeline_result(self, selector): pass @abstractmethod def get_external_partition_config(self, repository_handle, partition_set_name, partition_name): pass @abstractmethod def get_external_partition_tags(self, repository_handle, partition_set_name, partition_name): pass @abstractmethod def get_external_partition_names(self, repository_handle, partition_set_name): pass @abstractmethod def get_external_partition_set_execution_param_data( self, repository_handle, partition_set_name, partition_names ): pass @abstractmethod def get_external_schedule_execution_data( self, instance, repository_handle, schedule_name, scheduled_execution_time, ): pass + @abstractmethod + def get_external_sensor_execution_data( + self, instance, repository_handle, name, last_evaluation_time + ): + pass + @abstractproperty def is_reload_supported(self): pass @staticmethod def from_handle(repository_location_handle): check.inst_param( repository_location_handle, "repository_location_handle", RepositoryLocationHandle ) if isinstance(repository_location_handle, InProcessRepositoryLocationHandle): return InProcessRepositoryLocation(repository_location_handle) elif isinstance( repository_location_handle, GrpcServerRepositoryLocationHandle ) or isinstance(repository_location_handle, ManagedGrpcPythonEnvRepositoryLocationHandle): return GrpcServerRepositoryLocation(repository_location_handle) else: check.failed("Unsupported handle: {}".format(repository_location_handle)) def create_reloaded_repository_location(self): return RepositoryLocation.from_handle(self.location_handle.create_reloaded_handle()) class InProcessRepositoryLocation(RepositoryLocation): def __init__(self, handle): self._handle = check.inst_param(handle, "handle", InProcessRepositoryLocationHandle,) self._recon_repo = self._handle.origin.recon_repo repo_def = self._recon_repo.get_definition() def_name = repo_def.name self._external_repo = external_repo_from_def( repo_def, RepositoryHandle(repository_name=def_name, repository_location_handle=self._handle), ) self._repositories = {self._external_repo.name: self._external_repo} @property def is_reload_supported(self): return False def get_reconstructable_pipeline(self, name): return self.get_reconstructable_repository().get_reconstructable_pipeline(name) def get_reconstructable_repository(self): return self._handle.origin.recon_repo @property def name(self): return self._handle.location_name @property def location_handle(self): return self._handle def get_repository(self, name): return self._repositories[name] def has_repository(self, name): return name in self._repositories def get_repositories(self): return self._repositories def get_subset_external_pipeline_result(self, selector): check.inst_param(selector, "selector", PipelineSelector) check.invariant( selector.location_name == self.name, "PipelineSelector location_name mismatch, got {selector.location_name} expected {self.name}".format( self=self, selector=selector ), ) from dagster.grpc.impl import get_external_pipeline_subset_result return get_external_pipeline_subset_result( 
self.get_reconstructable_pipeline(selector.pipeline_name), selector.solid_selection ) def get_external_execution_plan( self, external_pipeline, run_config, mode, step_keys_to_execute ): check.inst_param(external_pipeline, "external_pipeline", ExternalPipeline) check.dict_param(run_config, "run_config") check.str_param(mode, "mode") check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str) return ExternalExecutionPlan( execution_plan_snapshot=snapshot_from_execution_plan( create_execution_plan( pipeline=self.get_reconstructable_pipeline( external_pipeline.name ).subset_for_execution_from_existing_pipeline( external_pipeline.solids_to_execute ), run_config=run_config, mode=mode, step_keys_to_execute=step_keys_to_execute, ), external_pipeline.identifying_pipeline_snapshot_id, ), represented_pipeline=external_pipeline, ) def get_external_partition_config(self, repository_handle, partition_set_name, partition_name): check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.str_param(partition_name, "partition_name") return get_partition_config( recon_repo=self._recon_repo, partition_set_name=partition_set_name, partition_name=partition_name, ) def get_external_partition_tags(self, repository_handle, partition_set_name, partition_name): check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.str_param(partition_name, "partition_name") return get_partition_tags( recon_repo=self._recon_repo, partition_set_name=partition_set_name, partition_name=partition_name, ) def get_external_partition_names(self, repository_handle, partition_set_name): check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") return get_partition_names( recon_repo=self._recon_repo, partition_set_name=partition_set_name, ) def get_external_schedule_execution_data( self, instance, repository_handle, schedule_name, scheduled_execution_time, ): check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(schedule_name, "schedule_name") check.opt_inst_param( scheduled_execution_time, "scheduled_execution_time", pendulum.Pendulum ) return get_external_schedule_execution( self._recon_repo, instance_ref=instance.get_ref(), schedule_name=schedule_name, scheduled_execution_timestamp=scheduled_execution_time.timestamp() if scheduled_execution_time else None, scheduled_execution_timezone=scheduled_execution_time.timezone.name if scheduled_execution_time else None, ) + def get_external_sensor_execution_data( + self, instance, repository_handle, name, last_evaluation_time + ): + return get_external_sensor_execution( + self._recon_repo, instance.get_ref(), name, last_evaluation_time + ) + def get_external_partition_set_execution_param_data( self, repository_handle, partition_set_name, partition_names ): check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.list_param(partition_names, "partition_names", of_type=str) return get_partition_set_execution_param_data( self._recon_repo, partition_set_name=partition_set_name, partition_names=partition_names, ) class GrpcServerRepositoryLocation(RepositoryLocation): def __init__(self, repository_location_handle): check.param_invariant( isinstance(repository_location_handle, 
GrpcServerRepositoryLocationHandle) or isinstance(repository_location_handle, ManagedGrpcPythonEnvRepositoryLocationHandle), "repository_location_handle", ) self._handle = repository_location_handle external_repositories_list = sync_get_streaming_external_repositories_grpc( self._handle.client, self._handle, ) self.external_repositories = {repo.name: repo for repo in external_repositories_list} @property def is_reload_supported(self): return True def get_repository(self, name): check.str_param(name, "name") return self.external_repositories[name] def has_repository(self, name): return name in self.external_repositories def get_repositories(self): return self.external_repositories @property def name(self): return self._handle.location_name @property def location_handle(self): return self._handle def get_external_execution_plan( self, external_pipeline, run_config, mode, step_keys_to_execute ): check.inst_param(external_pipeline, "external_pipeline", ExternalPipeline) check.dict_param(run_config, "run_config") check.str_param(mode, "mode") check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str) execution_plan_snapshot_or_error = sync_get_external_execution_plan_grpc( api_client=self._handle.client, pipeline_origin=external_pipeline.get_external_origin(), run_config=run_config, mode=mode, pipeline_snapshot_id=external_pipeline.identifying_pipeline_snapshot_id, solid_selection=external_pipeline.solid_selection, step_keys_to_execute=step_keys_to_execute, ) if isinstance(execution_plan_snapshot_or_error, ExecutionPlanSnapshotErrorData): return execution_plan_snapshot_or_error return ExternalExecutionPlan( execution_plan_snapshot=execution_plan_snapshot_or_error, represented_pipeline=external_pipeline, ) def get_subset_external_pipeline_result(self, selector): check.inst_param(selector, "selector", PipelineSelector) check.invariant( selector.location_name == self.name, "PipelineSelector location_name mismatch, got {selector.location_name} expected {self.name}".format( self=self, selector=selector ), ) external_repository = self.external_repositories[selector.repository_name] pipeline_handle = PipelineHandle(selector.pipeline_name, external_repository.handle) return sync_get_external_pipeline_subset_grpc( self._handle.client, pipeline_handle.get_external_origin(), selector.solid_selection ) def get_external_partition_config(self, repository_handle, partition_set_name, partition_name): check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.str_param(partition_name, "partition_name") return sync_get_external_partition_config_grpc( self._handle.client, repository_handle, partition_set_name, partition_name ) def get_external_partition_tags(self, repository_handle, partition_set_name, partition_name): check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.str_param(partition_name, "partition_name") return sync_get_external_partition_tags_grpc( self._handle.client, repository_handle, partition_set_name, partition_name ) def get_external_partition_names(self, repository_handle, partition_set_name): check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") return sync_get_external_partition_names_grpc( self._handle.client, repository_handle, partition_set_name ) def get_external_schedule_execution_data( self, instance, repository_handle, 
schedule_name, scheduled_execution_time, ): check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(schedule_name, "schedule_name") check.opt_inst_param( scheduled_execution_time, "scheduled_execution_time", datetime.datetime ) return sync_get_external_schedule_execution_data_grpc( self._handle.client, instance, repository_handle, schedule_name, scheduled_execution_time, ) + def get_external_sensor_execution_data( + self, instance, repository_handle, name, last_evaluation_time + ): + return sync_get_external_sensor_execution_data_grpc( + self._handle.client, instance, repository_handle, name, last_evaluation_time + ) + def get_external_partition_set_execution_param_data( self, repository_handle, partition_set_name, partition_names ): check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.list_param(partition_names, "partition_names", of_type=str) return sync_get_external_partition_set_execution_param_data_grpc( self._handle.client, repository_handle, partition_set_name, partition_names ) diff --git a/python_modules/dagster/dagster/grpc/__generated__/api_pb2.py b/python_modules/dagster/dagster/grpc/__generated__/api_pb2.py index 7c0244d91..fa8301de8 100644 --- a/python_modules/dagster/dagster/grpc/__generated__/api_pb2.py +++ b/python_modules/dagster/dagster/grpc/__generated__/api_pb2.py @@ -1,2019 +1,2135 @@ # @generated # This file was generated by running `python -m dagster.grpc.compile` # Do not edit this file directly, and do not attempt to recompile it using # grpc_tools.protoc directly, as several changes must be made to the raw output # pylint: disable=protected-access,no-name-in-module # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: api.proto """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor.FileDescriptor( name="api.proto", package="api", syntax="proto3", serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\tapi.proto\x12\x03\x61pi"\x07\n\x05\x45mpty"\x1b\n\x0bPingRequest\x12\x0c\n\x04\x65\x63ho\x18\x01 \x01(\t"\x19\n\tPingReply\x12\x0c\n\x04\x65\x63ho\x18\x01 \x01(\t"=\n\x14StreamingPingRequest\x12\x17\n\x0fsequence_length\x18\x01 \x01(\x05\x12\x0c\n\x04\x65\x63ho\x18\x02 \x01(\t";\n\x12StreamingPingEvent\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x05\x12\x0c\n\x04\x65\x63ho\x18\x02 \x01(\t"%\n\x10GetServerIdReply\x12\x11\n\tserver_id\x18\x01 \x01(\t"O\n\x1c\x45xecutionPlanSnapshotRequest\x12/\n\'serialized_execution_plan_snapshot_args\x18\x01 \x01(\t"H\n\x1a\x45xecutionPlanSnapshotReply\x12*\n"serialized_execution_plan_snapshot\x18\x01 \x01(\t"H\n\x1d\x45xternalPartitionNamesRequest\x12\'\n\x1fserialized_partition_names_args\x18\x01 \x01(\t"p\n\x1b\x45xternalPartitionNamesReply\x12Q\nIserialized_external_partition_names_or_external_partition_execution_error\x18\x01 \x01(\t"C\n\x1e\x45xternalPartitionConfigRequest\x12!\n\x19serialized_partition_args\x18\x01 \x01(\t"r\n\x1c\x45xternalPartitionConfigReply\x12R\nJserialized_external_partition_config_or_external_partition_execution_error\x18\x01 \x01(\t"A\n\x1c\x45xternalPartitionTagsRequest\x12!\n\x19serialized_partition_args\x18\x01 \x01(\t"n\n\x1a\x45xternalPartitionTagsReply\x12P\nHserialized_external_partition_tags_or_external_partition_execution_error\x18\x01 \x01(\t"c\n*ExternalPartitionSetExecutionParamsRequest\x12\x35\n-serialized_partition_set_execution_param_args\x18\x01 \x01(\t"\x90\x01\n(ExternalPartitionSetExecutionParamsReply\x12\x64\n\\serialized_external_partition_set_execution_param_data_or_external_partition_execution_error\x18\x01 \x01(\t"\x19\n\x17ListRepositoriesRequest"O\n\x15ListRepositoriesReply\x12\x36\n.serialized_list_repositories_response_or_error\x18\x01 \x01(\t"Y\n%ExternalPipelineSubsetSnapshotRequest\x12\x30\n(serialized_pipeline_subset_snapshot_args\x18\x01 \x01(\t"Y\n#ExternalPipelineSubsetSnapshotReply\x12\x32\n*serialized_external_pipeline_subset_result\x18\x01 \x01(\t"H\n\x19\x45xternalRepositoryRequest\x12+\n#serialized_repository_python_origin\x18\x01 \x01(\t"F\n\x17\x45xternalRepositoryReply\x12+\n#serialized_external_repository_data\x18\x01 \x01(\t"i\n StreamingExternalRepositoryEvent\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x05\x12,\n$serialized_external_repository_chunk\x18\x02 \x01(\t"W\n ExternalScheduleExecutionRequest\x12\x33\n+serialized_external_schedule_execution_args\x18\x01 \x01(\t"z\n\x1e\x45xternalScheduleExecutionReply\x12X\nPserialized_external_schedule_execution_data_or_external_schedule_execution_error\x18\x01 \x01(\t"@\n\x13ShutdownServerReply\x12)\n!serialized_shutdown_server_result\x18\x01 \x01(\t"E\n\x16\x43\x61ncelExecutionRequest\x12+\n#serialized_cancel_execution_request\x18\x01 \x01(\t"B\n\x14\x43\x61ncelExecutionReply\x12*\n"serialized_cancel_execution_result\x18\x01 \x01(\t"L\n\x19\x43\x61nCancelExecutionRequest\x12/\n\'serialized_can_cancel_execution_request\x18\x01 
\x01(\t"I\n\x17\x43\x61nCancelExecutionReply\x12.\n&serialized_can_cancel_execution_result\x18\x01 \x01(\t"6\n\x0fStartRunRequest\x12#\n\x1bserialized_execute_run_args\x18\x01 \x01(\t"4\n\rStartRunReply\x12#\n\x1bserialized_start_run_result\x18\x01 \x01(\t"8\n\x14GetCurrentImageReply\x12 \n\x18serialized_current_image\x18\x01 \x01(\t2\xbb\x0c\n\nDagsterApi\x12*\n\x04Ping\x12\x10.api.PingRequest\x1a\x0e.api.PingReply"\x00\x12/\n\tHeartbeat\x12\x10.api.PingRequest\x1a\x0e.api.PingReply"\x00\x12G\n\rStreamingPing\x12\x19.api.StreamingPingRequest\x1a\x17.api.StreamingPingEvent"\x00\x30\x01\x12\x32\n\x0bGetServerId\x12\n.api.Empty\x1a\x15.api.GetServerIdReply"\x00\x12]\n\x15\x45xecutionPlanSnapshot\x12!.api.ExecutionPlanSnapshotRequest\x1a\x1f.api.ExecutionPlanSnapshotReply"\x00\x12N\n\x10ListRepositories\x12\x1c.api.ListRepositoriesRequest\x1a\x1a.api.ListRepositoriesReply"\x00\x12`\n\x16\x45xternalPartitionNames\x12".api.ExternalPartitionNamesRequest\x1a .api.ExternalPartitionNamesReply"\x00\x12\x63\n\x17\x45xternalPartitionConfig\x12#.api.ExternalPartitionConfigRequest\x1a!.api.ExternalPartitionConfigReply"\x00\x12]\n\x15\x45xternalPartitionTags\x12!.api.ExternalPartitionTagsRequest\x1a\x1f.api.ExternalPartitionTagsReply"\x00\x12\x87\x01\n#ExternalPartitionSetExecutionParams\x12/.api.ExternalPartitionSetExecutionParamsRequest\x1a-.api.ExternalPartitionSetExecutionParamsReply"\x00\x12x\n\x1e\x45xternalPipelineSubsetSnapshot\x12*.api.ExternalPipelineSubsetSnapshotRequest\x1a(.api.ExternalPipelineSubsetSnapshotReply"\x00\x12T\n\x12\x45xternalRepository\x12\x1e.api.ExternalRepositoryRequest\x1a\x1c.api.ExternalRepositoryReply"\x00\x12h\n\x1bStreamingExternalRepository\x12\x1e.api.ExternalRepositoryRequest\x1a%.api.StreamingExternalRepositoryEvent"\x00\x30\x01\x12i\n\x19\x45xternalScheduleExecution\x12%.api.ExternalScheduleExecutionRequest\x1a#.api.ExternalScheduleExecutionReply"\x00\x12\x38\n\x0eShutdownServer\x12\n.api.Empty\x1a\x18.api.ShutdownServerReply"\x00\x12K\n\x0f\x43\x61ncelExecution\x12\x1b.api.CancelExecutionRequest\x1a\x19.api.CancelExecutionReply"\x00\x12T\n\x12\x43\x61nCancelExecution\x12\x1e.api.CanCancelExecutionRequest\x1a\x1c.api.CanCancelExecutionReply"\x00\x12\x36\n\x08StartRun\x12\x14.api.StartRunRequest\x1a\x12.api.StartRunReply"\x00\x12:\n\x0fGetCurrentImage\x12\n.api.Empty\x1a\x19.api.GetCurrentImageReply"\x00\x62\x06proto3', + serialized_pb=b'\n\tapi.proto\x12\x03\x61pi"\x07\n\x05\x45mpty"\x1b\n\x0bPingRequest\x12\x0c\n\x04\x65\x63ho\x18\x01 \x01(\t"\x19\n\tPingReply\x12\x0c\n\x04\x65\x63ho\x18\x01 \x01(\t"=\n\x14StreamingPingRequest\x12\x17\n\x0fsequence_length\x18\x01 \x01(\x05\x12\x0c\n\x04\x65\x63ho\x18\x02 \x01(\t";\n\x12StreamingPingEvent\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x05\x12\x0c\n\x04\x65\x63ho\x18\x02 \x01(\t"%\n\x10GetServerIdReply\x12\x11\n\tserver_id\x18\x01 \x01(\t"O\n\x1c\x45xecutionPlanSnapshotRequest\x12/\n\'serialized_execution_plan_snapshot_args\x18\x01 \x01(\t"H\n\x1a\x45xecutionPlanSnapshotReply\x12*\n"serialized_execution_plan_snapshot\x18\x01 \x01(\t"H\n\x1d\x45xternalPartitionNamesRequest\x12\'\n\x1fserialized_partition_names_args\x18\x01 \x01(\t"p\n\x1b\x45xternalPartitionNamesReply\x12Q\nIserialized_external_partition_names_or_external_partition_execution_error\x18\x01 \x01(\t"C\n\x1e\x45xternalPartitionConfigRequest\x12!\n\x19serialized_partition_args\x18\x01 \x01(\t"r\n\x1c\x45xternalPartitionConfigReply\x12R\nJserialized_external_partition_config_or_external_partition_execution_error\x18\x01 
\x01(\t"A\n\x1c\x45xternalPartitionTagsRequest\x12!\n\x19serialized_partition_args\x18\x01 \x01(\t"n\n\x1a\x45xternalPartitionTagsReply\x12P\nHserialized_external_partition_tags_or_external_partition_execution_error\x18\x01 \x01(\t"c\n*ExternalPartitionSetExecutionParamsRequest\x12\x35\n-serialized_partition_set_execution_param_args\x18\x01 \x01(\t"\x90\x01\n(ExternalPartitionSetExecutionParamsReply\x12\x64\n\\serialized_external_partition_set_execution_param_data_or_external_partition_execution_error\x18\x01 \x01(\t"\x19\n\x17ListRepositoriesRequest"O\n\x15ListRepositoriesReply\x12\x36\n.serialized_list_repositories_response_or_error\x18\x01 \x01(\t"Y\n%ExternalPipelineSubsetSnapshotRequest\x12\x30\n(serialized_pipeline_subset_snapshot_args\x18\x01 \x01(\t"Y\n#ExternalPipelineSubsetSnapshotReply\x12\x32\n*serialized_external_pipeline_subset_result\x18\x01 \x01(\t"H\n\x19\x45xternalRepositoryRequest\x12+\n#serialized_repository_python_origin\x18\x01 \x01(\t"F\n\x17\x45xternalRepositoryReply\x12+\n#serialized_external_repository_data\x18\x01 \x01(\t"i\n StreamingExternalRepositoryEvent\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x05\x12,\n$serialized_external_repository_chunk\x18\x02 \x01(\t"W\n ExternalScheduleExecutionRequest\x12\x33\n+serialized_external_schedule_execution_args\x18\x01 \x01(\t"z\n\x1e\x45xternalScheduleExecutionReply\x12X\nPserialized_external_schedule_execution_data_or_external_schedule_execution_error\x18\x01 \x01(\t"S\n\x1e\x45xternalSensorExecutionRequest\x12\x31\n)serialized_external_sensor_execution_args\x18\x01 \x01(\t"t\n\x1c\x45xternalSensorExecutionReply\x12T\nLserialized_external_sensor_execution_data_or_external_sensor_execution_error\x18\x01 \x01(\t"@\n\x13ShutdownServerReply\x12)\n!serialized_shutdown_server_result\x18\x01 \x01(\t"E\n\x16\x43\x61ncelExecutionRequest\x12+\n#serialized_cancel_execution_request\x18\x01 \x01(\t"B\n\x14\x43\x61ncelExecutionReply\x12*\n"serialized_cancel_execution_result\x18\x01 \x01(\t"L\n\x19\x43\x61nCancelExecutionRequest\x12/\n\'serialized_can_cancel_execution_request\x18\x01 \x01(\t"I\n\x17\x43\x61nCancelExecutionReply\x12.\n&serialized_can_cancel_execution_result\x18\x01 \x01(\t"6\n\x0fStartRunRequest\x12#\n\x1bserialized_execute_run_args\x18\x01 \x01(\t"4\n\rStartRunReply\x12#\n\x1bserialized_start_run_result\x18\x01 \x01(\t"8\n\x14GetCurrentImageReply\x12 \n\x18serialized_current_image\x18\x01 \x01(\t2\xa0\r\n\nDagsterApi\x12*\n\x04Ping\x12\x10.api.PingRequest\x1a\x0e.api.PingReply"\x00\x12/\n\tHeartbeat\x12\x10.api.PingRequest\x1a\x0e.api.PingReply"\x00\x12G\n\rStreamingPing\x12\x19.api.StreamingPingRequest\x1a\x17.api.StreamingPingEvent"\x00\x30\x01\x12\x32\n\x0bGetServerId\x12\n.api.Empty\x1a\x15.api.GetServerIdReply"\x00\x12]\n\x15\x45xecutionPlanSnapshot\x12!.api.ExecutionPlanSnapshotRequest\x1a\x1f.api.ExecutionPlanSnapshotReply"\x00\x12N\n\x10ListRepositories\x12\x1c.api.ListRepositoriesRequest\x1a\x1a.api.ListRepositoriesReply"\x00\x12`\n\x16\x45xternalPartitionNames\x12".api.ExternalPartitionNamesRequest\x1a 
.api.ExternalPartitionNamesReply"\x00\x12\x63\n\x17\x45xternalPartitionConfig\x12#.api.ExternalPartitionConfigRequest\x1a!.api.ExternalPartitionConfigReply"\x00\x12]\n\x15\x45xternalPartitionTags\x12!.api.ExternalPartitionTagsRequest\x1a\x1f.api.ExternalPartitionTagsReply"\x00\x12\x87\x01\n#ExternalPartitionSetExecutionParams\x12/.api.ExternalPartitionSetExecutionParamsRequest\x1a-.api.ExternalPartitionSetExecutionParamsReply"\x00\x12x\n\x1e\x45xternalPipelineSubsetSnapshot\x12*.api.ExternalPipelineSubsetSnapshotRequest\x1a(.api.ExternalPipelineSubsetSnapshotReply"\x00\x12T\n\x12\x45xternalRepository\x12\x1e.api.ExternalRepositoryRequest\x1a\x1c.api.ExternalRepositoryReply"\x00\x12h\n\x1bStreamingExternalRepository\x12\x1e.api.ExternalRepositoryRequest\x1a%.api.StreamingExternalRepositoryEvent"\x00\x30\x01\x12i\n\x19\x45xternalScheduleExecution\x12%.api.ExternalScheduleExecutionRequest\x1a#.api.ExternalScheduleExecutionReply"\x00\x12\x63\n\x17\x45xternalSensorExecution\x12#.api.ExternalSensorExecutionRequest\x1a!.api.ExternalSensorExecutionReply"\x00\x12\x38\n\x0eShutdownServer\x12\n.api.Empty\x1a\x18.api.ShutdownServerReply"\x00\x12K\n\x0f\x43\x61ncelExecution\x12\x1b.api.CancelExecutionRequest\x1a\x19.api.CancelExecutionReply"\x00\x12T\n\x12\x43\x61nCancelExecution\x12\x1e.api.CanCancelExecutionRequest\x1a\x1c.api.CanCancelExecutionReply"\x00\x12\x36\n\x08StartRun\x12\x14.api.StartRunRequest\x1a\x12.api.StartRunReply"\x00\x12:\n\x0fGetCurrentImage\x12\n.api.Empty\x1a\x19.api.GetCurrentImageReply"\x00\x62\x06proto3', ) _EMPTY = _descriptor.Descriptor( name="Empty", full_name="api.Empty", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=18, serialized_end=25, ) _PINGREQUEST = _descriptor.Descriptor( name="PingRequest", full_name="api.PingRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="echo", full_name="api.PingRequest.echo", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=27, serialized_end=54, ) _PINGREPLY = _descriptor.Descriptor( name="PingReply", full_name="api.PingReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="echo", full_name="api.PingReply.echo", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=56, serialized_end=81, ) _STREAMINGPINGREQUEST = _descriptor.Descriptor( name="StreamingPingRequest", full_name="api.StreamingPingRequest", filename=None, 
file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="sequence_length", full_name="api.StreamingPingRequest.sequence_length", index=0, number=1, type=5, cpp_type=1, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), _descriptor.FieldDescriptor( name="echo", full_name="api.StreamingPingRequest.echo", index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=83, serialized_end=144, ) _STREAMINGPINGEVENT = _descriptor.Descriptor( name="StreamingPingEvent", full_name="api.StreamingPingEvent", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="sequence_number", full_name="api.StreamingPingEvent.sequence_number", index=0, number=1, type=5, cpp_type=1, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), _descriptor.FieldDescriptor( name="echo", full_name="api.StreamingPingEvent.echo", index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=146, serialized_end=205, ) _GETSERVERIDREPLY = _descriptor.Descriptor( name="GetServerIdReply", full_name="api.GetServerIdReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="server_id", full_name="api.GetServerIdReply.server_id", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=207, serialized_end=244, ) _EXECUTIONPLANSNAPSHOTREQUEST = _descriptor.Descriptor( name="ExecutionPlanSnapshotRequest", full_name="api.ExecutionPlanSnapshotRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_execution_plan_snapshot_args", full_name="api.ExecutionPlanSnapshotRequest.serialized_execution_plan_snapshot_args", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, 
enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=246, serialized_end=325, ) _EXECUTIONPLANSNAPSHOTREPLY = _descriptor.Descriptor( name="ExecutionPlanSnapshotReply", full_name="api.ExecutionPlanSnapshotReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_execution_plan_snapshot", full_name="api.ExecutionPlanSnapshotReply.serialized_execution_plan_snapshot", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=327, serialized_end=399, ) _EXTERNALPARTITIONNAMESREQUEST = _descriptor.Descriptor( name="ExternalPartitionNamesRequest", full_name="api.ExternalPartitionNamesRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_partition_names_args", full_name="api.ExternalPartitionNamesRequest.serialized_partition_names_args", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=401, serialized_end=473, ) _EXTERNALPARTITIONNAMESREPLY = _descriptor.Descriptor( name="ExternalPartitionNamesReply", full_name="api.ExternalPartitionNamesReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_external_partition_names_or_external_partition_execution_error", full_name="api.ExternalPartitionNamesReply.serialized_external_partition_names_or_external_partition_execution_error", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=475, serialized_end=587, ) _EXTERNALPARTITIONCONFIGREQUEST = _descriptor.Descriptor( name="ExternalPartitionConfigRequest", full_name="api.ExternalPartitionConfigRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_partition_args", full_name="api.ExternalPartitionConfigRequest.serialized_partition_args", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, 
default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=589, serialized_end=656, ) _EXTERNALPARTITIONCONFIGREPLY = _descriptor.Descriptor( name="ExternalPartitionConfigReply", full_name="api.ExternalPartitionConfigReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_external_partition_config_or_external_partition_execution_error", full_name="api.ExternalPartitionConfigReply.serialized_external_partition_config_or_external_partition_execution_error", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=658, serialized_end=772, ) _EXTERNALPARTITIONTAGSREQUEST = _descriptor.Descriptor( name="ExternalPartitionTagsRequest", full_name="api.ExternalPartitionTagsRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_partition_args", full_name="api.ExternalPartitionTagsRequest.serialized_partition_args", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=774, serialized_end=839, ) _EXTERNALPARTITIONTAGSREPLY = _descriptor.Descriptor( name="ExternalPartitionTagsReply", full_name="api.ExternalPartitionTagsReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_external_partition_tags_or_external_partition_execution_error", full_name="api.ExternalPartitionTagsReply.serialized_external_partition_tags_or_external_partition_execution_error", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=841, serialized_end=951, ) _EXTERNALPARTITIONSETEXECUTIONPARAMSREQUEST = _descriptor.Descriptor( name="ExternalPartitionSetExecutionParamsRequest", full_name="api.ExternalPartitionSetExecutionParamsRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( 
name="serialized_partition_set_execution_param_args", full_name="api.ExternalPartitionSetExecutionParamsRequest.serialized_partition_set_execution_param_args", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=953, serialized_end=1052, ) _EXTERNALPARTITIONSETEXECUTIONPARAMSREPLY = _descriptor.Descriptor( name="ExternalPartitionSetExecutionParamsReply", full_name="api.ExternalPartitionSetExecutionParamsReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_external_partition_set_execution_param_data_or_external_partition_execution_error", full_name="api.ExternalPartitionSetExecutionParamsReply.serialized_external_partition_set_execution_param_data_or_external_partition_execution_error", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=1055, serialized_end=1199, ) _LISTREPOSITORIESREQUEST = _descriptor.Descriptor( name="ListRepositoriesRequest", full_name="api.ListRepositoriesRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=1201, serialized_end=1226, ) _LISTREPOSITORIESREPLY = _descriptor.Descriptor( name="ListRepositoriesReply", full_name="api.ListRepositoriesReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_list_repositories_response_or_error", full_name="api.ListRepositoriesReply.serialized_list_repositories_response_or_error", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=1228, serialized_end=1307, ) _EXTERNALPIPELINESUBSETSNAPSHOTREQUEST = _descriptor.Descriptor( name="ExternalPipelineSubsetSnapshotRequest", full_name="api.ExternalPipelineSubsetSnapshotRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_pipeline_subset_snapshot_args", full_name="api.ExternalPipelineSubsetSnapshotRequest.serialized_pipeline_subset_snapshot_args", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, 
default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=1309, serialized_end=1398, ) _EXTERNALPIPELINESUBSETSNAPSHOTREPLY = _descriptor.Descriptor( name="ExternalPipelineSubsetSnapshotReply", full_name="api.ExternalPipelineSubsetSnapshotReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_external_pipeline_subset_result", full_name="api.ExternalPipelineSubsetSnapshotReply.serialized_external_pipeline_subset_result", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=1400, serialized_end=1489, ) _EXTERNALREPOSITORYREQUEST = _descriptor.Descriptor( name="ExternalRepositoryRequest", full_name="api.ExternalRepositoryRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_repository_python_origin", full_name="api.ExternalRepositoryRequest.serialized_repository_python_origin", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=1491, serialized_end=1563, ) _EXTERNALREPOSITORYREPLY = _descriptor.Descriptor( name="ExternalRepositoryReply", full_name="api.ExternalRepositoryReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_external_repository_data", full_name="api.ExternalRepositoryReply.serialized_external_repository_data", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=1565, serialized_end=1635, ) _STREAMINGEXTERNALREPOSITORYEVENT = _descriptor.Descriptor( name="StreamingExternalRepositoryEvent", full_name="api.StreamingExternalRepositoryEvent", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="sequence_number", full_name="api.StreamingExternalRepositoryEvent.sequence_number", index=0, number=1, type=5, cpp_type=1, label=1, has_default_value=False, 
default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), _descriptor.FieldDescriptor( name="serialized_external_repository_chunk", full_name="api.StreamingExternalRepositoryEvent.serialized_external_repository_chunk", index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=1637, serialized_end=1742, ) _EXTERNALSCHEDULEEXECUTIONREQUEST = _descriptor.Descriptor( name="ExternalScheduleExecutionRequest", full_name="api.ExternalScheduleExecutionRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_external_schedule_execution_args", full_name="api.ExternalScheduleExecutionRequest.serialized_external_schedule_execution_args", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=1744, serialized_end=1831, ) _EXTERNALSCHEDULEEXECUTIONREPLY = _descriptor.Descriptor( name="ExternalScheduleExecutionReply", full_name="api.ExternalScheduleExecutionReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_external_schedule_execution_data_or_external_schedule_execution_error", full_name="api.ExternalScheduleExecutionReply.serialized_external_schedule_execution_data_or_external_schedule_execution_error", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], serialized_start=1833, serialized_end=1955, ) +_EXTERNALSENSOREXECUTIONREQUEST = _descriptor.Descriptor( + name="ExternalSensorExecutionRequest", + full_name="api.ExternalSensorExecutionRequest", + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name="serialized_external_sensor_execution_args", + full_name="api.ExternalSensorExecutionRequest.serialized_external_sensor_execution_args", + index=0, + number=1, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + ], + 
extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto3", + extension_ranges=[], + oneofs=[], + serialized_start=1957, + serialized_end=2040, +) + + +_EXTERNALSENSOREXECUTIONREPLY = _descriptor.Descriptor( + name="ExternalSensorExecutionReply", + full_name="api.ExternalSensorExecutionReply", + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name="serialized_external_sensor_execution_data_or_external_sensor_execution_error", + full_name="api.ExternalSensorExecutionReply.serialized_external_sensor_execution_data_or_external_sensor_execution_error", + index=0, + number=1, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + ], + extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto3", + extension_ranges=[], + oneofs=[], + serialized_start=2042, + serialized_end=2158, +) + + _SHUTDOWNSERVERREPLY = _descriptor.Descriptor( name="ShutdownServerReply", full_name="api.ShutdownServerReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_shutdown_server_result", full_name="api.ShutdownServerReply.serialized_shutdown_server_result", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=1957, - serialized_end=2021, + serialized_start=2160, + serialized_end=2224, ) _CANCELEXECUTIONREQUEST = _descriptor.Descriptor( name="CancelExecutionRequest", full_name="api.CancelExecutionRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_cancel_execution_request", full_name="api.CancelExecutionRequest.serialized_cancel_execution_request", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2023, - serialized_end=2092, + serialized_start=2226, + serialized_end=2295, ) _CANCELEXECUTIONREPLY = _descriptor.Descriptor( name="CancelExecutionReply", full_name="api.CancelExecutionReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_cancel_execution_result", full_name="api.CancelExecutionReply.serialized_cancel_execution_result", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, 
default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2094, - serialized_end=2160, + serialized_start=2297, + serialized_end=2363, ) _CANCANCELEXECUTIONREQUEST = _descriptor.Descriptor( name="CanCancelExecutionRequest", full_name="api.CanCancelExecutionRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_can_cancel_execution_request", full_name="api.CanCancelExecutionRequest.serialized_can_cancel_execution_request", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2162, - serialized_end=2238, + serialized_start=2365, + serialized_end=2441, ) _CANCANCELEXECUTIONREPLY = _descriptor.Descriptor( name="CanCancelExecutionReply", full_name="api.CanCancelExecutionReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_can_cancel_execution_result", full_name="api.CanCancelExecutionReply.serialized_can_cancel_execution_result", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2240, - serialized_end=2313, + serialized_start=2443, + serialized_end=2516, ) _STARTRUNREQUEST = _descriptor.Descriptor( name="StartRunRequest", full_name="api.StartRunRequest", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_execute_run_args", full_name="api.StartRunRequest.serialized_execute_run_args", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2315, - serialized_end=2369, + serialized_start=2518, + serialized_end=2572, ) _STARTRUNREPLY = _descriptor.Descriptor( name="StartRunReply", full_name="api.StartRunReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_start_run_result", full_name="api.StartRunReply.serialized_start_run_result", index=0, number=1, 
type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2371, - serialized_end=2423, + serialized_start=2574, + serialized_end=2626, ) _GETCURRENTIMAGEREPLY = _descriptor.Descriptor( name="GetCurrentImageReply", full_name="api.GetCurrentImageReply", filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name="serialized_current_image", full_name="api.GetCurrentImageReply.serialized_current_image", index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode("utf-8"), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key, ), ], extensions=[], nested_types=[], enum_types=[], serialized_options=None, is_extendable=False, syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2425, - serialized_end=2481, + serialized_start=2628, + serialized_end=2684, ) DESCRIPTOR.message_types_by_name["Empty"] = _EMPTY DESCRIPTOR.message_types_by_name["PingRequest"] = _PINGREQUEST DESCRIPTOR.message_types_by_name["PingReply"] = _PINGREPLY DESCRIPTOR.message_types_by_name["StreamingPingRequest"] = _STREAMINGPINGREQUEST DESCRIPTOR.message_types_by_name["StreamingPingEvent"] = _STREAMINGPINGEVENT DESCRIPTOR.message_types_by_name["GetServerIdReply"] = _GETSERVERIDREPLY DESCRIPTOR.message_types_by_name["ExecutionPlanSnapshotRequest"] = _EXECUTIONPLANSNAPSHOTREQUEST DESCRIPTOR.message_types_by_name["ExecutionPlanSnapshotReply"] = _EXECUTIONPLANSNAPSHOTREPLY DESCRIPTOR.message_types_by_name["ExternalPartitionNamesRequest"] = _EXTERNALPARTITIONNAMESREQUEST DESCRIPTOR.message_types_by_name["ExternalPartitionNamesReply"] = _EXTERNALPARTITIONNAMESREPLY DESCRIPTOR.message_types_by_name["ExternalPartitionConfigRequest"] = _EXTERNALPARTITIONCONFIGREQUEST DESCRIPTOR.message_types_by_name["ExternalPartitionConfigReply"] = _EXTERNALPARTITIONCONFIGREPLY DESCRIPTOR.message_types_by_name["ExternalPartitionTagsRequest"] = _EXTERNALPARTITIONTAGSREQUEST DESCRIPTOR.message_types_by_name["ExternalPartitionTagsReply"] = _EXTERNALPARTITIONTAGSREPLY DESCRIPTOR.message_types_by_name[ "ExternalPartitionSetExecutionParamsRequest" ] = _EXTERNALPARTITIONSETEXECUTIONPARAMSREQUEST DESCRIPTOR.message_types_by_name[ "ExternalPartitionSetExecutionParamsReply" ] = _EXTERNALPARTITIONSETEXECUTIONPARAMSREPLY DESCRIPTOR.message_types_by_name["ListRepositoriesRequest"] = _LISTREPOSITORIESREQUEST DESCRIPTOR.message_types_by_name["ListRepositoriesReply"] = _LISTREPOSITORIESREPLY DESCRIPTOR.message_types_by_name[ "ExternalPipelineSubsetSnapshotRequest" ] = _EXTERNALPIPELINESUBSETSNAPSHOTREQUEST DESCRIPTOR.message_types_by_name[ "ExternalPipelineSubsetSnapshotReply" ] = _EXTERNALPIPELINESUBSETSNAPSHOTREPLY DESCRIPTOR.message_types_by_name["ExternalRepositoryRequest"] = _EXTERNALREPOSITORYREQUEST DESCRIPTOR.message_types_by_name["ExternalRepositoryReply"] = _EXTERNALREPOSITORYREPLY DESCRIPTOR.message_types_by_name[ "StreamingExternalRepositoryEvent" ] = _STREAMINGEXTERNALREPOSITORYEVENT DESCRIPTOR.message_types_by_name[ 
"ExternalScheduleExecutionRequest" ] = _EXTERNALSCHEDULEEXECUTIONREQUEST DESCRIPTOR.message_types_by_name["ExternalScheduleExecutionReply"] = _EXTERNALSCHEDULEEXECUTIONREPLY +DESCRIPTOR.message_types_by_name["ExternalSensorExecutionRequest"] = _EXTERNALSENSOREXECUTIONREQUEST +DESCRIPTOR.message_types_by_name["ExternalSensorExecutionReply"] = _EXTERNALSENSOREXECUTIONREPLY DESCRIPTOR.message_types_by_name["ShutdownServerReply"] = _SHUTDOWNSERVERREPLY DESCRIPTOR.message_types_by_name["CancelExecutionRequest"] = _CANCELEXECUTIONREQUEST DESCRIPTOR.message_types_by_name["CancelExecutionReply"] = _CANCELEXECUTIONREPLY DESCRIPTOR.message_types_by_name["CanCancelExecutionRequest"] = _CANCANCELEXECUTIONREQUEST DESCRIPTOR.message_types_by_name["CanCancelExecutionReply"] = _CANCANCELEXECUTIONREPLY DESCRIPTOR.message_types_by_name["StartRunRequest"] = _STARTRUNREQUEST DESCRIPTOR.message_types_by_name["StartRunReply"] = _STARTRUNREPLY DESCRIPTOR.message_types_by_name["GetCurrentImageReply"] = _GETCURRENTIMAGEREPLY _sym_db.RegisterFileDescriptor(DESCRIPTOR) Empty = _reflection.GeneratedProtocolMessageType( "Empty", (_message.Message,), { "DESCRIPTOR": _EMPTY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.Empty) }, ) _sym_db.RegisterMessage(Empty) PingRequest = _reflection.GeneratedProtocolMessageType( "PingRequest", (_message.Message,), { "DESCRIPTOR": _PINGREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.PingRequest) }, ) _sym_db.RegisterMessage(PingRequest) PingReply = _reflection.GeneratedProtocolMessageType( "PingReply", (_message.Message,), { "DESCRIPTOR": _PINGREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.PingReply) }, ) _sym_db.RegisterMessage(PingReply) StreamingPingRequest = _reflection.GeneratedProtocolMessageType( "StreamingPingRequest", (_message.Message,), { "DESCRIPTOR": _STREAMINGPINGREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.StreamingPingRequest) }, ) _sym_db.RegisterMessage(StreamingPingRequest) StreamingPingEvent = _reflection.GeneratedProtocolMessageType( "StreamingPingEvent", (_message.Message,), { "DESCRIPTOR": _STREAMINGPINGEVENT, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.StreamingPingEvent) }, ) _sym_db.RegisterMessage(StreamingPingEvent) GetServerIdReply = _reflection.GeneratedProtocolMessageType( "GetServerIdReply", (_message.Message,), { "DESCRIPTOR": _GETSERVERIDREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.GetServerIdReply) }, ) _sym_db.RegisterMessage(GetServerIdReply) ExecutionPlanSnapshotRequest = _reflection.GeneratedProtocolMessageType( "ExecutionPlanSnapshotRequest", (_message.Message,), { "DESCRIPTOR": _EXECUTIONPLANSNAPSHOTREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExecutionPlanSnapshotRequest) }, ) _sym_db.RegisterMessage(ExecutionPlanSnapshotRequest) ExecutionPlanSnapshotReply = _reflection.GeneratedProtocolMessageType( "ExecutionPlanSnapshotReply", (_message.Message,), { "DESCRIPTOR": _EXECUTIONPLANSNAPSHOTREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExecutionPlanSnapshotReply) }, ) _sym_db.RegisterMessage(ExecutionPlanSnapshotReply) ExternalPartitionNamesRequest = _reflection.GeneratedProtocolMessageType( "ExternalPartitionNamesRequest", (_message.Message,), { "DESCRIPTOR": _EXTERNALPARTITIONNAMESREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalPartitionNamesRequest) }, ) 
_sym_db.RegisterMessage(ExternalPartitionNamesRequest) ExternalPartitionNamesReply = _reflection.GeneratedProtocolMessageType( "ExternalPartitionNamesReply", (_message.Message,), { "DESCRIPTOR": _EXTERNALPARTITIONNAMESREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalPartitionNamesReply) }, ) _sym_db.RegisterMessage(ExternalPartitionNamesReply) ExternalPartitionConfigRequest = _reflection.GeneratedProtocolMessageType( "ExternalPartitionConfigRequest", (_message.Message,), { "DESCRIPTOR": _EXTERNALPARTITIONCONFIGREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalPartitionConfigRequest) }, ) _sym_db.RegisterMessage(ExternalPartitionConfigRequest) ExternalPartitionConfigReply = _reflection.GeneratedProtocolMessageType( "ExternalPartitionConfigReply", (_message.Message,), { "DESCRIPTOR": _EXTERNALPARTITIONCONFIGREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalPartitionConfigReply) }, ) _sym_db.RegisterMessage(ExternalPartitionConfigReply) ExternalPartitionTagsRequest = _reflection.GeneratedProtocolMessageType( "ExternalPartitionTagsRequest", (_message.Message,), { "DESCRIPTOR": _EXTERNALPARTITIONTAGSREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalPartitionTagsRequest) }, ) _sym_db.RegisterMessage(ExternalPartitionTagsRequest) ExternalPartitionTagsReply = _reflection.GeneratedProtocolMessageType( "ExternalPartitionTagsReply", (_message.Message,), { "DESCRIPTOR": _EXTERNALPARTITIONTAGSREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalPartitionTagsReply) }, ) _sym_db.RegisterMessage(ExternalPartitionTagsReply) ExternalPartitionSetExecutionParamsRequest = _reflection.GeneratedProtocolMessageType( "ExternalPartitionSetExecutionParamsRequest", (_message.Message,), { "DESCRIPTOR": _EXTERNALPARTITIONSETEXECUTIONPARAMSREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalPartitionSetExecutionParamsRequest) }, ) _sym_db.RegisterMessage(ExternalPartitionSetExecutionParamsRequest) ExternalPartitionSetExecutionParamsReply = _reflection.GeneratedProtocolMessageType( "ExternalPartitionSetExecutionParamsReply", (_message.Message,), { "DESCRIPTOR": _EXTERNALPARTITIONSETEXECUTIONPARAMSREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalPartitionSetExecutionParamsReply) }, ) _sym_db.RegisterMessage(ExternalPartitionSetExecutionParamsReply) ListRepositoriesRequest = _reflection.GeneratedProtocolMessageType( "ListRepositoriesRequest", (_message.Message,), { "DESCRIPTOR": _LISTREPOSITORIESREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ListRepositoriesRequest) }, ) _sym_db.RegisterMessage(ListRepositoriesRequest) ListRepositoriesReply = _reflection.GeneratedProtocolMessageType( "ListRepositoriesReply", (_message.Message,), { "DESCRIPTOR": _LISTREPOSITORIESREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ListRepositoriesReply) }, ) _sym_db.RegisterMessage(ListRepositoriesReply) ExternalPipelineSubsetSnapshotRequest = _reflection.GeneratedProtocolMessageType( "ExternalPipelineSubsetSnapshotRequest", (_message.Message,), { "DESCRIPTOR": _EXTERNALPIPELINESUBSETSNAPSHOTREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalPipelineSubsetSnapshotRequest) }, ) _sym_db.RegisterMessage(ExternalPipelineSubsetSnapshotRequest) ExternalPipelineSubsetSnapshotReply = _reflection.GeneratedProtocolMessageType( 
"ExternalPipelineSubsetSnapshotReply", (_message.Message,), { "DESCRIPTOR": _EXTERNALPIPELINESUBSETSNAPSHOTREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalPipelineSubsetSnapshotReply) }, ) _sym_db.RegisterMessage(ExternalPipelineSubsetSnapshotReply) ExternalRepositoryRequest = _reflection.GeneratedProtocolMessageType( "ExternalRepositoryRequest", (_message.Message,), { "DESCRIPTOR": _EXTERNALREPOSITORYREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalRepositoryRequest) }, ) _sym_db.RegisterMessage(ExternalRepositoryRequest) ExternalRepositoryReply = _reflection.GeneratedProtocolMessageType( "ExternalRepositoryReply", (_message.Message,), { "DESCRIPTOR": _EXTERNALREPOSITORYREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalRepositoryReply) }, ) _sym_db.RegisterMessage(ExternalRepositoryReply) StreamingExternalRepositoryEvent = _reflection.GeneratedProtocolMessageType( "StreamingExternalRepositoryEvent", (_message.Message,), { "DESCRIPTOR": _STREAMINGEXTERNALREPOSITORYEVENT, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.StreamingExternalRepositoryEvent) }, ) _sym_db.RegisterMessage(StreamingExternalRepositoryEvent) ExternalScheduleExecutionRequest = _reflection.GeneratedProtocolMessageType( "ExternalScheduleExecutionRequest", (_message.Message,), { "DESCRIPTOR": _EXTERNALSCHEDULEEXECUTIONREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalScheduleExecutionRequest) }, ) _sym_db.RegisterMessage(ExternalScheduleExecutionRequest) ExternalScheduleExecutionReply = _reflection.GeneratedProtocolMessageType( "ExternalScheduleExecutionReply", (_message.Message,), { "DESCRIPTOR": _EXTERNALSCHEDULEEXECUTIONREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ExternalScheduleExecutionReply) }, ) _sym_db.RegisterMessage(ExternalScheduleExecutionReply) +ExternalSensorExecutionRequest = _reflection.GeneratedProtocolMessageType( + "ExternalSensorExecutionRequest", + (_message.Message,), + { + "DESCRIPTOR": _EXTERNALSENSOREXECUTIONREQUEST, + "__module__": "api_pb2" + # @@protoc_insertion_point(class_scope:api.ExternalSensorExecutionRequest) + }, +) +_sym_db.RegisterMessage(ExternalSensorExecutionRequest) + +ExternalSensorExecutionReply = _reflection.GeneratedProtocolMessageType( + "ExternalSensorExecutionReply", + (_message.Message,), + { + "DESCRIPTOR": _EXTERNALSENSOREXECUTIONREPLY, + "__module__": "api_pb2" + # @@protoc_insertion_point(class_scope:api.ExternalSensorExecutionReply) + }, +) +_sym_db.RegisterMessage(ExternalSensorExecutionReply) + ShutdownServerReply = _reflection.GeneratedProtocolMessageType( "ShutdownServerReply", (_message.Message,), { "DESCRIPTOR": _SHUTDOWNSERVERREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.ShutdownServerReply) }, ) _sym_db.RegisterMessage(ShutdownServerReply) CancelExecutionRequest = _reflection.GeneratedProtocolMessageType( "CancelExecutionRequest", (_message.Message,), { "DESCRIPTOR": _CANCELEXECUTIONREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.CancelExecutionRequest) }, ) _sym_db.RegisterMessage(CancelExecutionRequest) CancelExecutionReply = _reflection.GeneratedProtocolMessageType( "CancelExecutionReply", (_message.Message,), { "DESCRIPTOR": _CANCELEXECUTIONREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.CancelExecutionReply) }, ) _sym_db.RegisterMessage(CancelExecutionReply) CanCancelExecutionRequest = 
_reflection.GeneratedProtocolMessageType( "CanCancelExecutionRequest", (_message.Message,), { "DESCRIPTOR": _CANCANCELEXECUTIONREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.CanCancelExecutionRequest) }, ) _sym_db.RegisterMessage(CanCancelExecutionRequest) CanCancelExecutionReply = _reflection.GeneratedProtocolMessageType( "CanCancelExecutionReply", (_message.Message,), { "DESCRIPTOR": _CANCANCELEXECUTIONREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.CanCancelExecutionReply) }, ) _sym_db.RegisterMessage(CanCancelExecutionReply) StartRunRequest = _reflection.GeneratedProtocolMessageType( "StartRunRequest", (_message.Message,), { "DESCRIPTOR": _STARTRUNREQUEST, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.StartRunRequest) }, ) _sym_db.RegisterMessage(StartRunRequest) StartRunReply = _reflection.GeneratedProtocolMessageType( "StartRunReply", (_message.Message,), { "DESCRIPTOR": _STARTRUNREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.StartRunReply) }, ) _sym_db.RegisterMessage(StartRunReply) GetCurrentImageReply = _reflection.GeneratedProtocolMessageType( "GetCurrentImageReply", (_message.Message,), { "DESCRIPTOR": _GETCURRENTIMAGEREPLY, "__module__": "api_pb2" # @@protoc_insertion_point(class_scope:api.GetCurrentImageReply) }, ) _sym_db.RegisterMessage(GetCurrentImageReply) _DAGSTERAPI = _descriptor.ServiceDescriptor( name="DagsterApi", full_name="api.DagsterApi", file=DESCRIPTOR, index=0, serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_start=2484, - serialized_end=4079, + serialized_start=2687, + serialized_end=4383, methods=[ _descriptor.MethodDescriptor( name="Ping", full_name="api.DagsterApi.Ping", index=0, containing_service=None, input_type=_PINGREQUEST, output_type=_PINGREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="Heartbeat", full_name="api.DagsterApi.Heartbeat", index=1, containing_service=None, input_type=_PINGREQUEST, output_type=_PINGREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="StreamingPing", full_name="api.DagsterApi.StreamingPing", index=2, containing_service=None, input_type=_STREAMINGPINGREQUEST, output_type=_STREAMINGPINGEVENT, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="GetServerId", full_name="api.DagsterApi.GetServerId", index=3, containing_service=None, input_type=_EMPTY, output_type=_GETSERVERIDREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="ExecutionPlanSnapshot", full_name="api.DagsterApi.ExecutionPlanSnapshot", index=4, containing_service=None, input_type=_EXECUTIONPLANSNAPSHOTREQUEST, output_type=_EXECUTIONPLANSNAPSHOTREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="ListRepositories", full_name="api.DagsterApi.ListRepositories", index=5, containing_service=None, input_type=_LISTREPOSITORIESREQUEST, output_type=_LISTREPOSITORIESREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="ExternalPartitionNames", full_name="api.DagsterApi.ExternalPartitionNames", index=6, containing_service=None, input_type=_EXTERNALPARTITIONNAMESREQUEST, output_type=_EXTERNALPARTITIONNAMESREPLY, serialized_options=None, 
create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="ExternalPartitionConfig", full_name="api.DagsterApi.ExternalPartitionConfig", index=7, containing_service=None, input_type=_EXTERNALPARTITIONCONFIGREQUEST, output_type=_EXTERNALPARTITIONCONFIGREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="ExternalPartitionTags", full_name="api.DagsterApi.ExternalPartitionTags", index=8, containing_service=None, input_type=_EXTERNALPARTITIONTAGSREQUEST, output_type=_EXTERNALPARTITIONTAGSREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="ExternalPartitionSetExecutionParams", full_name="api.DagsterApi.ExternalPartitionSetExecutionParams", index=9, containing_service=None, input_type=_EXTERNALPARTITIONSETEXECUTIONPARAMSREQUEST, output_type=_EXTERNALPARTITIONSETEXECUTIONPARAMSREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="ExternalPipelineSubsetSnapshot", full_name="api.DagsterApi.ExternalPipelineSubsetSnapshot", index=10, containing_service=None, input_type=_EXTERNALPIPELINESUBSETSNAPSHOTREQUEST, output_type=_EXTERNALPIPELINESUBSETSNAPSHOTREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="ExternalRepository", full_name="api.DagsterApi.ExternalRepository", index=11, containing_service=None, input_type=_EXTERNALREPOSITORYREQUEST, output_type=_EXTERNALREPOSITORYREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="StreamingExternalRepository", full_name="api.DagsterApi.StreamingExternalRepository", index=12, containing_service=None, input_type=_EXTERNALREPOSITORYREQUEST, output_type=_STREAMINGEXTERNALREPOSITORYEVENT, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="ExternalScheduleExecution", full_name="api.DagsterApi.ExternalScheduleExecution", index=13, containing_service=None, input_type=_EXTERNALSCHEDULEEXECUTIONREQUEST, output_type=_EXTERNALSCHEDULEEXECUTIONREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), + _descriptor.MethodDescriptor( + name="ExternalSensorExecution", + full_name="api.DagsterApi.ExternalSensorExecution", + index=14, + containing_service=None, + input_type=_EXTERNALSENSOREXECUTIONREQUEST, + output_type=_EXTERNALSENSOREXECUTIONREPLY, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), _descriptor.MethodDescriptor( name="ShutdownServer", full_name="api.DagsterApi.ShutdownServer", - index=14, + index=15, containing_service=None, input_type=_EMPTY, output_type=_SHUTDOWNSERVERREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="CancelExecution", full_name="api.DagsterApi.CancelExecution", - index=15, + index=16, containing_service=None, input_type=_CANCELEXECUTIONREQUEST, output_type=_CANCELEXECUTIONREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="CanCancelExecution", full_name="api.DagsterApi.CanCancelExecution", - index=16, + index=17, containing_service=None, input_type=_CANCANCELEXECUTIONREQUEST, output_type=_CANCANCELEXECUTIONREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="StartRun", full_name="api.DagsterApi.StartRun", - 
index=17, + index=18, containing_service=None, input_type=_STARTRUNREQUEST, output_type=_STARTRUNREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name="GetCurrentImage", full_name="api.DagsterApi.GetCurrentImage", - index=18, + index=19, containing_service=None, input_type=_EMPTY, output_type=_GETCURRENTIMAGEREPLY, serialized_options=None, create_key=_descriptor._internal_create_key, ), ], ) _sym_db.RegisterServiceDescriptor(_DAGSTERAPI) DESCRIPTOR.services_by_name["DagsterApi"] = _DAGSTERAPI # @@protoc_insertion_point(module_scope) diff --git a/python_modules/dagster/dagster/grpc/__generated__/api_pb2_grpc.py b/python_modules/dagster/dagster/grpc/__generated__/api_pb2_grpc.py index 17b62d0a2..ce22eaef0 100644 --- a/python_modules/dagster/dagster/grpc/__generated__/api_pb2_grpc.py +++ b/python_modules/dagster/dagster/grpc/__generated__/api_pb2_grpc.py @@ -1,895 +1,940 @@ # @generated # This file was generated by running `python -m dagster.grpc.compile` # Do not edit this file directly, and do not attempt to recompile it using # grpc_tools.protoc directly, as several changes must be made to the raw output # pylint: disable=no-member, unused-argument # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc from . import api_pb2 as api__pb2 class DagsterApiStub(object): """Missing associated documentation comment in .proto file.""" def __init__(self, channel): """Constructor. Args: channel: A grpc.Channel. """ self.Ping = channel.unary_unary( "/api.DagsterApi/Ping", request_serializer=api__pb2.PingRequest.SerializeToString, response_deserializer=api__pb2.PingReply.FromString, ) self.Heartbeat = channel.unary_unary( "/api.DagsterApi/Heartbeat", request_serializer=api__pb2.PingRequest.SerializeToString, response_deserializer=api__pb2.PingReply.FromString, ) self.StreamingPing = channel.unary_stream( "/api.DagsterApi/StreamingPing", request_serializer=api__pb2.StreamingPingRequest.SerializeToString, response_deserializer=api__pb2.StreamingPingEvent.FromString, ) self.GetServerId = channel.unary_unary( "/api.DagsterApi/GetServerId", request_serializer=api__pb2.Empty.SerializeToString, response_deserializer=api__pb2.GetServerIdReply.FromString, ) self.ExecutionPlanSnapshot = channel.unary_unary( "/api.DagsterApi/ExecutionPlanSnapshot", request_serializer=api__pb2.ExecutionPlanSnapshotRequest.SerializeToString, response_deserializer=api__pb2.ExecutionPlanSnapshotReply.FromString, ) self.ListRepositories = channel.unary_unary( "/api.DagsterApi/ListRepositories", request_serializer=api__pb2.ListRepositoriesRequest.SerializeToString, response_deserializer=api__pb2.ListRepositoriesReply.FromString, ) self.ExternalPartitionNames = channel.unary_unary( "/api.DagsterApi/ExternalPartitionNames", request_serializer=api__pb2.ExternalPartitionNamesRequest.SerializeToString, response_deserializer=api__pb2.ExternalPartitionNamesReply.FromString, ) self.ExternalPartitionConfig = channel.unary_unary( "/api.DagsterApi/ExternalPartitionConfig", request_serializer=api__pb2.ExternalPartitionConfigRequest.SerializeToString, response_deserializer=api__pb2.ExternalPartitionConfigReply.FromString, ) self.ExternalPartitionTags = channel.unary_unary( "/api.DagsterApi/ExternalPartitionTags", request_serializer=api__pb2.ExternalPartitionTagsRequest.SerializeToString, response_deserializer=api__pb2.ExternalPartitionTagsReply.FromString, ) 
self.ExternalPartitionSetExecutionParams = channel.unary_unary( "/api.DagsterApi/ExternalPartitionSetExecutionParams", request_serializer=api__pb2.ExternalPartitionSetExecutionParamsRequest.SerializeToString, response_deserializer=api__pb2.ExternalPartitionSetExecutionParamsReply.FromString, ) self.ExternalPipelineSubsetSnapshot = channel.unary_unary( "/api.DagsterApi/ExternalPipelineSubsetSnapshot", request_serializer=api__pb2.ExternalPipelineSubsetSnapshotRequest.SerializeToString, response_deserializer=api__pb2.ExternalPipelineSubsetSnapshotReply.FromString, ) self.ExternalRepository = channel.unary_unary( "/api.DagsterApi/ExternalRepository", request_serializer=api__pb2.ExternalRepositoryRequest.SerializeToString, response_deserializer=api__pb2.ExternalRepositoryReply.FromString, ) self.StreamingExternalRepository = channel.unary_stream( "/api.DagsterApi/StreamingExternalRepository", request_serializer=api__pb2.ExternalRepositoryRequest.SerializeToString, response_deserializer=api__pb2.StreamingExternalRepositoryEvent.FromString, ) self.ExternalScheduleExecution = channel.unary_unary( "/api.DagsterApi/ExternalScheduleExecution", request_serializer=api__pb2.ExternalScheduleExecutionRequest.SerializeToString, response_deserializer=api__pb2.ExternalScheduleExecutionReply.FromString, ) + self.ExternalSensorExecution = channel.unary_unary( + "/api.DagsterApi/ExternalSensorExecution", + request_serializer=api__pb2.ExternalSensorExecutionRequest.SerializeToString, + response_deserializer=api__pb2.ExternalSensorExecutionReply.FromString, + ) self.ShutdownServer = channel.unary_unary( "/api.DagsterApi/ShutdownServer", request_serializer=api__pb2.Empty.SerializeToString, response_deserializer=api__pb2.ShutdownServerReply.FromString, ) self.CancelExecution = channel.unary_unary( "/api.DagsterApi/CancelExecution", request_serializer=api__pb2.CancelExecutionRequest.SerializeToString, response_deserializer=api__pb2.CancelExecutionReply.FromString, ) self.CanCancelExecution = channel.unary_unary( "/api.DagsterApi/CanCancelExecution", request_serializer=api__pb2.CanCancelExecutionRequest.SerializeToString, response_deserializer=api__pb2.CanCancelExecutionReply.FromString, ) self.StartRun = channel.unary_unary( "/api.DagsterApi/StartRun", request_serializer=api__pb2.StartRunRequest.SerializeToString, response_deserializer=api__pb2.StartRunReply.FromString, ) self.GetCurrentImage = channel.unary_unary( "/api.DagsterApi/GetCurrentImage", request_serializer=api__pb2.Empty.SerializeToString, response_deserializer=api__pb2.GetCurrentImageReply.FromString, ) class DagsterApiServicer(object): """Missing associated documentation comment in .proto file.""" def Ping(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def Heartbeat(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def StreamingPing(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def GetServerId(self, request, context): """Missing associated documentation comment in .proto file.""" 
context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def ExecutionPlanSnapshot(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def ListRepositories(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def ExternalPartitionNames(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def ExternalPartitionConfig(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def ExternalPartitionTags(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def ExternalPartitionSetExecutionParams(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def ExternalPipelineSubsetSnapshot(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def ExternalRepository(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def StreamingExternalRepository(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def ExternalScheduleExecution(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") + def ExternalSensorExecution(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + def ShutdownServer(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def CancelExecution(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise 
NotImplementedError("Method not implemented!") def CanCancelExecution(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def StartRun(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def GetCurrentImage(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") def add_DagsterApiServicer_to_server(servicer, server): rpc_method_handlers = { "Ping": grpc.unary_unary_rpc_method_handler( servicer.Ping, request_deserializer=api__pb2.PingRequest.FromString, response_serializer=api__pb2.PingReply.SerializeToString, ), "Heartbeat": grpc.unary_unary_rpc_method_handler( servicer.Heartbeat, request_deserializer=api__pb2.PingRequest.FromString, response_serializer=api__pb2.PingReply.SerializeToString, ), "StreamingPing": grpc.unary_stream_rpc_method_handler( servicer.StreamingPing, request_deserializer=api__pb2.StreamingPingRequest.FromString, response_serializer=api__pb2.StreamingPingEvent.SerializeToString, ), "GetServerId": grpc.unary_unary_rpc_method_handler( servicer.GetServerId, request_deserializer=api__pb2.Empty.FromString, response_serializer=api__pb2.GetServerIdReply.SerializeToString, ), "ExecutionPlanSnapshot": grpc.unary_unary_rpc_method_handler( servicer.ExecutionPlanSnapshot, request_deserializer=api__pb2.ExecutionPlanSnapshotRequest.FromString, response_serializer=api__pb2.ExecutionPlanSnapshotReply.SerializeToString, ), "ListRepositories": grpc.unary_unary_rpc_method_handler( servicer.ListRepositories, request_deserializer=api__pb2.ListRepositoriesRequest.FromString, response_serializer=api__pb2.ListRepositoriesReply.SerializeToString, ), "ExternalPartitionNames": grpc.unary_unary_rpc_method_handler( servicer.ExternalPartitionNames, request_deserializer=api__pb2.ExternalPartitionNamesRequest.FromString, response_serializer=api__pb2.ExternalPartitionNamesReply.SerializeToString, ), "ExternalPartitionConfig": grpc.unary_unary_rpc_method_handler( servicer.ExternalPartitionConfig, request_deserializer=api__pb2.ExternalPartitionConfigRequest.FromString, response_serializer=api__pb2.ExternalPartitionConfigReply.SerializeToString, ), "ExternalPartitionTags": grpc.unary_unary_rpc_method_handler( servicer.ExternalPartitionTags, request_deserializer=api__pb2.ExternalPartitionTagsRequest.FromString, response_serializer=api__pb2.ExternalPartitionTagsReply.SerializeToString, ), "ExternalPartitionSetExecutionParams": grpc.unary_unary_rpc_method_handler( servicer.ExternalPartitionSetExecutionParams, request_deserializer=api__pb2.ExternalPartitionSetExecutionParamsRequest.FromString, response_serializer=api__pb2.ExternalPartitionSetExecutionParamsReply.SerializeToString, ), "ExternalPipelineSubsetSnapshot": grpc.unary_unary_rpc_method_handler( servicer.ExternalPipelineSubsetSnapshot, request_deserializer=api__pb2.ExternalPipelineSubsetSnapshotRequest.FromString, response_serializer=api__pb2.ExternalPipelineSubsetSnapshotReply.SerializeToString, ), "ExternalRepository": grpc.unary_unary_rpc_method_handler( servicer.ExternalRepository, 
request_deserializer=api__pb2.ExternalRepositoryRequest.FromString, response_serializer=api__pb2.ExternalRepositoryReply.SerializeToString, ), "StreamingExternalRepository": grpc.unary_stream_rpc_method_handler( servicer.StreamingExternalRepository, request_deserializer=api__pb2.ExternalRepositoryRequest.FromString, response_serializer=api__pb2.StreamingExternalRepositoryEvent.SerializeToString, ), "ExternalScheduleExecution": grpc.unary_unary_rpc_method_handler( servicer.ExternalScheduleExecution, request_deserializer=api__pb2.ExternalScheduleExecutionRequest.FromString, response_serializer=api__pb2.ExternalScheduleExecutionReply.SerializeToString, ), + "ExternalSensorExecution": grpc.unary_unary_rpc_method_handler( + servicer.ExternalSensorExecution, + request_deserializer=api__pb2.ExternalSensorExecutionRequest.FromString, + response_serializer=api__pb2.ExternalSensorExecutionReply.SerializeToString, + ), "ShutdownServer": grpc.unary_unary_rpc_method_handler( servicer.ShutdownServer, request_deserializer=api__pb2.Empty.FromString, response_serializer=api__pb2.ShutdownServerReply.SerializeToString, ), "CancelExecution": grpc.unary_unary_rpc_method_handler( servicer.CancelExecution, request_deserializer=api__pb2.CancelExecutionRequest.FromString, response_serializer=api__pb2.CancelExecutionReply.SerializeToString, ), "CanCancelExecution": grpc.unary_unary_rpc_method_handler( servicer.CanCancelExecution, request_deserializer=api__pb2.CanCancelExecutionRequest.FromString, response_serializer=api__pb2.CanCancelExecutionReply.SerializeToString, ), "StartRun": grpc.unary_unary_rpc_method_handler( servicer.StartRun, request_deserializer=api__pb2.StartRunRequest.FromString, response_serializer=api__pb2.StartRunReply.SerializeToString, ), "GetCurrentImage": grpc.unary_unary_rpc_method_handler( servicer.GetCurrentImage, request_deserializer=api__pb2.Empty.FromString, response_serializer=api__pb2.GetCurrentImageReply.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler("api.DagsterApi", rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) # This class is part of an EXPERIMENTAL API. 
class DagsterApi(object): """Missing associated documentation comment in .proto file.""" @staticmethod def Ping( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/Ping", api__pb2.PingRequest.SerializeToString, api__pb2.PingReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def Heartbeat( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/Heartbeat", api__pb2.PingRequest.SerializeToString, api__pb2.PingReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def StreamingPing( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_stream( request, target, "/api.DagsterApi/StreamingPing", api__pb2.StreamingPingRequest.SerializeToString, api__pb2.StreamingPingEvent.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def GetServerId( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/GetServerId", api__pb2.Empty.SerializeToString, api__pb2.GetServerIdReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def ExecutionPlanSnapshot( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/ExecutionPlanSnapshot", api__pb2.ExecutionPlanSnapshotRequest.SerializeToString, api__pb2.ExecutionPlanSnapshotReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def ListRepositories( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/ListRepositories", api__pb2.ListRepositoriesRequest.SerializeToString, api__pb2.ListRepositoriesReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def ExternalPartitionNames( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/ExternalPartitionNames", api__pb2.ExternalPartitionNamesRequest.SerializeToString, api__pb2.ExternalPartitionNamesReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def ExternalPartitionConfig( request, target, options=(), 
channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/ExternalPartitionConfig", api__pb2.ExternalPartitionConfigRequest.SerializeToString, api__pb2.ExternalPartitionConfigReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def ExternalPartitionTags( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/ExternalPartitionTags", api__pb2.ExternalPartitionTagsRequest.SerializeToString, api__pb2.ExternalPartitionTagsReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def ExternalPartitionSetExecutionParams( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/ExternalPartitionSetExecutionParams", api__pb2.ExternalPartitionSetExecutionParamsRequest.SerializeToString, api__pb2.ExternalPartitionSetExecutionParamsReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def ExternalPipelineSubsetSnapshot( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/ExternalPipelineSubsetSnapshot", api__pb2.ExternalPipelineSubsetSnapshotRequest.SerializeToString, api__pb2.ExternalPipelineSubsetSnapshotReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def ExternalRepository( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/ExternalRepository", api__pb2.ExternalRepositoryRequest.SerializeToString, api__pb2.ExternalRepositoryReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def StreamingExternalRepository( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_stream( request, target, "/api.DagsterApi/StreamingExternalRepository", api__pb2.ExternalRepositoryRequest.SerializeToString, api__pb2.StreamingExternalRepositoryEvent.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def ExternalScheduleExecution( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/ExternalScheduleExecution", api__pb2.ExternalScheduleExecutionRequest.SerializeToString, api__pb2.ExternalScheduleExecutionReply.FromString, options, 
channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) + @staticmethod + def ExternalSensorExecution( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/api.DagsterApi/ExternalSensorExecution", + api__pb2.ExternalSensorExecutionRequest.SerializeToString, + api__pb2.ExternalSensorExecutionReply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + @staticmethod def ShutdownServer( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/ShutdownServer", api__pb2.Empty.SerializeToString, api__pb2.ShutdownServerReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def CancelExecution( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/CancelExecution", api__pb2.CancelExecutionRequest.SerializeToString, api__pb2.CancelExecutionReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def CanCancelExecution( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/CanCancelExecution", api__pb2.CanCancelExecutionRequest.SerializeToString, api__pb2.CanCancelExecutionReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def StartRun( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/StartRun", api__pb2.StartRunRequest.SerializeToString, api__pb2.StartRunReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) @staticmethod def GetCurrentImage( request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None, ): return grpc.experimental.unary_unary( request, target, "/api.DagsterApi/GetCurrentImage", api__pb2.Empty.SerializeToString, api__pb2.GetCurrentImageReply.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, ) diff --git a/python_modules/dagster/dagster/grpc/client.py b/python_modules/dagster/dagster/grpc/client.py index 585568ae4..c4ca7ffca 100644 --- a/python_modules/dagster/dagster/grpc/client.py +++ b/python_modules/dagster/dagster/grpc/client.py @@ -1,384 +1,402 @@ import os import subprocess import sys import warnings from contextlib import contextmanager import grpc from dagster import check, seven from dagster.core.events import EngineEventData from 
dagster.core.host_representation import ExternalRepositoryOrigin from dagster.core.instance import DagsterInstance from dagster.core.types.loadable_target_origin import LoadableTargetOrigin from dagster.serdes import deserialize_json_to_dagster_namedtuple, serialize_dagster_namedtuple from dagster.utils.error import serializable_error_info_from_exc_info from grpc_health.v1 import health_pb2 from grpc_health.v1.health_pb2_grpc import HealthStub from .__generated__ import DagsterApiStub, api_pb2 from .server import GrpcServerProcess from .types import ( CanCancelExecutionRequest, CancelExecutionRequest, ExecuteExternalPipelineArgs, ExecutionPlanSnapshotArgs, ExternalScheduleExecutionArgs, PartitionArgs, PartitionNamesArgs, PartitionSetExecutionParamArgs, PipelineSubsetSnapshotArgs, + SensorExecutionArgs, ) CLIENT_HEARTBEAT_INTERVAL = 1 def client_heartbeat_thread(client, shutdown_event): while True: shutdown_event.wait(CLIENT_HEARTBEAT_INTERVAL) if shutdown_event.is_set(): break try: client.heartbeat("ping") except grpc._channel._InactiveRpcError: # pylint: disable=protected-access continue class DagsterGrpcClient(object): def __init__(self, port=None, socket=None, host="localhost"): self.port = check.opt_int_param(port, "port") self.socket = check.opt_str_param(socket, "socket") self.host = check.opt_str_param(host, "host") check.invariant( port is not None if seven.IS_WINDOWS else True, "You must pass a valid `port` on Windows: `socket` not supported.", ) check.invariant( (port or socket) and not (port and socket), "You must pass one and only one of `port` or `socket`.", ) check.invariant( host is not None if port else True, "Must provide a hostname", ) if port: self._server_address = host + ":" + str(port) else: self._server_address = "unix:" + os.path.abspath(socket) def _query(self, method, request_type, timeout=None, **kwargs): with grpc.insecure_channel(self._server_address) as channel: stub = DagsterApiStub(channel) response = getattr(stub, method)(request_type(**kwargs), timeout=timeout) # TODO need error handling here return response def _streaming_query(self, method, request_type, **kwargs): with grpc.insecure_channel(self._server_address) as channel: stub = DagsterApiStub(channel) response_stream = getattr(stub, method)(request_type(**kwargs)) yield from response_stream def ping(self, echo): check.str_param(echo, "echo") res = self._query("Ping", api_pb2.PingRequest, echo=echo) return res.echo def heartbeat(self, echo=""): check.str_param(echo, "echo") res = self._query("Heartbeat", api_pb2.PingRequest, echo=echo) return res.echo def streaming_ping(self, sequence_length, echo): check.int_param(sequence_length, "sequence_length") check.str_param(echo, "echo") for res in self._streaming_query( "StreamingPing", api_pb2.StreamingPingRequest, sequence_length=sequence_length, echo=echo, ): yield { "sequence_number": res.sequence_number, "echo": res.echo, } def get_server_id(self): res = self._query("GetServerId", api_pb2.Empty) return res.server_id def execution_plan_snapshot(self, execution_plan_snapshot_args): check.inst_param( execution_plan_snapshot_args, "execution_plan_snapshot_args", ExecutionPlanSnapshotArgs ) res = self._query( "ExecutionPlanSnapshot", api_pb2.ExecutionPlanSnapshotRequest, serialized_execution_plan_snapshot_args=serialize_dagster_namedtuple( execution_plan_snapshot_args ), ) return deserialize_json_to_dagster_namedtuple(res.serialized_execution_plan_snapshot) def list_repositories(self): res = self._query("ListRepositories", api_pb2.ListRepositoriesRequest) 
return deserialize_json_to_dagster_namedtuple( res.serialized_list_repositories_response_or_error ) def external_partition_names(self, partition_names_args): check.inst_param(partition_names_args, "partition_names_args", PartitionNamesArgs) res = self._query( "ExternalPartitionNames", api_pb2.ExternalPartitionNamesRequest, serialized_partition_names_args=serialize_dagster_namedtuple(partition_names_args), ) return deserialize_json_to_dagster_namedtuple( res.serialized_external_partition_names_or_external_partition_execution_error ) def external_partition_config(self, partition_args): check.inst_param(partition_args, "partition_args", PartitionArgs) res = self._query( "ExternalPartitionConfig", api_pb2.ExternalPartitionConfigRequest, serialized_partition_args=serialize_dagster_namedtuple(partition_args), ) return deserialize_json_to_dagster_namedtuple( res.serialized_external_partition_config_or_external_partition_execution_error ) def external_partition_tags(self, partition_args): check.inst_param(partition_args, "partition_args", PartitionArgs) res = self._query( "ExternalPartitionTags", api_pb2.ExternalPartitionTagsRequest, serialized_partition_args=serialize_dagster_namedtuple(partition_args), ) return deserialize_json_to_dagster_namedtuple( res.serialized_external_partition_tags_or_external_partition_execution_error ) def external_partition_set_execution_params(self, partition_set_execution_param_args): check.inst_param( partition_set_execution_param_args, "partition_set_execution_param_args", PartitionSetExecutionParamArgs, ) res = self._query( "ExternalPartitionSetExecutionParams", api_pb2.ExternalPartitionSetExecutionParamsRequest, serialized_partition_set_execution_param_args=serialize_dagster_namedtuple( partition_set_execution_param_args ), ) return deserialize_json_to_dagster_namedtuple( res.serialized_external_partition_set_execution_param_data_or_external_partition_execution_error ) def external_pipeline_subset(self, pipeline_subset_snapshot_args): check.inst_param( pipeline_subset_snapshot_args, "pipeline_subset_snapshot_args", PipelineSubsetSnapshotArgs, ) res = self._query( "ExternalPipelineSubsetSnapshot", api_pb2.ExternalPipelineSubsetSnapshotRequest, serialized_pipeline_subset_snapshot_args=serialize_dagster_namedtuple( pipeline_subset_snapshot_args ), ) return deserialize_json_to_dagster_namedtuple( res.serialized_external_pipeline_subset_result ) def external_repository(self, external_repository_origin): check.inst_param( external_repository_origin, "external_repository_origin", ExternalRepositoryOrigin, ) res = self._query( "ExternalRepository", api_pb2.ExternalRepositoryRequest, # rename this param name serialized_repository_python_origin=serialize_dagster_namedtuple( external_repository_origin ), ) return deserialize_json_to_dagster_namedtuple(res.serialized_external_repository_data) def streaming_external_repository(self, external_repository_origin): for res in self._streaming_query( "StreamingExternalRepository", api_pb2.ExternalRepositoryRequest, # Rename parameter serialized_repository_python_origin=serialize_dagster_namedtuple( external_repository_origin ), ): yield { "sequence_number": res.sequence_number, "serialized_external_repository_chunk": res.serialized_external_repository_chunk, } def external_schedule_execution(self, external_schedule_execution_args): check.inst_param( external_schedule_execution_args, "external_schedule_execution_args", ExternalScheduleExecutionArgs, ) res = self._query( "ExternalScheduleExecution", 
api_pb2.ExternalScheduleExecutionRequest, serialized_external_schedule_execution_args=serialize_dagster_namedtuple( external_schedule_execution_args ), ) return deserialize_json_to_dagster_namedtuple( res.serialized_external_schedule_execution_data_or_external_schedule_execution_error ) + def external_sensor_execution(self, sensor_execution_args): + check.inst_param( + sensor_execution_args, "sensor_execution_args", SensorExecutionArgs, + ) + + res = self._query( + "ExternalSensorExecution", + api_pb2.ExternalSensorExecutionRequest, + serialized_external_sensor_execution_args=serialize_dagster_namedtuple( + sensor_execution_args + ), + ) + + return deserialize_json_to_dagster_namedtuple( + res.serialized_external_sensor_execution_data_or_external_sensor_execution_error + ) + def shutdown_server(self, timeout=15): res = self._query("ShutdownServer", api_pb2.Empty, timeout=timeout) return deserialize_json_to_dagster_namedtuple(res.serialized_shutdown_server_result) def cancel_execution(self, cancel_execution_request): check.inst_param( cancel_execution_request, "cancel_execution_request", CancelExecutionRequest, ) res = self._query( "CancelExecution", api_pb2.CancelExecutionRequest, serialized_cancel_execution_request=serialize_dagster_namedtuple( cancel_execution_request ), ) return deserialize_json_to_dagster_namedtuple(res.serialized_cancel_execution_result) def can_cancel_execution(self, can_cancel_execution_request, timeout=None): check.inst_param( can_cancel_execution_request, "can_cancel_execution_request", CanCancelExecutionRequest, ) res = self._query( "CanCancelExecution", api_pb2.CanCancelExecutionRequest, timeout=timeout, serialized_can_cancel_execution_request=serialize_dagster_namedtuple( can_cancel_execution_request ), ) return deserialize_json_to_dagster_namedtuple(res.serialized_can_cancel_execution_result) def start_run(self, execute_run_args): check.inst_param(execute_run_args, "execute_run_args", ExecuteExternalPipelineArgs) with DagsterInstance.from_ref(execute_run_args.instance_ref) as instance: try: res = self._query( "StartRun", api_pb2.StartRunRequest, serialized_execute_run_args=serialize_dagster_namedtuple(execute_run_args), ) return deserialize_json_to_dagster_namedtuple(res.serialized_start_run_result) except Exception: # pylint: disable=bare-except pipeline_run = instance.get_run_by_id(execute_run_args.pipeline_run_id) instance.report_engine_event( message="Unexpected error in IPC client", pipeline_run=pipeline_run, engine_event_data=EngineEventData.engine_error( serializable_error_info_from_exc_info(sys.exc_info()) ), ) raise def get_current_image(self): res = self._query("GetCurrentImage", api_pb2.Empty) return deserialize_json_to_dagster_namedtuple(res.serialized_current_image) def health_check_query(self): try: with grpc.insecure_channel(self._server_address) as channel: response = HealthStub(channel).Check( health_pb2.HealthCheckRequest(service="DagsterApi") ) except grpc.RpcError as e: print(e) # pylint: disable=print-call return health_pb2.HealthCheckResponse.UNKNOWN # pylint: disable=no-member status_number = response.status # pylint: disable=no-member return health_pb2.HealthCheckResponse.ServingStatus.Name(status_number) class EphemeralDagsterGrpcClient(DagsterGrpcClient): """A client that tells the server process that created it to shut down once it leaves a context manager.""" def __init__( self, server_process=None, *args, **kwargs ): # pylint: disable=keyword-arg-before-vararg self._server_process = check.inst_param(server_process, 
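# --- Illustrative sketch (not part of this diff): a hedged example of how the new
# external_sensor_execution client method might be driven from host-process code. The
# port, repository_origin, instance, and sensor_name values are placeholders supplied by
# the caller; DagsterGrpcClient, SensorExecutionArgs, and the deserialized result types
# are the names introduced elsewhere in this change.
def _example_trigger_sensor_over_grpc(port, repository_origin, instance, sensor_name):
    client = DagsterGrpcClient(port=port)
    # Returns either ExternalSensorExecutionData or ExternalSensorExecutionErrorData,
    # mirroring the existing schedule execution round-trip.
    return client.external_sensor_execution(
        sensor_execution_args=SensorExecutionArgs(
            repository_origin=repository_origin,
            instance_ref=instance.get_ref(),
            sensor_name=sensor_name,
            last_evaluation_time=None,
        )
    )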
"server_process", subprocess.Popen) super(EphemeralDagsterGrpcClient, self).__init__(*args, **kwargs) def cleanup_server(self): if self._server_process: if self._server_process.poll() is None: try: self.shutdown_server() except grpc._channel._InactiveRpcError: # pylint: disable=protected-access pass self._server_process = None def __enter__(self): return self def __exit__(self, _exception_type, _exception_value, _traceback): self.cleanup_server() def __del__(self): if self._server_process: warnings.warn( "Managed gRPC client is being destroyed without signalling to server that " "it should shutdown. This may result in server processes living longer than " "they need to. To fix this, wrap the client in a contextmanager." ) @contextmanager def ephemeral_grpc_api_client( loadable_target_origin=None, force_port=False, max_retries=10, max_workers=1 ): check.opt_inst_param(loadable_target_origin, "loadable_target_origin", LoadableTargetOrigin) check.bool_param(force_port, "force_port") check.int_param(max_retries, "max_retries") with GrpcServerProcess( loadable_target_origin=loadable_target_origin, force_port=force_port, max_retries=max_retries, max_workers=max_workers, lazy_load_user_code=True, ).create_ephemeral_client() as client: yield client diff --git a/python_modules/dagster/dagster/grpc/impl.py b/python_modules/dagster/dagster/grpc/impl.py index 6adf47863..a1ce8615b 100644 --- a/python_modules/dagster/dagster/grpc/impl.py +++ b/python_modules/dagster/dagster/grpc/impl.py @@ -1,374 +1,425 @@ """Workhorse functions for individual API requests.""" import os import sys import pendulum from dagster import check from dagster.core.definitions import ScheduleExecutionContext from dagster.core.definitions.reconstructable import ( ReconstructablePipeline, ReconstructableRepository, ) +from dagster.core.definitions.sensor import SensorExecutionContext from dagster.core.errors import ( DagsterInvalidSubsetError, DagsterRunNotFoundError, DagsterSubprocessError, PartitionExecutionError, ScheduleExecutionError, + SensorExecutionError, user_code_error_boundary, ) from dagster.core.events import EngineEventData from dagster.core.execution.api import create_execution_plan, execute_run_iterator from dagster.core.host_representation import external_pipeline_data_from_def from dagster.core.host_representation.external_data import ( ExternalPartitionConfigData, ExternalPartitionExecutionErrorData, ExternalPartitionExecutionParamData, ExternalPartitionNamesData, ExternalPartitionSetExecutionParamData, ExternalPartitionTagsData, ExternalPipelineSubsetResult, ExternalScheduleExecutionData, ExternalScheduleExecutionErrorData, + ExternalSensorExecutionData, + ExternalSensorExecutionErrorData, ) from dagster.core.instance import DagsterInstance from dagster.core.snap.execution_plan_snapshot import ( ExecutionPlanSnapshotErrorData, snapshot_from_execution_plan, ) from dagster.core.storage.pipeline_run import PipelineRun from dagster.grpc.types import ExecutionPlanSnapshotArgs from dagster.serdes import deserialize_json_to_dagster_namedtuple from dagster.serdes.ipc import IPCErrorMessage from dagster.utils import delay_interrupts, start_termination_thread from dagster.utils.error import serializable_error_info_from_exc_info from .types import ExecuteExternalPipelineArgs class RunInSubprocessComplete: """Sentinel passed over multiprocessing Queue when subprocess is complete""" class StartRunInSubprocessSuccessful: """Sentinel passed over multiprocessing Queue when launch is successful in subprocess.""" def 
_core_execute_run(recon_pipeline, pipeline_run, instance): check.inst_param(recon_pipeline, "recon_pipeline", ReconstructablePipeline) check.inst_param(pipeline_run, "pipeline_run", PipelineRun) check.inst_param(instance, "instance", DagsterInstance) try: yield from execute_run_iterator(recon_pipeline, pipeline_run, instance) except DagsterSubprocessError as err: if not all( [err_info.cls_name == "KeyboardInterrupt" for err_info in err.subprocess_error_infos] ): yield instance.report_engine_event( "An exception was thrown during execution that is likely a framework error, " "rather than an error in user code.", pipeline_run, EngineEventData.engine_error(serializable_error_info_from_exc_info(sys.exc_info())), ) instance.report_run_failed(pipeline_run) except Exception: # pylint: disable=broad-except yield instance.report_engine_event( "An exception was thrown during execution that is likely a framework error, " "rather than an error in user code.", pipeline_run, EngineEventData.engine_error(serializable_error_info_from_exc_info(sys.exc_info())), ) instance.report_run_failed(pipeline_run) def _run_in_subprocess( serialized_execute_run_args, recon_pipeline, termination_event, subprocess_status_handler, run_event_handler, ): start_termination_thread(termination_event) try: execute_run_args = deserialize_json_to_dagster_namedtuple(serialized_execute_run_args) check.inst_param(execute_run_args, "execute_run_args", ExecuteExternalPipelineArgs) instance = DagsterInstance.from_ref(execute_run_args.instance_ref) pipeline_run = instance.get_run_by_id(execute_run_args.pipeline_run_id) if not pipeline_run: raise DagsterRunNotFoundError( "gRPC server could not load run {run_id} in order to execute it. Make sure that the gRPC server has access to your run storage.".format( run_id=execute_run_args.pipeline_run_id ), invalid_run_id=execute_run_args.pipeline_run_id, ) pid = os.getpid() except: # pylint: disable=bare-except serializable_error_info = serializable_error_info_from_exc_info(sys.exc_info()) event = IPCErrorMessage( serializable_error_info=serializable_error_info, message="Error during RPC setup for executing run: {message}".format( message=serializable_error_info.message ), ) subprocess_status_handler(event) subprocess_status_handler(RunInSubprocessComplete()) if instance: instance.dispose() return subprocess_status_handler(StartRunInSubprocessSuccessful()) run_event_handler( instance.report_engine_event( "Started process for pipeline (pid: {pid}).".format(pid=pid), pipeline_run, EngineEventData.in_process(pid, marker_end="cli_api_subprocess_init"), ) ) # This is so nasty but seemingly unavoidable # https://amir.rachum.com/blog/2017/03/03/generator-cleanup/ closed = False try: for event in _core_execute_run(recon_pipeline, pipeline_run, instance): run_event_handler(event) except KeyboardInterrupt: run_event_handler( instance.report_engine_event( message="Pipeline execution terminated by interrupt", pipeline_run=pipeline_run, ) ) raise except GeneratorExit: closed = True raise finally: if not closed: run_event_handler( instance.report_engine_event( "Process for pipeline exited (pid: {pid}).".format(pid=pid), pipeline_run, ) ) subprocess_status_handler(RunInSubprocessComplete()) instance.dispose() def start_run_in_subprocess( serialized_execute_run_args, recon_pipeline, event_queue, termination_event ): with delay_interrupts(): _run_in_subprocess( serialized_execute_run_args, recon_pipeline, termination_event, subprocess_status_handler=event_queue.put, run_event_handler=lambda x: None, ) def 
get_external_pipeline_subset_result(recon_pipeline, solid_selection): check.inst_param(recon_pipeline, "recon_pipeline", ReconstructablePipeline) if solid_selection: try: sub_pipeline = recon_pipeline.subset_for_execution(solid_selection) definition = sub_pipeline.get_definition() except DagsterInvalidSubsetError: return ExternalPipelineSubsetResult( success=False, error=serializable_error_info_from_exc_info(sys.exc_info()) ) else: definition = recon_pipeline.get_definition() external_pipeline_data = external_pipeline_data_from_def(definition) return ExternalPipelineSubsetResult(success=True, external_pipeline_data=external_pipeline_data) def get_external_schedule_execution( recon_repo, instance_ref, schedule_name, scheduled_execution_timestamp, scheduled_execution_timezone, ): check.inst_param( recon_repo, "recon_repo", ReconstructableRepository, ) definition = recon_repo.get_definition() schedule_def = definition.get_schedule_def(schedule_name) with DagsterInstance.from_ref(instance_ref) as instance: scheduled_execution_time = ( pendulum.from_timestamp(scheduled_execution_timestamp, tz=scheduled_execution_timezone,) if scheduled_execution_timestamp else None ) schedule_context = ScheduleExecutionContext(instance, scheduled_execution_time) try: with user_code_error_boundary( ScheduleExecutionError, lambda: "Error occurred during the execution of should_execute for schedule " "{schedule_name}".format(schedule_name=schedule_def.name), ): if not schedule_def.should_execute(schedule_context): return ExternalScheduleExecutionData( should_execute=False, run_config=None, tags=None ) with user_code_error_boundary( ScheduleExecutionError, lambda: "Error occurred during the execution of run_config_fn for schedule " "{schedule_name}".format(schedule_name=schedule_def.name), ): run_config = schedule_def.get_run_config(schedule_context) with user_code_error_boundary( ScheduleExecutionError, lambda: "Error occurred during the execution of tags_fn for schedule " "{schedule_name}".format(schedule_name=schedule_def.name), ): tags = schedule_def.get_tags(schedule_context) return ExternalScheduleExecutionData( run_config=run_config, tags=tags, should_execute=True ) except ScheduleExecutionError: return ExternalScheduleExecutionErrorData( serializable_error_info_from_exc_info(sys.exc_info()) ) +def get_external_sensor_execution(recon_repo, instance_ref, sensor_name, last_evaluation_timestamp): + check.inst_param( + recon_repo, "recon_repo", ReconstructableRepository, + ) + + definition = recon_repo.get_definition() + sensor_def = definition.get_sensor_def(sensor_name) + + with DagsterInstance.from_ref(instance_ref) as instance: + sensor_context = SensorExecutionContext( + instance, last_evaluation_time=last_evaluation_timestamp + ) + + try: + with user_code_error_boundary( + SensorExecutionError, + lambda: "Error occurred during the execution of should_execute for sensor " + "{sensor_name}".format(sensor_name=sensor_def.name), + ): + if not sensor_def.should_execute(sensor_context): + return ExternalSensorExecutionData( + should_execute=False, run_config=None, tags=None + ) + + with user_code_error_boundary( + SensorExecutionError, + lambda: "Error occurred during the execution of run_config_fn for sensor " + "{sensor_name}".format(sensor_name=sensor_def.name), + ): + run_config = sensor_def.get_run_config(sensor_context) + + with user_code_error_boundary( + SensorExecutionError, + lambda: "Error occurred during the execution of tags_fn for sensor " + "{sensor_name}".format(sensor_name=sensor_def.name), + ): 
+ tags = sensor_def.get_tags(sensor_context) + + return ExternalSensorExecutionData( + should_execute=True, run_config=run_config, tags=tags + ) + except SensorExecutionError: + return ExternalSensorExecutionErrorData( + serializable_error_info_from_exc_info(sys.exc_info()) + ) + + def get_partition_config(recon_repo, partition_set_name, partition_name): definition = recon_repo.get_definition() partition_set_def = definition.get_partition_set_def(partition_set_name) partition = partition_set_def.get_partition(partition_name) try: with user_code_error_boundary( PartitionExecutionError, lambda: "Error occurred during the evaluation of the `run_config_for_partition` " "function for partition set {partition_set_name}".format( partition_set_name=partition_set_def.name ), ): run_config = partition_set_def.run_config_for_partition(partition) return ExternalPartitionConfigData(name=partition.name, run_config=run_config) except PartitionExecutionError: return ExternalPartitionExecutionErrorData( serializable_error_info_from_exc_info(sys.exc_info()) ) def get_partition_names(recon_repo, partition_set_name): definition = recon_repo.get_definition() partition_set_def = definition.get_partition_set_def(partition_set_name) try: with user_code_error_boundary( PartitionExecutionError, lambda: "Error occurred during the execution of the partition generation function for " "partition set {partition_set_name}".format(partition_set_name=partition_set_def.name), ): return ExternalPartitionNamesData( partition_names=partition_set_def.get_partition_names() ) except PartitionExecutionError: return ExternalPartitionExecutionErrorData( serializable_error_info_from_exc_info(sys.exc_info()) ) def get_partition_tags(recon_repo, partition_set_name, partition_name): definition = recon_repo.get_definition() partition_set_def = definition.get_partition_set_def(partition_set_name) partition = partition_set_def.get_partition(partition_name) try: with user_code_error_boundary( PartitionExecutionError, lambda: "Error occurred during the evaluation of the `tags_for_partition` function for " "partition set {partition_set_name}".format(partition_set_name=partition_set_def.name), ): tags = partition_set_def.tags_for_partition(partition) return ExternalPartitionTagsData(name=partition.name, tags=tags) except PartitionExecutionError: return ExternalPartitionExecutionErrorData( serializable_error_info_from_exc_info(sys.exc_info()) ) def get_external_execution_plan_snapshot(recon_pipeline, args): check.inst_param(recon_pipeline, "recon_pipeline", ReconstructablePipeline) check.inst_param(args, "args", ExecutionPlanSnapshotArgs) try: pipeline = ( recon_pipeline.subset_for_execution(args.solid_selection) if args.solid_selection else recon_pipeline ) return snapshot_from_execution_plan( create_execution_plan( pipeline=pipeline, run_config=args.run_config, mode=args.mode, step_keys_to_execute=args.step_keys_to_execute, ), args.pipeline_snapshot_id, ) except: # pylint: disable=bare-except return ExecutionPlanSnapshotErrorData( error=serializable_error_info_from_exc_info(sys.exc_info()) ) def get_partition_set_execution_param_data(recon_repo, partition_set_name, partition_names): repo_definition = recon_repo.get_definition() partition_set_def = repo_definition.get_partition_set_def(partition_set_name) try: with user_code_error_boundary( PartitionExecutionError, lambda: "Error occurred during the partition generation for partition set " "{partition_set_name}".format(partition_set_name=partition_set_def.name), ): all_partitions = 
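# --- Illustrative sketch (not part of this diff): callers of get_external_sensor_execution
# are expected to branch on the two result types rather than rely on exceptions, since
# user-code failures come back as ExternalSensorExecutionErrorData. The recon_repo,
# instance_ref, and sensor_name values here are placeholders.
def _example_evaluate_sensor(recon_repo, instance_ref, sensor_name):
    result = get_external_sensor_execution(recon_repo, instance_ref, sensor_name, None)
    if isinstance(result, ExternalSensorExecutionErrorData):
        # Surface the serialized user-code error to the caller.
        return result
    # should_execute / run_config / tags mirror the schedule execution data shape.
    return result.run_config if result.should_execute else None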
partition_set_def.get_partitions() partitions = [ partition for partition in all_partitions if partition.name in partition_names ] partition_data = [] for partition in partitions: def _error_message_fn(partition_set_name, partition_name): return lambda: ( "Error occurred during the partition config and tag generation for " "partition set {partition_set_name}::{partition_name}".format( partition_set_name=partition_set_name, partition_name=partition_name ) ) with user_code_error_boundary( PartitionExecutionError, _error_message_fn(partition_set_def.name, partition.name) ): run_config = partition_set_def.run_config_for_partition(partition) tags = partition_set_def.tags_for_partition(partition) partition_data.append( ExternalPartitionExecutionParamData( name=partition.name, tags=tags, run_config=run_config, ) ) return ExternalPartitionSetExecutionParamData(partition_data=partition_data) except PartitionExecutionError: return ExternalPartitionExecutionErrorData( serializable_error_info_from_exc_info(sys.exc_info()) ) diff --git a/python_modules/dagster/dagster/grpc/protos/api.proto b/python_modules/dagster/dagster/grpc/protos/api.proto index 20fa72b96..a0a34548f 100644 --- a/python_modules/dagster/dagster/grpc/protos/api.proto +++ b/python_modules/dagster/dagster/grpc/protos/api.proto @@ -1,159 +1,168 @@ // If you make changes to this file, run "python -m dagster.grpc.compile" after. syntax = "proto3"; package api; service DagsterApi { rpc Ping (PingRequest) returns (PingReply) {} rpc Heartbeat (PingRequest) returns (PingReply) {} rpc StreamingPing (StreamingPingRequest) returns (stream StreamingPingEvent) {} rpc GetServerId (Empty) returns (GetServerIdReply) {} rpc ExecutionPlanSnapshot (ExecutionPlanSnapshotRequest) returns (ExecutionPlanSnapshotReply) {} rpc ListRepositories (ListRepositoriesRequest) returns (ListRepositoriesReply) {} rpc ExternalPartitionNames (ExternalPartitionNamesRequest) returns (ExternalPartitionNamesReply) {} rpc ExternalPartitionConfig (ExternalPartitionConfigRequest) returns (ExternalPartitionConfigReply) {} rpc ExternalPartitionTags (ExternalPartitionTagsRequest) returns (ExternalPartitionTagsReply) {} rpc ExternalPartitionSetExecutionParams (ExternalPartitionSetExecutionParamsRequest) returns (ExternalPartitionSetExecutionParamsReply) {} rpc ExternalPipelineSubsetSnapshot (ExternalPipelineSubsetSnapshotRequest) returns (ExternalPipelineSubsetSnapshotReply) {} rpc ExternalRepository (ExternalRepositoryRequest) returns (ExternalRepositoryReply) {} rpc StreamingExternalRepository (ExternalRepositoryRequest) returns (stream StreamingExternalRepositoryEvent) {} rpc ExternalScheduleExecution (ExternalScheduleExecutionRequest) returns (ExternalScheduleExecutionReply) {} + rpc ExternalSensorExecution (ExternalSensorExecutionRequest) returns (ExternalSensorExecutionReply) {} rpc ShutdownServer (Empty) returns (ShutdownServerReply) {} rpc CancelExecution (CancelExecutionRequest) returns (CancelExecutionReply) {} rpc CanCancelExecution (CanCancelExecutionRequest) returns (CanCancelExecutionReply) {} rpc StartRun (StartRunRequest) returns (StartRunReply) {} rpc GetCurrentImage (Empty) returns (GetCurrentImageReply) {} } message Empty {} message PingRequest { string echo = 1; } message PingReply { string echo = 1; } message StreamingPingRequest { int32 sequence_length = 1; string echo = 2; } message StreamingPingEvent { int32 sequence_number = 1; string echo = 2; } message GetServerIdReply { string server_id = 1; } message ExecutionPlanSnapshotRequest { string 
serialized_execution_plan_snapshot_args = 1; } message ExecutionPlanSnapshotReply { string serialized_execution_plan_snapshot = 1; } message ExternalPartitionNamesRequest { string serialized_partition_names_args = 1; } message ExternalPartitionNamesReply { string serialized_external_partition_names_or_external_partition_execution_error = 1; } message ExternalPartitionConfigRequest { string serialized_partition_args = 1; } message ExternalPartitionConfigReply { string serialized_external_partition_config_or_external_partition_execution_error = 1; } message ExternalPartitionTagsRequest { string serialized_partition_args = 1; } message ExternalPartitionTagsReply { string serialized_external_partition_tags_or_external_partition_execution_error = 1; } message ExternalPartitionSetExecutionParamsRequest { string serialized_partition_set_execution_param_args = 1; } message ExternalPartitionSetExecutionParamsReply { string serialized_external_partition_set_execution_param_data_or_external_partition_execution_error = 1; } message ListRepositoriesRequest { } message ListRepositoriesReply { string serialized_list_repositories_response_or_error = 1; } message ExternalPipelineSubsetSnapshotRequest { string serialized_pipeline_subset_snapshot_args = 1; } message ExternalPipelineSubsetSnapshotReply { string serialized_external_pipeline_subset_result = 1; } message ExternalRepositoryRequest { string serialized_repository_python_origin = 1; } message ExternalRepositoryReply { string serialized_external_repository_data = 1; } message StreamingExternalRepositoryEvent { int32 sequence_number = 1; string serialized_external_repository_chunk = 2; } message ExternalScheduleExecutionRequest { string serialized_external_schedule_execution_args = 1; } message ExternalScheduleExecutionReply { string serialized_external_schedule_execution_data_or_external_schedule_execution_error = 1; } +message ExternalSensorExecutionRequest { + string serialized_external_sensor_execution_args = 1; +} + +message ExternalSensorExecutionReply { + string serialized_external_sensor_execution_data_or_external_sensor_execution_error = 1; +} + message ShutdownServerReply { string serialized_shutdown_server_result = 1; } message CancelExecutionRequest { string serialized_cancel_execution_request = 1; } message CancelExecutionReply { string serialized_cancel_execution_result = 1; } message CanCancelExecutionRequest { string serialized_can_cancel_execution_request = 1; } message CanCancelExecutionReply { string serialized_can_cancel_execution_result = 1; } message StartRunRequest { string serialized_execute_run_args = 1; } message StartRunReply { string serialized_start_run_result = 1; } message GetCurrentImageReply { string serialized_current_image = 1; } diff --git a/python_modules/dagster/dagster/grpc/server.py b/python_modules/dagster/dagster/grpc/server.py index 372530dbd..b3bd58641 100644 --- a/python_modules/dagster/dagster/grpc/server.py +++ b/python_modules/dagster/dagster/grpc/server.py @@ -1,1020 +1,1039 @@ import math import os import queue import sys import threading import time import uuid from collections import namedtuple from concurrent.futures import ThreadPoolExecutor import grpc from dagster import check, seven from dagster.core.code_pointer import CodePointer from dagster.core.definitions.reconstructable import ( ReconstructableRepository, repository_def_from_target_def, ) from dagster.core.host_representation import ExternalPipelineOrigin, ExternalRepositoryOrigin from dagster.core.host_representation.external_data import 
external_repository_data_from_def from dagster.core.instance import DagsterInstance from dagster.core.types.loadable_target_origin import LoadableTargetOrigin from dagster.serdes import ( deserialize_json_to_dagster_namedtuple, serialize_dagster_namedtuple, whitelist_for_serdes, ) from dagster.serdes.ipc import ( IPCErrorMessage, ipc_write_stream, open_ipc_subprocess, read_unary_response, ) from dagster.seven import multiprocessing from dagster.utils import find_free_port, safe_tempfile_path_unmanaged from dagster.utils.error import serializable_error_info_from_exc_info from grpc_health.v1 import health, health_pb2, health_pb2_grpc from .__generated__ import api_pb2 from .__generated__.api_pb2_grpc import DagsterApiServicer, add_DagsterApiServicer_to_server from .impl import ( RunInSubprocessComplete, StartRunInSubprocessSuccessful, get_external_execution_plan_snapshot, get_external_pipeline_subset_result, get_external_schedule_execution, + get_external_sensor_execution, get_partition_config, get_partition_names, get_partition_set_execution_param_data, get_partition_tags, start_run_in_subprocess, ) from .types import ( CanCancelExecutionRequest, CanCancelExecutionResult, CancelExecutionRequest, CancelExecutionResult, ExecuteExternalPipelineArgs, ExecutionPlanSnapshotArgs, ExternalScheduleExecutionArgs, GetCurrentImageResult, ListRepositoriesResponse, LoadableRepositorySymbol, PartitionArgs, PartitionNamesArgs, PartitionSetExecutionParamArgs, PipelineSubsetSnapshotArgs, + SensorExecutionArgs, ShutdownServerResult, StartRunResult, ) from .utils import get_loadable_targets EVENT_QUEUE_POLL_INTERVAL = 0.1 CLEANUP_TICK = 0.5 STREAMING_EXTERNAL_REPOSITORY_CHUNK_SIZE = 4000000 class CouldNotBindGrpcServerToAddress(Exception): pass class LazyRepositorySymbolsAndCodePointers: """Enables lazily loading user code at RPC-time so that it doesn't interrupt startup and we can gracefully handle user code errors.""" def __init__(self, loadable_target_origin): self._loadable_target_origin = loadable_target_origin self._loadable_repository_symbols = None self._code_pointers_by_repo_name = None def load(self): self._loadable_repository_symbols = load_loadable_repository_symbols( self._loadable_target_origin ) self._code_pointers_by_repo_name = build_code_pointers_by_repo_name( self._loadable_target_origin, self._loadable_repository_symbols ) @property def loadable_repository_symbols(self): if self._loadable_repository_symbols is None: self.load() return self._loadable_repository_symbols @property def code_pointers_by_repo_name(self): if self._code_pointers_by_repo_name is None: self.load() return self._code_pointers_by_repo_name def load_loadable_repository_symbols(loadable_target_origin): if loadable_target_origin: loadable_targets = get_loadable_targets( loadable_target_origin.python_file, loadable_target_origin.module_name, loadable_target_origin.package_name, loadable_target_origin.working_directory, loadable_target_origin.attribute, ) return [ LoadableRepositorySymbol( attribute=loadable_target.attribute, repository_name=repository_def_from_target_def( loadable_target.target_definition ).name, ) for loadable_target in loadable_targets ] else: return [] def build_code_pointers_by_repo_name(loadable_target_origin, loadable_repository_symbols): repository_code_pointer_dict = {} for loadable_repository_symbol in loadable_repository_symbols: if loadable_target_origin.python_file: repository_code_pointer_dict[ loadable_repository_symbol.repository_name ] = CodePointer.from_python_file( 
loadable_target_origin.python_file, loadable_repository_symbol.attribute, loadable_target_origin.working_directory, ) elif loadable_target_origin.package_name: repository_code_pointer_dict[ loadable_repository_symbol.repository_name ] = CodePointer.from_python_package( loadable_target_origin.package_name, loadable_repository_symbol.attribute, ) else: repository_code_pointer_dict[ loadable_repository_symbol.repository_name ] = CodePointer.from_module( loadable_target_origin.module_name, loadable_repository_symbol.attribute, ) return repository_code_pointer_dict class DagsterApiServer(DagsterApiServicer): # The loadable_target_origin is currently Noneable to support instantiating a server. # This helps us test the ping methods, and incrementally migrate each method to # the target passed in here instead of passing in a target in the argument. def __init__( self, server_termination_event, loadable_target_origin=None, heartbeat=False, heartbeat_timeout=30, lazy_load_user_code=False, fixed_server_id=None, ): super(DagsterApiServer, self).__init__() check.bool_param(heartbeat, "heartbeat") check.int_param(heartbeat_timeout, "heartbeat_timeout") check.invariant(heartbeat_timeout > 0, "heartbeat_timeout must be greater than 0") self._server_termination_event = check.inst_param( server_termination_event, "server_termination_event", seven.ThreadingEventType ) self._loadable_target_origin = check.opt_inst_param( loadable_target_origin, "loadable_target_origin", LoadableTargetOrigin ) # Each server is initialized with a unique UUID. This UUID is used by clients to track when # servers are replaced and is used for cache invalidation and reloading. self._server_id = check.opt_str_param(fixed_server_id, "fixed_server_id", str(uuid.uuid4())) # Client tells the server to shut down by calling ShutdownServer (or by failing to send a # heartbeat, at which point this event is set.
The cleanup thread will then set the server # termination event once all current executions have finished, which will stop the server) self._shutdown_once_executions_finish_event = threading.Event() # Dict[str, (multiprocessing.Process, DagsterInstance)] self._executions = {} # Dict[str, multiprocessing.Event] self._termination_events = {} self._termination_times = {} self._execution_lock = threading.Lock() self._repository_symbols_and_code_pointers = LazyRepositorySymbolsAndCodePointers( loadable_target_origin ) if not lazy_load_user_code: self._repository_symbols_and_code_pointers.load() self.__last_heartbeat_time = time.time() if heartbeat: self.__heartbeat_thread = threading.Thread( target=self._heartbeat_thread, args=(heartbeat_timeout,), ) self.__heartbeat_thread.daemon = True self.__heartbeat_thread.start() else: self.__heartbeat_thread = None self.__cleanup_thread = threading.Thread(target=self._cleanup_thread, args=(),) self.__cleanup_thread.daemon = True self.__cleanup_thread.start() def cleanup(self): if self.__heartbeat_thread: self.__heartbeat_thread.join() self.__cleanup_thread.join() def _heartbeat_thread(self, heartbeat_timeout): while True: self._shutdown_once_executions_finish_event.wait(heartbeat_timeout) if self._shutdown_once_executions_finish_event.is_set(): break if self.__last_heartbeat_time < time.time() - heartbeat_timeout: self._shutdown_once_executions_finish_event.set() def _cleanup_thread(self): while True: self._server_termination_event.wait(CLEANUP_TICK) if self._server_termination_event.is_set(): break self._check_for_orphaned_runs() def _check_for_orphaned_runs(self): with self._execution_lock: runs_to_clear = [] for run_id, (process, instance_ref) in self._executions.items(): if not process.is_alive(): with DagsterInstance.from_ref(instance_ref) as instance: runs_to_clear.append(run_id) run = instance.get_run_by_id(run_id) if not run or run.is_finished: continue # the process died in an unexpected manner. 
inform the system message = "Pipeline execution process for {run_id} unexpectedly exited.".format( run_id=run.run_id ) instance.report_engine_event(message, run, cls=self.__class__) instance.report_run_failed(run) for run_id in runs_to_clear: self._clear_run(run_id) # Once there are no more running executions after we have received a request to # shut down, terminate the server if self._shutdown_once_executions_finish_event.is_set(): if len(self._executions) == 0: self._server_termination_event.set() # Assumes execution lock is being held def _clear_run(self, run_id): del self._executions[run_id] del self._termination_events[run_id] if run_id in self._termination_times: del self._termination_times[run_id] def _recon_repository_from_origin(self, external_repository_origin): check.inst_param( external_repository_origin, "external_repository_origin", ExternalRepositoryOrigin, ) return ReconstructableRepository( self._repository_symbols_and_code_pointers.code_pointers_by_repo_name[ external_repository_origin.repository_name ], self._get_current_image(), ) def _recon_pipeline_from_origin(self, external_pipeline_origin): check.inst_param( external_pipeline_origin, "external_pipeline_origin", ExternalPipelineOrigin ) recon_repo = self._recon_repository_from_origin( external_pipeline_origin.external_repository_origin ) return recon_repo.get_reconstructable_pipeline(external_pipeline_origin.pipeline_name) def Ping(self, request, _context): echo = request.echo return api_pb2.PingReply(echo=echo) def StreamingPing(self, request, _context): sequence_length = request.sequence_length echo = request.echo for sequence_number in range(sequence_length): yield api_pb2.StreamingPingEvent(sequence_number=sequence_number, echo=echo) def Heartbeat(self, request, _context): self.__last_heartbeat_time = time.time() echo = request.echo return api_pb2.PingReply(echo=echo) def GetServerId(self, _request, _context): return api_pb2.GetServerIdReply(server_id=self._server_id) def ExecutionPlanSnapshot(self, request, _context): execution_plan_args = deserialize_json_to_dagster_namedtuple( request.serialized_execution_plan_snapshot_args ) check.inst_param(execution_plan_args, "execution_plan_args", ExecutionPlanSnapshotArgs) recon_pipeline = self._recon_pipeline_from_origin(execution_plan_args.pipeline_origin) execution_plan_snapshot_or_error = get_external_execution_plan_snapshot( recon_pipeline, execution_plan_args ) return api_pb2.ExecutionPlanSnapshotReply( serialized_execution_plan_snapshot=serialize_dagster_namedtuple( execution_plan_snapshot_or_error ) ) def ListRepositories(self, request, _context): try: response = ListRepositoriesResponse( self._repository_symbols_and_code_pointers.loadable_repository_symbols, executable_path=self._loadable_target_origin.executable_path if self._loadable_target_origin else None, repository_code_pointer_dict=( self._repository_symbols_and_code_pointers.code_pointers_by_repo_name ), ) except Exception: # pylint: disable=broad-except response = serializable_error_info_from_exc_info(sys.exc_info()) return api_pb2.ListRepositoriesReply( serialized_list_repositories_response_or_error=serialize_dagster_namedtuple(response) ) def ExternalPartitionNames(self, request, _context): partition_names_args = deserialize_json_to_dagster_namedtuple( request.serialized_partition_names_args ) check.inst_param(partition_names_args, "partition_names_args", PartitionNamesArgs) recon_repo = self._recon_repository_from_origin(partition_names_args.repository_origin) return 
api_pb2.ExternalPartitionNamesReply( serialized_external_partition_names_or_external_partition_execution_error=serialize_dagster_namedtuple( get_partition_names(recon_repo, partition_names_args.partition_set_name,) ) ) def ExternalPartitionSetExecutionParams(self, request, _context): args = deserialize_json_to_dagster_namedtuple( request.serialized_partition_set_execution_param_args ) check.inst_param( args, "args", PartitionSetExecutionParamArgs, ) recon_repo = self._recon_repository_from_origin(args.repository_origin) return api_pb2.ExternalPartitionSetExecutionParamsReply( serialized_external_partition_set_execution_param_data_or_external_partition_execution_error=serialize_dagster_namedtuple( get_partition_set_execution_param_data( recon_repo=recon_repo, partition_set_name=args.partition_set_name, partition_names=args.partition_names, ) ) ) def ExternalPartitionConfig(self, request, _context): args = deserialize_json_to_dagster_namedtuple(request.serialized_partition_args) check.inst_param(args, "args", PartitionArgs) recon_repo = self._recon_repository_from_origin(args.repository_origin) return api_pb2.ExternalPartitionConfigReply( serialized_external_partition_config_or_external_partition_execution_error=serialize_dagster_namedtuple( get_partition_config(recon_repo, args.partition_set_name, args.partition_name) ) ) def ExternalPartitionTags(self, request, _context): partition_args = deserialize_json_to_dagster_namedtuple(request.serialized_partition_args) check.inst_param(partition_args, "partition_args", PartitionArgs) recon_repo = self._recon_repository_from_origin(partition_args.repository_origin) return api_pb2.ExternalPartitionTagsReply( serialized_external_partition_tags_or_external_partition_execution_error=serialize_dagster_namedtuple( get_partition_tags( recon_repo, partition_args.partition_set_name, partition_args.partition_name ) ) ) def ExternalPipelineSubsetSnapshot(self, request, _context): pipeline_subset_snapshot_args = deserialize_json_to_dagster_namedtuple( request.serialized_pipeline_subset_snapshot_args ) check.inst_param( pipeline_subset_snapshot_args, "pipeline_subset_snapshot_args", PipelineSubsetSnapshotArgs, ) return api_pb2.ExternalPipelineSubsetSnapshotReply( serialized_external_pipeline_subset_result=serialize_dagster_namedtuple( get_external_pipeline_subset_result( self._recon_pipeline_from_origin(pipeline_subset_snapshot_args.pipeline_origin), pipeline_subset_snapshot_args.solid_selection, ) ) ) def _get_serialized_external_repository_data(self, request): repository_origin = deserialize_json_to_dagster_namedtuple( request.serialized_repository_python_origin ) check.inst_param(repository_origin, "repository_origin", ExternalRepositoryOrigin) recon_repo = self._recon_repository_from_origin(repository_origin) return serialize_dagster_namedtuple( external_repository_data_from_def(recon_repo.get_definition()) ) def ExternalRepository(self, request, _context): serialized_external_repository_data = self._get_serialized_external_repository_data(request) return api_pb2.ExternalRepositoryReply( serialized_external_repository_data=serialized_external_repository_data, ) def StreamingExternalRepository(self, request, _context): serialized_external_repository_data = self._get_serialized_external_repository_data(request) num_chunks = int( math.ceil( float(len(serialized_external_repository_data)) / STREAMING_EXTERNAL_REPOSITORY_CHUNK_SIZE ) ) for i in range(num_chunks): start_index = i * STREAMING_EXTERNAL_REPOSITORY_CHUNK_SIZE end_index = min( (i + 1) * 
STREAMING_EXTERNAL_REPOSITORY_CHUNK_SIZE, len(serialized_external_repository_data), ) yield api_pb2.StreamingExternalRepositoryEvent( sequence_number=i, serialized_external_repository_chunk=serialized_external_repository_data[ start_index:end_index ], ) def ExternalScheduleExecution(self, request, _context): args = deserialize_json_to_dagster_namedtuple( request.serialized_external_schedule_execution_args ) check.inst_param( args, "args", ExternalScheduleExecutionArgs, ) recon_repo = self._recon_repository_from_origin(args.repository_origin) return api_pb2.ExternalScheduleExecutionReply( serialized_external_schedule_execution_data_or_external_schedule_execution_error=serialize_dagster_namedtuple( get_external_schedule_execution( recon_repo, args.instance_ref, args.schedule_name, args.scheduled_execution_timestamp, args.scheduled_execution_timezone, ) ) ) + def ExternalSensorExecution(self, request, _context): + args = deserialize_json_to_dagster_namedtuple( + request.serialized_external_sensor_execution_args + ) + + check.inst_param(args, "args", SensorExecutionArgs) + + recon_repo = self._recon_repository_from_origin(args.repository_origin) + + return api_pb2.ExternalSensorExecutionReply( + serialized_external_sensor_execution_data_or_external_sensor_execution_error=serialize_dagster_namedtuple( + get_external_sensor_execution( + recon_repo, args.instance_ref, args.sensor_name, args.last_evaluation_time + ) + ) + ) + def ShutdownServer(self, request, _context): try: self._shutdown_once_executions_finish_event.set() return api_pb2.ShutdownServerReply( serialized_shutdown_server_result=serialize_dagster_namedtuple( ShutdownServerResult(success=True, serializable_error_info=None) ) ) except: # pylint: disable=bare-except return api_pb2.ShutdownServerReply( serialized_shutdown_server_result=serialize_dagster_namedtuple( ShutdownServerResult( success=False, serializable_error_info=serializable_error_info_from_exc_info( sys.exc_info() ), ) ) ) def CancelExecution(self, request, _context): success = False message = None serializable_error_info = None try: cancel_execution_request = check.inst( deserialize_json_to_dagster_namedtuple(request.serialized_cancel_execution_request), CancelExecutionRequest, ) with self._execution_lock: if cancel_execution_request.run_id in self._executions: self._termination_events[cancel_execution_request.run_id].set() self._termination_times[cancel_execution_request.run_id] = time.time() success = True except: # pylint: disable=bare-except serializable_error_info = serializable_error_info_from_exc_info(sys.exc_info()) return api_pb2.CancelExecutionReply( serialized_cancel_execution_result=serialize_dagster_namedtuple( CancelExecutionResult( success=success, message=message, serializable_error_info=serializable_error_info, ) ) ) def CanCancelExecution(self, request, _context): can_cancel_execution_request = check.inst( deserialize_json_to_dagster_namedtuple(request.serialized_can_cancel_execution_request), CanCancelExecutionRequest, ) with self._execution_lock: run_id = can_cancel_execution_request.run_id can_cancel = ( run_id in self._executions and not self._termination_events[run_id].is_set() ) return api_pb2.CanCancelExecutionReply( serialized_can_cancel_execution_result=serialize_dagster_namedtuple( CanCancelExecutionResult(can_cancel=can_cancel) ) ) def StartRun(self, request, _context): if self._shutdown_once_executions_finish_event.is_set(): return api_pb2.StartRunReply( serialized_start_run_result=serialize_dagster_namedtuple( StartRunResult( success=False, 
message="Tried to start a run on a server after telling it to shut down", serializable_error_info=None, ) ) ) try: execute_run_args = check.inst( deserialize_json_to_dagster_namedtuple(request.serialized_execute_run_args), ExecuteExternalPipelineArgs, ) run_id = execute_run_args.pipeline_run_id recon_pipeline = self._recon_pipeline_from_origin(execute_run_args.pipeline_origin) except: # pylint: disable=bare-except return api_pb2.StartRunReply( serialized_start_run_result=serialize_dagster_namedtuple( StartRunResult( success=False, message=None, serializable_error_info=serializable_error_info_from_exc_info( sys.exc_info() ), ) ) ) event_queue = multiprocessing.Queue() termination_event = multiprocessing.Event() execution_process = multiprocessing.Process( target=start_run_in_subprocess, args=[ request.serialized_execute_run_args, recon_pipeline, event_queue, termination_event, ], ) with self._execution_lock: execution_process.start() self._executions[run_id] = ( execution_process, execute_run_args.instance_ref, ) self._termination_events[run_id] = termination_event success = None message = None serializable_error_info = None while success is None: time.sleep(EVENT_QUEUE_POLL_INTERVAL) # We use `get_nowait()` instead of `get()` so that we can handle the case where the # execution process has died unexpectedly -- `get()` would hang forever in that case try: dagster_event_or_ipc_error_message_or_done = event_queue.get_nowait() except queue.Empty: if not execution_process.is_alive(): # subprocess died unexpectedly success = False message = ( "GRPC server: Subprocess for {run_id} terminated unexpectedly with " "exit code {exit_code}".format( run_id=run_id, exit_code=execution_process.exitcode, ) ) serializable_error_info = serializable_error_info_from_exc_info(sys.exc_info()) else: if isinstance( dagster_event_or_ipc_error_message_or_done, StartRunInSubprocessSuccessful ): success = True elif isinstance( dagster_event_or_ipc_error_message_or_done, RunInSubprocessComplete ): continue if isinstance(dagster_event_or_ipc_error_message_or_done, IPCErrorMessage): success = False message = dagster_event_or_ipc_error_message_or_done.message serializable_error_info = ( dagster_event_or_ipc_error_message_or_done.serializable_error_info ) # Ensure that if the run failed, we remove it from the executions map before # returning so that CanCancel will never return True if not success: with self._execution_lock: self._clear_run(run_id) return api_pb2.StartRunReply( serialized_start_run_result=serialize_dagster_namedtuple( StartRunResult( success=success, message=message, serializable_error_info=serializable_error_info, ) ) ) def _get_current_image(self): return os.getenv("DAGSTER_CURRENT_IMAGE") def GetCurrentImage(self, request, _context): return api_pb2.GetCurrentImageReply( serialized_current_image=serialize_dagster_namedtuple( GetCurrentImageResult( current_image=self._get_current_image(), serializable_error_info=None ) ) ) @whitelist_for_serdes class GrpcServerStartedEvent(namedtuple("GrpcServerStartedEvent", "")): pass @whitelist_for_serdes class GrpcServerFailedToBindEvent(namedtuple("GrpcServerStartedEvent", "")): pass def server_termination_target(termination_event, server): termination_event.wait() # We could make this grace period configurable if we set it in the ShutdownServer handler server.stop(grace=5) class DagsterGrpcServer(object): def __init__( self, host="localhost", port=None, socket=None, max_workers=1, loadable_target_origin=None, heartbeat=False, heartbeat_timeout=30, 
lazy_load_user_code=False, ipc_output_file=None, fixed_server_id=None, ): check.opt_str_param(host, "host") check.opt_int_param(port, "port") check.opt_str_param(socket, "socket") check.int_param(max_workers, "max_workers") check.opt_inst_param(loadable_target_origin, "loadable_target_origin", LoadableTargetOrigin) check.invariant( port is not None if seven.IS_WINDOWS else True, "You must pass a valid `port` on Windows: `socket` not supported.", ) check.invariant( (port or socket) and not (port and socket), "You must pass one and only one of `port` or `socket`.", ) check.invariant( host is not None if port else True, "Must provide a host when serving on a port", ) check.bool_param(heartbeat, "heartbeat") check.int_param(heartbeat_timeout, "heartbeat_timeout") self._ipc_output_file = check.opt_str_param(ipc_output_file, "ipc_output_file") check.opt_str_param(fixed_server_id, "fixed_server_id") check.invariant(heartbeat_timeout > 0, "heartbeat_timeout must be greater than 0") check.invariant( max_workers > 1 if heartbeat else True, "max_workers must be greater than 1 if heartbeat is True", ) self.server = grpc.server(ThreadPoolExecutor(max_workers=max_workers)) self._server_termination_event = threading.Event() self._api_servicer = DagsterApiServer( server_termination_event=self._server_termination_event, loadable_target_origin=loadable_target_origin, heartbeat=heartbeat, heartbeat_timeout=heartbeat_timeout, lazy_load_user_code=lazy_load_user_code, fixed_server_id=fixed_server_id, ) # Create a health check servicer self._health_servicer = health.HealthServicer() health_pb2_grpc.add_HealthServicer_to_server(self._health_servicer, self.server) add_DagsterApiServicer_to_server(self._api_servicer, self.server) if port: server_address = host + ":" + str(port) else: server_address = "unix:" + os.path.abspath(socket) # grpc.Server.add_insecure_port returns: # - 0 on failure # - port number when a port is successfully bound # - 1 when a UDS is successfully bound res = self.server.add_insecure_port(server_address) if socket and res != 1: if self._ipc_output_file: with ipc_write_stream(self._ipc_output_file) as ipc_stream: ipc_stream.send(GrpcServerFailedToBindEvent()) raise CouldNotBindGrpcServerToAddress(socket) if port and res != port: if self._ipc_output_file: with ipc_write_stream(self._ipc_output_file) as ipc_stream: ipc_stream.send(GrpcServerFailedToBindEvent()) raise CouldNotBindGrpcServerToAddress(port) def serve(self): # Unfortunately it looks like ports bind late (here) and so this can fail with an error # from C++ like: # # E0625 08:46:56.180112000 4697443776 server_chttp2.cc:40] # {"created":"@1593089216.180085000","description":"Only 1 addresses added out of total # 2 resolved","file":"src/core/ext/transport/chttp2/server/chttp2_server.cc", # "file_line":406,"referenced_errors":[{"created":"@1593089216.180083000","description": # "Unable to configure socket","fd":6,"file": # "src/core/lib/iomgr/tcp_server_utils_posix_common.cc","file_line":217, # "referenced_errors":[{"created":"@1593089216.180079000", # "description":"Address already in use","errno":48,"file": # "src/core/lib/iomgr/tcp_server_utils_posix_common.cc","file_line":190,"os_error": # "Address already in use","syscall":"bind"}]}]} # # This is printed to stdout and there is no return value from server.start or exception # raised in Python that we can use to handle this. The standard recipes for hijacking C # stdout (so we could inspect this output and respond accordingly), e.g. 
# https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/, don't seem # to work (at least on Mac OS X) against grpc, and in any case would involve a huge # cross-version and cross-platform maintenance burden. We have an issue open against grpc, # https://github.com/grpc/grpc/issues/23315, and our own tracking issue at self.server.start() # Note: currently this is hardcoded as serving, since both services are cohosted # pylint: disable=no-member self._health_servicer.set("DagsterApi", health_pb2.HealthCheckResponse.SERVING) if self._ipc_output_file: with ipc_write_stream(self._ipc_output_file) as ipc_stream: ipc_stream.send(GrpcServerStartedEvent()) server_termination_thread = threading.Thread( target=server_termination_target, args=[self._server_termination_event, self.server], name="grpc-server-termination", ) server_termination_thread.daemon = True server_termination_thread.start() self.server.wait_for_termination() server_termination_thread.join() self._api_servicer.cleanup() class CouldNotStartServerProcess(Exception): def __init__(self, port=None, socket=None): super(CouldNotStartServerProcess, self).__init__( "Could not start server with " + ( "port {port}".format(port=port) if port is not None else "socket {socket}".format(socket=socket) ) ) def wait_for_grpc_server(server_process, ipc_output_file, timeout=15): event = read_unary_response(ipc_output_file, timeout=timeout, ipc_process=server_process) if isinstance(event, GrpcServerFailedToBindEvent): raise CouldNotBindGrpcServerToAddress() elif isinstance(event, GrpcServerStartedEvent): return True else: raise Exception( "Received unexpected IPC event from gRPC Server: {event}".format(event=event) ) def open_server_process( port, socket, loadable_target_origin=None, max_workers=1, heartbeat=False, heartbeat_timeout=30, lazy_load_user_code=False, fixed_server_id=None, ): check.invariant((port or socket) and not (port and socket), "Set only port or socket") check.opt_inst_param(loadable_target_origin, "loadable_target_origin", LoadableTargetOrigin) check.int_param(max_workers, "max_workers") with seven.TemporaryDirectory() as temp_dir: output_file = os.path.join( temp_dir, "grpc-server-startup-{uuid}".format(uuid=uuid.uuid4().hex) ) subprocess_args = ( [ loadable_target_origin.executable_path if loadable_target_origin and loadable_target_origin.executable_path else sys.executable, "-m", "dagster.grpc", ] + (["--port", str(port)] if port else []) + (["--socket", socket] if socket else []) + ["-n", str(max_workers)] + (["--heartbeat"] if heartbeat else []) + (["--heartbeat-timeout", str(heartbeat_timeout)] if heartbeat_timeout else []) + (["--lazy-load-user-code"] if lazy_load_user_code else []) + (["--ipc-output-file", output_file]) + (["--fixed-server-id", fixed_server_id] if fixed_server_id else []) ) if loadable_target_origin: subprocess_args += loadable_target_origin.get_cli_args() server_process = open_ipc_subprocess(subprocess_args) try: wait_for_grpc_server(server_process, output_file) except: if server_process.poll() is None: server_process.terminate() raise return server_process def open_server_process_on_dynamic_port( max_retries=10, loadable_target_origin=None, max_workers=1, heartbeat=False, heartbeat_timeout=30, lazy_load_user_code=False, fixed_server_id=None, ): server_process = None retries = 0 while server_process is None and retries < max_retries: port = find_free_port() try: server_process = open_server_process( port=port, socket=None, loadable_target_origin=loadable_target_origin, 
max_workers=max_workers, heartbeat=heartbeat, heartbeat_timeout=heartbeat_timeout, lazy_load_user_code=lazy_load_user_code, fixed_server_id=fixed_server_id, ) except CouldNotBindGrpcServerToAddress: pass retries += 1 return server_process, port def cleanup_server_process(server_process, timeout=3): start_time = time.time() while server_process.poll() is None and (time.time() - start_time) < timeout: time.sleep(0.05) if server_process.poll() is None: server_process.terminate() server_process.wait() class GrpcServerProcess(object): def __init__( self, loadable_target_origin=None, force_port=False, max_retries=10, max_workers=1, heartbeat=False, heartbeat_timeout=30, lazy_load_user_code=False, fixed_server_id=None, ): self.port = None self.socket = None self.server_process = None check.opt_inst_param(loadable_target_origin, "loadable_target_origin", LoadableTargetOrigin) check.bool_param(force_port, "force_port") check.int_param(max_retries, "max_retries") check.int_param(max_workers, "max_workers") check.bool_param(heartbeat, "heartbeat") check.int_param(heartbeat_timeout, "heartbeat_timeout") check.invariant(heartbeat_timeout > 0, "heartbeat_timeout must be greater than 0") check.bool_param(lazy_load_user_code, "lazy_load_user_code") check.opt_str_param(fixed_server_id, "fixed_server_id") check.invariant( max_workers > 1 if heartbeat else True, "max_workers must be greater than 1 if heartbeat is True", ) if seven.IS_WINDOWS or force_port: self.server_process, self.port = open_server_process_on_dynamic_port( max_retries=max_retries, loadable_target_origin=loadable_target_origin, max_workers=max_workers, heartbeat=heartbeat, heartbeat_timeout=heartbeat_timeout, lazy_load_user_code=lazy_load_user_code, fixed_server_id=fixed_server_id, ) else: self.socket = safe_tempfile_path_unmanaged() self.server_process = open_server_process( port=None, socket=self.socket, loadable_target_origin=loadable_target_origin, max_workers=max_workers, heartbeat=heartbeat, heartbeat_timeout=heartbeat_timeout, lazy_load_user_code=lazy_load_user_code, fixed_server_id=fixed_server_id, ) if self.server_process is None: raise CouldNotStartServerProcess(port=self.port, socket=self.socket) def wait(self, timeout=30): if self.server_process.poll() is None: seven.wait_for_process(self.server_process, timeout=timeout) def create_ephemeral_client(self): from dagster.grpc.client import EphemeralDagsterGrpcClient return EphemeralDagsterGrpcClient( port=self.port, socket=self.socket, server_process=self.server_process ) diff --git a/python_modules/dagster/dagster/grpc/types.py b/python_modules/dagster/dagster/grpc/types.py index d0f9f7bbe..70699f407 100644 --- a/python_modules/dagster/dagster/grpc/types.py +++ b/python_modules/dagster/dagster/grpc/types.py @@ -1,343 +1,363 @@ from collections import namedtuple from dagster import check from dagster.core.code_pointer import CodePointer from dagster.core.host_representation import ExternalPipelineOrigin, ExternalRepositoryOrigin from dagster.core.instance.ref import InstanceRef from dagster.core.origin import PipelinePythonOrigin from dagster.serdes import whitelist_for_serdes from dagster.utils.error import SerializableErrorInfo @whitelist_for_serdes class ExecutionPlanSnapshotArgs( namedtuple( "_ExecutionPlanSnapshotArgs", "pipeline_origin solid_selection run_config mode step_keys_to_execute pipeline_snapshot_id", ) ): def __new__( cls, pipeline_origin, solid_selection, run_config, mode, step_keys_to_execute, pipeline_snapshot_id, ): return super(ExecutionPlanSnapshotArgs, 
cls).__new__( cls, pipeline_origin=check.inst_param( pipeline_origin, "pipeline_origin", ExternalPipelineOrigin ), solid_selection=check.opt_list_param(solid_selection, "solid_selection", of_type=str), run_config=check.dict_param(run_config, "run_config"), mode=check.str_param(mode, "mode"), step_keys_to_execute=check.opt_list_param( step_keys_to_execute, "step_keys_to_execute", of_type=str ), pipeline_snapshot_id=check.str_param(pipeline_snapshot_id, "pipeline_snapshot_id"), ) @whitelist_for_serdes class ExecuteRunArgs(namedtuple("_ExecuteRunArgs", "pipeline_origin pipeline_run_id instance_ref")): def __new__(cls, pipeline_origin, pipeline_run_id, instance_ref): return super(ExecuteRunArgs, cls).__new__( cls, pipeline_origin=check.inst_param( pipeline_origin, "pipeline_origin", PipelinePythonOrigin, ), pipeline_run_id=check.str_param(pipeline_run_id, "pipeline_run_id"), instance_ref=check.opt_inst_param(instance_ref, "instance_ref", InstanceRef), ) @whitelist_for_serdes class ExecuteExternalPipelineArgs( namedtuple("_ExecuteExternalPipelineArgs", "pipeline_origin pipeline_run_id instance_ref") ): def __new__(cls, pipeline_origin, pipeline_run_id, instance_ref): return super(ExecuteExternalPipelineArgs, cls).__new__( cls, pipeline_origin=check.inst_param( pipeline_origin, "pipeline_origin", ExternalPipelineOrigin, ), pipeline_run_id=check.str_param(pipeline_run_id, "pipeline_run_id"), instance_ref=check.opt_inst_param(instance_ref, "instance_ref", InstanceRef), ) @whitelist_for_serdes class ExecuteStepArgs( namedtuple( "_ExecuteStepArgs", "pipeline_origin pipeline_run_id instance_ref mode step_keys_to_execute run_config retries_dict should_verify_step", ) ): def __new__( cls, pipeline_origin, pipeline_run_id, instance_ref=None, mode=None, step_keys_to_execute=None, run_config=None, retries_dict=None, should_verify_step=None, ): return super(ExecuteStepArgs, cls).__new__( cls, pipeline_origin=check.inst_param( pipeline_origin, "pipeline_origin", PipelinePythonOrigin ), pipeline_run_id=check.str_param(pipeline_run_id, "pipeline_run_id"), instance_ref=check.opt_inst_param(instance_ref, "instance_ref", InstanceRef), mode=check.opt_str_param(mode, "mode"), step_keys_to_execute=check.opt_list_param( step_keys_to_execute, "step_keys_to_execute", of_type=str ), run_config=check.opt_dict_param(run_config, "run_config"), retries_dict=check.opt_dict_param(retries_dict, "retries_dict"), should_verify_step=check.opt_bool_param( should_verify_step, "should_verify_step", False ), ) @whitelist_for_serdes class LoadableRepositorySymbol( namedtuple("_LoadableRepositorySymbol", "repository_name attribute") ): def __new__(cls, repository_name, attribute): return super(LoadableRepositorySymbol, cls).__new__( cls, repository_name=check.str_param(repository_name, "repository_name"), attribute=check.str_param(attribute, "attribute"), ) @whitelist_for_serdes class ListRepositoriesResponse( namedtuple( "_ListRepositoriesResponse", "repository_symbols executable_path repository_code_pointer_dict", ) ): def __new__( cls, repository_symbols, executable_path=None, repository_code_pointer_dict=None, ): return super(ListRepositoriesResponse, cls).__new__( cls, repository_symbols=check.list_param( repository_symbols, "repository_symbols", of_type=LoadableRepositorySymbol ), # These are currently only used by the GRPC Repository Location, but # we will need to migrate the rest of the repository locations to use this. 
executable_path=check.opt_str_param(executable_path, "executable_path"), repository_code_pointer_dict=check.opt_dict_param( repository_code_pointer_dict, "repository_code_pointer_dict", key_type=str, value_type=CodePointer, ), ) @whitelist_for_serdes class ListRepositoriesInput( namedtuple("_ListRepositoriesInput", "module_name python_file working_directory attribute") ): def __new__(cls, module_name, python_file, working_directory, attribute): check.invariant(not (module_name and python_file), "Must set only one") check.invariant(module_name or python_file, "Must set at least one") return super(ListRepositoriesInput, cls).__new__( cls, module_name=check.opt_str_param(module_name, "module_name"), python_file=check.opt_str_param(python_file, "python_file"), working_directory=check.opt_str_param(working_directory, "working_directory"), attribute=check.opt_str_param(attribute, "attribute"), ) @whitelist_for_serdes class PartitionArgs( namedtuple("_PartitionArgs", "repository_origin partition_set_name partition_name") ): def __new__(cls, repository_origin, partition_set_name, partition_name): return super(PartitionArgs, cls).__new__( cls, repository_origin=check.inst_param( repository_origin, "repository_origin", ExternalRepositoryOrigin, ), partition_set_name=check.str_param(partition_set_name, "partition_set_name"), partition_name=check.str_param(partition_name, "partition_name"), ) @whitelist_for_serdes class PartitionNamesArgs(namedtuple("_PartitionNamesArgs", "repository_origin partition_set_name")): def __new__(cls, repository_origin, partition_set_name): return super(PartitionNamesArgs, cls).__new__( cls, repository_origin=check.inst_param( repository_origin, "repository_origin", ExternalRepositoryOrigin ), partition_set_name=check.str_param(partition_set_name, "partition_set_name"), ) @whitelist_for_serdes class PartitionSetExecutionParamArgs( namedtuple( "_PartitionSetExecutionParamArgs", "repository_origin partition_set_name partition_names", ) ): def __new__(cls, repository_origin, partition_set_name, partition_names): return super(PartitionSetExecutionParamArgs, cls).__new__( cls, repository_origin=check.inst_param( repository_origin, "repository_origin", ExternalRepositoryOrigin ), partition_set_name=check.str_param(partition_set_name, "partition_set_name"), partition_names=check.list_param(partition_names, "partition_names", of_type=str), ) @whitelist_for_serdes class PipelineSubsetSnapshotArgs( namedtuple("_PipelineSubsetSnapshotArgs", "pipeline_origin solid_selection") ): def __new__(cls, pipeline_origin, solid_selection): return super(PipelineSubsetSnapshotArgs, cls).__new__( cls, pipeline_origin=check.inst_param( pipeline_origin, "pipeline_origin", ExternalPipelineOrigin ), solid_selection=check.list_param(solid_selection, "solid_selection", of_type=str) if solid_selection else None, ) @whitelist_for_serdes class ExternalScheduleExecutionArgs( namedtuple( "_ExternalScheduleExecutionArgs", "repository_origin instance_ref schedule_name " "scheduled_execution_timestamp scheduled_execution_timezone", ) ): def __new__( cls, repository_origin, instance_ref, schedule_name, scheduled_execution_timestamp=None, scheduled_execution_timezone=None, ): return super(ExternalScheduleExecutionArgs, cls).__new__( cls, repository_origin=check.inst_param( repository_origin, "repository_origin", ExternalRepositoryOrigin ), instance_ref=check.inst_param(instance_ref, "instance_ref", InstanceRef), schedule_name=check.str_param(schedule_name, "schedule_name"), 
scheduled_execution_timestamp=check.opt_float_param( scheduled_execution_timestamp, "scheduled_execution_timestamp" ), scheduled_execution_timezone=check.opt_str_param( scheduled_execution_timezone, "scheduled_execution_timezone", ), ) +@whitelist_for_serdes +class SensorExecutionArgs( + namedtuple( + "_SensorExecutionArgs", "repository_origin instance_ref sensor_name last_evaluation_time" + ) +): + def __new__(cls, repository_origin, instance_ref, sensor_name, last_evaluation_time): + return super(SensorExecutionArgs, cls).__new__( + cls, + repository_origin=check.inst_param( + repository_origin, "repository_origin", ExternalRepositoryOrigin + ), + instance_ref=check.inst_param(instance_ref, "instance_ref", InstanceRef), + sensor_name=check.str_param(sensor_name, "sensor_name"), + last_evaluation_time=check.opt_float_param( + last_evaluation_time, "last_evaluation_time" + ), + ) + + @whitelist_for_serdes class ExternalJobArgs(namedtuple("_ExternalJobArgs", "repository_origin instance_ref name",)): def __new__(cls, repository_origin, instance_ref, name): return super(ExternalJobArgs, cls).__new__( cls, repository_origin=check.inst_param( repository_origin, "repository_origin", ExternalRepositoryOrigin ), instance_ref=check.inst_param(instance_ref, "instance_ref", InstanceRef), name=check.str_param(name, "name"), ) @whitelist_for_serdes class ShutdownServerResult(namedtuple("_ShutdownServerResult", "success serializable_error_info")): def __new__(cls, success, serializable_error_info): return super(ShutdownServerResult, cls).__new__( cls, success=check.bool_param(success, "success"), serializable_error_info=check.opt_inst_param( serializable_error_info, "serializable_error_info", SerializableErrorInfo ), ) @whitelist_for_serdes class CancelExecutionRequest(namedtuple("_CancelExecutionRequest", "run_id")): def __new__(cls, run_id): return super(CancelExecutionRequest, cls).__new__( cls, run_id=check.str_param(run_id, "run_id"), ) @whitelist_for_serdes class CancelExecutionResult( namedtuple("_CancelExecutionResult", "success message serializable_error_info") ): def __new__(cls, success, message, serializable_error_info): return super(CancelExecutionResult, cls).__new__( cls, success=check.bool_param(success, "success"), message=check.opt_str_param(message, "message"), serializable_error_info=check.opt_inst_param( serializable_error_info, "serializable_error_info", SerializableErrorInfo ), ) @whitelist_for_serdes class CanCancelExecutionRequest(namedtuple("_CanCancelExecutionRequest", "run_id")): def __new__(cls, run_id): return super(CanCancelExecutionRequest, cls).__new__( cls, run_id=check.str_param(run_id, "run_id"), ) @whitelist_for_serdes class CanCancelExecutionResult(namedtuple("_CancelExecutionResult", "can_cancel")): def __new__(cls, can_cancel): return super(CanCancelExecutionResult, cls).__new__( cls, can_cancel=check.bool_param(can_cancel, "can_cancel"), ) @whitelist_for_serdes class StartRunResult(namedtuple("_StartRunResult", "success message serializable_error_info")): def __new__(cls, success, message, serializable_error_info): return super(StartRunResult, cls).__new__( cls, success=check.bool_param(success, "success"), message=check.opt_str_param(message, "message"), serializable_error_info=check.opt_inst_param( serializable_error_info, "serializable_error_info", SerializableErrorInfo ), ) @whitelist_for_serdes class GetCurrentImageResult( namedtuple("_GetCurrentImageResult", "current_image serializable_error_info") ): def __new__(cls, current_image, 
serializable_error_info): return super(GetCurrentImageResult, cls).__new__( cls, current_image=check.opt_str_param(current_image, "current_image"), serializable_error_info=check.opt_inst_param( serializable_error_info, "serializable_error_info", SerializableErrorInfo ), ) diff --git a/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py b/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py index 678a02273..2532652cf 100644 --- a/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py +++ b/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py @@ -1,144 +1,156 @@ import string from dagster import ( InputDefinition, Int, OutputDefinition, PartitionSetDefinition, ScheduleDefinition, lambda_solid, pipeline, repository, solid, usable_as_dagster_type, ) +from dagster.core.definitions.decorators.sensor import sensor @lambda_solid def do_something(): return 1 @lambda_solid def do_input(x): return x @pipeline(name="foo") def foo_pipeline(): do_input(do_something()) @pipeline(name="baz", description="Not much tbh") def baz_pipeline(): do_input() def define_foo_pipeline(): return foo_pipeline @pipeline(name="bar") def bar_pipeline(): @usable_as_dagster_type(name="InputTypeWithoutHydration") class InputTypeWithoutHydration(int): pass @solid(output_defs=[OutputDefinition(InputTypeWithoutHydration)]) def one(_): return 1 @solid( input_defs=[InputDefinition("some_input", InputTypeWithoutHydration)], output_defs=[OutputDefinition(Int)], ) def fail_subset(_, some_input): return some_input return fail_subset(one()) def define_bar_schedules(): return { "foo_schedule": ScheduleDefinition( "foo_schedule", cron_schedule="* * * * *", pipeline_name="test_pipeline", run_config={"fizz": "buzz"}, ), "foo_schedule_never_execute": ScheduleDefinition( "foo_schedule_never_execute", cron_schedule="* * * * *", pipeline_name="test_pipeline", run_config={"fizz": "buzz"}, should_execute=lambda _context: False, ), "foo_schedule_echo_time": ScheduleDefinition( "foo_schedule_echo_time", cron_schedule="* * * * *", pipeline_name="test_pipeline", run_config_fn=lambda context: { "passed_in_time": context.scheduled_execution_time.isoformat() if context.scheduled_execution_time else "" }, ), } def error_partition_fn(): raise Exception("womp womp") def error_partition_config_fn(): raise Exception("womp womp") def error_partition_tags_fn(_partition): raise Exception("womp womp") def define_baz_partitions(): return { "baz_partitions": PartitionSetDefinition( name="baz_partitions", pipeline_name="baz", partition_fn=lambda: string.ascii_lowercase, run_config_fn_for_partition=lambda partition: { "solids": {"do_input": {"inputs": {"x": {"value": partition.value}}}} }, tags_fn_for_partition=lambda _partition: {"foo": "bar"}, ), "error_partitions": PartitionSetDefinition( name="error_partitions", pipeline_name="baz", partition_fn=error_partition_fn, run_config_fn_for_partition=lambda partition: {}, ), "error_partition_config": PartitionSetDefinition( name="error_partition_config", pipeline_name="baz", partition_fn=lambda: string.ascii_lowercase, run_config_fn_for_partition=error_partition_config_fn, ), "error_partition_tags": PartitionSetDefinition( name="error_partition_tags", pipeline_name="baz", partition_fn=lambda: string.ascii_lowercase, run_config_fn_for_partition=lambda partition: {}, tags_fn_for_partition=error_partition_tags_fn, ), } +@sensor(pipeline_name="foo_pipeline", run_config_fn=lambda _: {"foo": "FOO"}) +def sensor_foo(_): + return True + + +@sensor(pipeline_name="foo_pipeline") 
+def sensor_error(_): + raise Exception("womp womp") + + @repository def bar_repo(): return { "pipelines": { "foo": define_foo_pipeline, "bar": lambda: bar_pipeline, "baz": lambda: baz_pipeline, }, "schedules": define_bar_schedules(), "partition_sets": define_baz_partitions(), + "jobs": {"sensor_foo": sensor_foo, "sensor_error": lambda: sensor_error}, } diff --git a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py new file mode 100644 index 000000000..f9732cd1c --- /dev/null +++ b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py @@ -0,0 +1,28 @@ +from dagster.api.snapshot_sensor import sync_get_external_sensor_execution_data_ephemeral_grpc +from dagster.core.host_representation.external_data import ( + ExternalSensorExecutionData, + ExternalSensorExecutionErrorData, +) +from dagster.core.test_utils import instance_for_test + +from .utils import get_bar_repo_handle + + +def test_external_sensor_grpc(): + with get_bar_repo_handle() as repository_handle: + with instance_for_test() as instance: + result = sync_get_external_sensor_execution_data_ephemeral_grpc( + instance, repository_handle, "sensor_foo", None + ) + assert isinstance(result, ExternalSensorExecutionData) + assert result.run_config == {"foo": "FOO"} + + +def test_external_sensor_error(): + with get_bar_repo_handle() as repository_handle: + with instance_for_test() as instance: + result = sync_get_external_sensor_execution_data_ephemeral_grpc( + instance, repository_handle, "sensor_error", None + ) + assert isinstance(result, ExternalSensorExecutionErrorData) + assert "womp womp" in result.error.to_string()
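
Note: as a reading aid, here is a minimal sketch of the call path this change introduces, condensed from ``test_external_sensor_grpc`` above. It leans on the test helpers shipped in this diff (``get_bar_repo_handle`` from the api_tests ``utils`` module and ``instance_for_test``); in a real deployment the ``RepositoryHandle`` would come from a workspace/location rather than a test helper, and the wrapper name ``evaluate_sensor_once`` is illustrative only, not part of this change.

.. code-block:: python

    # Sketch: evaluate a sensor over an ephemeral gRPC server, mirroring the
    # new test_external_sensor_grpc test. Assumes the dagster_tests package is
    # importable; swap get_bar_repo_handle for however you obtain a
    # RepositoryHandle in your own setup.
    from dagster.api.snapshot_sensor import (
        sync_get_external_sensor_execution_data_ephemeral_grpc,
    )
    from dagster.core.host_representation.external_data import ExternalSensorExecutionData
    from dagster.core.test_utils import instance_for_test
    from dagster_tests.api_tests.utils import get_bar_repo_handle


    def evaluate_sensor_once(sensor_name, last_evaluation_time=None):
        # get_bar_repo_handle loads the example repository from api_tests_repo.py;
        # the gRPC helper spins up an ephemeral server, sends a SensorExecutionArgs
        # request, and returns either ExternalSensorExecutionData or
        # ExternalSensorExecutionErrorData.
        with get_bar_repo_handle() as repository_handle:
            with instance_for_test() as instance:
                return sync_get_external_sensor_execution_data_ephemeral_grpc(
                    instance, repository_handle, sensor_name, last_evaluation_time
                )


    result = evaluate_sensor_once("sensor_foo")
    assert isinstance(result, ExternalSensorExecutionData)
    assert result.run_config == {"foo": "FOO"}  # produced by sensor_foo's run_config_fn

Because the request is carried by the serdes-whitelisted ``SensorExecutionArgs`` added to ``grpc/types.py``, the same request shape works whether the client is the ephemeral one used here or a long-lived gRPC server reached via ``sync_get_external_sensor_execution_data_grpc``.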