diff --git a/python_modules/dagster/dagster/cli/sensor.py b/python_modules/dagster/dagster/cli/sensor.py index 04e00002b..3381d7ffa 100644 --- a/python_modules/dagster/dagster/cli/sensor.py +++ b/python_modules/dagster/dagster/cli/sensor.py @@ -1,318 +1,318 @@ from __future__ import print_function import os import click import six import yaml from dagster import DagsterInvariantViolationError, check from dagster.cli.workspace.cli_target import ( get_external_repository_from_kwargs, get_external_repository_from_repo_location, get_repository_location_from_kwargs, repository_target_argument, ) from dagster.core.definitions.job import JobType from dagster.core.host_representation import ExternalRepository from dagster.core.host_representation.external_data import ExternalSensorExecutionErrorData from dagster.core.instance import DagsterInstance from dagster.core.scheduler.job import JobState, JobStatus def create_sensor_cli_group(): group = click.Group(name="sensor") group.add_command(sensor_list_command) group.add_command(sensor_start_command) group.add_command(sensor_stop_command) group.add_command(sensor_preview_command) return group def print_changes(external_repository, instance, print_fn=print, preview=False): sensor_states = instance.all_stored_job_state( external_repository.get_origin_id(), JobType.SENSOR ) external_sensors = external_repository.get_external_sensors() external_sensors_dict = {s.get_external_origin_id(): s for s in external_sensors} sensor_states_dict = {s.job_origin_id: s for s in sensor_states} external_sensor_origin_ids = set(external_sensors_dict.keys()) sensor_state_ids = set(sensor_states_dict.keys()) added_sensors = external_sensor_origin_ids - sensor_state_ids removed_sensors = sensor_state_ids - external_sensor_origin_ids if not added_sensors and not removed_sensors: if preview: print_fn(click.style("No planned changes to sensors.", fg="magenta", bold=True)) print_fn("{num} sensors will remain unchanged".format(num=len(external_sensors))) else: print_fn(click.style("No changes to sensors.", fg="magenta", bold=True)) print_fn("{num} sensors unchanged".format(num=len(external_sensors))) return print_fn( click.style("Planned Sensor Changes:" if preview else "Changes:", fg="magenta", bold=True) ) for sensor_origin_id in added_sensors: print_fn( click.style( " + {name} (add) [{id}]".format( name=external_sensors_dict[sensor_origin_id].name, id=sensor_origin_id ), fg="green", ) ) for sensor_origin_id in removed_sensors: print_fn( click.style( " + {name} (delete) [{id}]".format( name=external_sensors_dict[sensor_origin_id].name, id=sensor_origin_id ), fg="red", ) ) def check_repo_and_scheduler(repository, instance): check.inst_param(repository, "repository", ExternalRepository) check.inst_param(instance, "instance", DagsterInstance) repository_name = repository.name if not repository.get_external_sensors(): raise click.UsageError( "There are no sensors defined for repository {name}.".format(name=repository_name) ) if not os.getenv("DAGSTER_HOME"): raise click.UsageError( ( "The environment variable $DAGSTER_HOME is not set. Dagster requires this " "environment variable to be set to an existing directory in your filesystem " "that contains your dagster instance configuration file (dagster.yaml).\n" "You can resolve this error by exporting the environment variable." 
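The command group wired up in `create_sensor_cli_group()` above can be exercised with click's test runner without pointing at a repository; a minimal sketch that only checks the subcommand registration shown in this file:

```python
from click.testing import CliRunner

from dagster.cli.sensor import create_sensor_cli_group

# `--help` on the group should list the subcommands added above:
# list, start, stop, and preview.
runner = CliRunner()
result = runner.invoke(create_sensor_cli_group(), ["--help"])
assert result.exit_code == 0
for subcommand in ("list", "start", "stop", "preview"):
    assert subcommand in result.output
```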
"For example, you can run the following command in your shell or " "include it in your shell configuration file:\n" '\texport DAGSTER_HOME="~/dagster_home"' "\n\n" ) ) def extract_sensor_name(sensor_name): if sensor_name and not isinstance(sensor_name, six.string_types): if len(sensor_name) == 1: return sensor_name[0] else: check.failed( "Can only handle zero or one sensor args. Got {sensor_name}".format( sensor_name=repr(sensor_name) ) ) @click.command( name="list", help="List all sensors that correspond to a repository.", ) @repository_target_argument @click.option("--running", help="Filter for running sensors", is_flag=True, default=False) @click.option("--stopped", help="Filter for stopped sensors", is_flag=True, default=False) @click.option("--name", help="Only display sensor sensor names", is_flag=True, default=False) def sensor_list_command(running, stopped, name, **kwargs): return execute_list_command(running, stopped, name, kwargs, click.echo) def execute_list_command(running_filter, stopped_filter, name_filter, cli_args, print_fn): with DagsterInstance.get() as instance: with get_external_repository_from_kwargs(cli_args) as external_repo: check_repo_and_scheduler(external_repo, instance) repository_name = external_repo.name if not name_filter: title = "Repository {name}".format(name=repository_name) print_fn(title) print_fn("*" * len(title)) stored_sensors_by_id = {} for job_state in instance.all_stored_job_state( external_repo.get_external_origin_id(), JobType.SENSOR ): stored_sensors_by_id[job_state.job_origin_id] = job_state all_state = [ stored_sensors_by_id.get( external_sensor.get_external_origin_id(), external_sensor.get_default_sensor_state(), ) for external_sensor in external_repo.get_external_sensors() ] if running_filter: jobs = [ job_state for job_state in all_state if job_state.status == JobStatus.RUNNING ] elif stopped_filter: jobs = [ job_state for job_state in all_state if job_state.status == JobStatus.STOPPED ] else: jobs = all_state first = True for job_state in jobs: # If --name filter is present, only print the job name if name_filter: print_fn(job_state.job_name) continue flag = "[{status}]".format(status=job_state.status.value) if job_state else "" job_title = "Sensor: {name} {flag}".format(name=job_state.job_name, flag=flag) if not first: print_fn("*" * len(job_title)) first = False print_fn(job_title) def _add_or_update_job_state(instance, external_sensor, status): existing_job_state = instance.get_job_state(external_sensor.get_external_origin_id()) if not existing_job_state: instance.add_job_state( JobState(external_sensor.get_external_origin(), JobType.SENSOR, status) ) else: instance.update_job_state(existing_job_state.with_status(status)) @click.command(name="start", help="Start an existing sensor") @click.argument("sensor_name", nargs=-1) # , required=True) @click.option("--start-all", help="start all sensors", is_flag=True, default=False) @repository_target_argument def sensor_start_command(sensor_name, start_all, **kwargs): sensor_name = extract_sensor_name(sensor_name) if sensor_name is None and start_all is False: print( # pylint: disable=print-call "Noop: dagster sensor start was called without any arguments specifying which " "sensors to start. Pass a sensor name or the --start-all flag to start sensors." 
) return return execute_start_command(sensor_name, start_all, kwargs, click.echo) def execute_start_command(sensor_name, all_flag, cli_args, print_fn): with DagsterInstance.get() as instance: with get_external_repository_from_kwargs(cli_args) as external_repo: check_repo_and_scheduler(external_repo, instance) repository_name = external_repo.name if all_flag: try: for external_sensor in external_repo.get_external_sensors(): _add_or_update_job_state(instance, external_sensor, JobStatus.RUNNING) print_fn( "Started all sensors for repository {repository_name}".format( repository_name=repository_name ) ) except DagsterInvariantViolationError as ex: raise click.UsageError(ex) else: try: external_sensor = external_repo.get_external_sensor(sensor_name) _add_or_update_job_state(instance, external_sensor, JobStatus.RUNNING) except DagsterInvariantViolationError as ex: raise click.UsageError(ex) print_fn("Started sensor {sensor_name}".format(sensor_name=sensor_name)) @click.command(name="stop", help="Stop an existing sensor") @click.argument("sensor_name", nargs=-1) @repository_target_argument def sensor_stop_command(sensor_name, **kwargs): sensor_name = extract_sensor_name(sensor_name) return execute_stop_command(sensor_name, kwargs, click.echo) def execute_stop_command(sensor_name, cli_args, print_fn, instance=None): with DagsterInstance.get() as instance: with get_external_repository_from_kwargs(cli_args) as external_repo: check_repo_and_scheduler(external_repo, instance) try: external_sensor = external_repo.get_external_sensor(sensor_name) _add_or_update_job_state(instance, external_sensor, JobStatus.STOPPED) except DagsterInvariantViolationError as ex: raise click.UsageError(ex) print_fn("Stopped sensor {sensor_name}".format(sensor_name=sensor_name)) @click.command(name="preview", help="Preview an existing sensor execution") @click.argument("sensor_name", nargs=-1) @click.option( "--since", help="Set the last_completion_time value as a timestamp float for the sensor context", default=None, ) @repository_target_argument def sensor_preview_command(sensor_name, since, **kwargs): sensor_name = extract_sensor_name(sensor_name) if since: since = float(since) return execute_preview_command(sensor_name, since, kwargs, click.echo) def execute_preview_command(sensor_name, since, cli_args, print_fn, instance=None): with DagsterInstance.get() as instance: with get_repository_location_from_kwargs(cli_args) as repo_location: try: external_repo = get_external_repository_from_repo_location( repo_location, cli_args.get("repository") ) check_repo_and_scheduler(external_repo, instance) external_sensor = external_repo.get_external_sensor(sensor_name) sensor_runtime_data = repo_location.get_external_sensor_execution_data( instance, external_repo.handle, external_sensor.name, since, ) if isinstance(sensor_runtime_data, ExternalSensorExecutionErrorData): print_fn( "Failed to resolve sensor for {sensor_name} : {error_info}".format( sensor_name=external_sensor.name, error_info=sensor_runtime_data.error.to_string(), ) ) - elif not sensor_runtime_data.run_params: + elif not sensor_runtime_data.run_requests: if sensor_runtime_data.skip_message: print_fn( "Sensor returned false for {sensor_name}, skipping: {skip_message}".format( sensor_name=external_sensor.name, skip_message=sensor_runtime_data.skip_message, ) ) else: print_fn( "Sensor returned false for {sensor_name}, skipping".format( sensor_name=external_sensor.name ) ) else: print_fn( - "Sensor returning run parameters for {num} run(s):\n\n{run_params}".format( - 
num=len(sensor_runtime_data.run_params), - run_params="\n".join( - yaml.safe_dump(param.run_config, default_flow_style=False) - for param in sensor_runtime_data.run_params + "Sensor returning run requests for {num} run(s):\n\n{run_requests}".format( + num=len(sensor_runtime_data.run_requests), + run_requests="\n".join( + yaml.safe_dump(run_request.run_config, default_flow_style=False) + for run_request in sensor_runtime_data.run_requests ), ) ) except DagsterInvariantViolationError as ex: raise click.UsageError(ex) sensor_cli = create_sensor_cli_group() diff --git a/python_modules/dagster/dagster/core/definitions/decorators/sensor.py b/python_modules/dagster/dagster/core/definitions/decorators/sensor.py index 2f2bfc362..4cb754008 100644 --- a/python_modules/dagster/dagster/core/definitions/decorators/sensor.py +++ b/python_modules/dagster/dagster/core/definitions/decorators/sensor.py @@ -1,63 +1,62 @@ import inspect from dagster import check -from dagster.core.definitions.sensor import SensorDefinition, SensorRunParams, SensorSkipData +from dagster.core.definitions.sensor import RunRequest, SensorDefinition, SkipReason from dagster.core.errors import DagsterInvariantViolationError from dagster.utils.backcompat import experimental @experimental def sensor(pipeline_name, name=None, solid_selection=None, mode=None): """ Creates a sensor where the decorated function is used as the sensor's evaluation function. The decorated function may: - 1. Return a `SensorRunParams` object. - 2. Yield a number of `SensorRunParams` objects. - 3. Return a `SensorSkipData` object, providing a descriptive message of why no runs were - requested. + 1. Return a `RunRequest` object. + 2. Yield multiple of `RunRequest` objects. + 3. Return a `SkipReason` object, providing a descriptive message of why no runs were requested. 4. Yield nothing (skipping without providing a reason) Takes a :py:class:`~dagster.SensorExecutionContext`. Args: name (str): The name of this sensor solid_selection (Optional[List[str]]): A list of solid subselection (including single solid names) to execute for runs for this sensor e.g. ``['*some_solid+', 'other_solid']`` mode (Optional[str]): The mode to apply when executing runs for this sensor. (default: 'default') """ check.opt_str_param(name, "name") def inner(fn): check.callable_param(fn, "fn") sensor_name = name or fn.__name__ def _wrapped_fn(context): result = fn(context) if inspect.isgenerator(result): for item in result: yield item - elif isinstance(result, (SensorSkipData, SensorRunParams)): + elif isinstance(result, (SkipReason, RunRequest)): yield result elif result is not None: raise DagsterInvariantViolationError( ( "Error in sensor {sensor_name}: Sensor unexpectedly returned output " - "{result} of type {type_}. Should only return SensorSkipData or " - "SensorRunParams objects." + "{result} of type {type_}. Should only return SkipReason or " + "RunRequest objects." 
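To illustrate the renamed objects in the decorator contract above, here is a minimal sensor body under the new names; the monitored directory, pipeline name, and run_config shape are hypothetical:

```python
import os

from dagster.core.definitions.decorators.sensor import sensor
from dagster.core.definitions.sensor import RunRequest, SkipReason

MONITORED_DIR = "/tmp/incoming"  # hypothetical directory


@sensor(pipeline_name="process_file_pipeline")  # hypothetical pipeline name
def new_file_sensor(_context):
    filenames = sorted(os.listdir(MONITORED_DIR)) if os.path.isdir(MONITORED_DIR) else []
    if not filenames:
        # Cases 3/4 in the docstring: skip, optionally with a reason.
        yield SkipReason("No files found in {}".format(MONITORED_DIR))
        return
    for filename in filenames:
        # Cases 1/2: one RunRequest per run to launch. run_key de-duplicates across
        # evaluations, so re-seeing the same file will not launch a second run.
        yield RunRequest(
            run_key=filename,
            run_config={"solids": {"process_file": {"config": {"path": filename}}}},
        )
```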
).format(sensor_name=sensor_name, result=result, type_=type(result)) ) return SensorDefinition( name=sensor_name, pipeline_name=pipeline_name, evaluation_fn=_wrapped_fn, solid_selection=solid_selection, mode=mode, ) return inner diff --git a/python_modules/dagster/dagster/core/definitions/job.py b/python_modules/dagster/dagster/core/definitions/job.py index 4d86c92f5..029cb2aa0 100644 --- a/python_modules/dagster/dagster/core/definitions/job.py +++ b/python_modules/dagster/dagster/core/definitions/job.py @@ -1,88 +1,89 @@ from enum import Enum from dagster import check -from dagster.core.instance import DagsterInstance from dagster.serdes import whitelist_for_serdes from .mode import DEFAULT_MODE_NAME from .utils import check_valid_name @whitelist_for_serdes class JobType(Enum): SCHEDULE = "SCHEDULE" SENSOR = "SENSOR" class JobContext: """Context for generating the execution parameters for an JobDefinition at runtime. An instance of this class is made available as the first argument to the JobDefinition functions: run_config_fn, tags_fn Attributes: instance (DagsterInstance): The instance configured to launch the job """ __slots__ = ["_instance"] def __init__(self, instance): + from dagster.core.instance import DagsterInstance + self._instance = check.inst_param(instance, "instance", DagsterInstance) @property def instance(self): return self._instance class JobDefinition: """Defines a job, which describes a series of runs for a particular pipeline. These runs are grouped by job_name, using tags. Args: name (str): The name of this job. pipeline_name (str): The name of the pipeline to execute. mode (Optional[str]): The mode to apply when executing this pipeline. (default: 'default') solid_selection (Optional[List[str]]): A list of solid subselection (including single solid names) to execute. e.g. 
``['*some_solid+', 'other_solid']`` """ __slots__ = [ "_name", "_job_type", "_pipeline_name", "_tags_fn", "_run_config_fn", "_mode", "_solid_selection", ] def __init__( self, name, job_type, pipeline_name, mode="default", solid_selection=None, ): self._name = check_valid_name(name) self._job_type = check.inst_param(job_type, "job_type", JobType) self._pipeline_name = check.str_param(pipeline_name, "pipeline_name") self._mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME) self._solid_selection = check.opt_nullable_list_param( solid_selection, "solid_selection", of_type=str ) @property def name(self): return self._name @property def pipeline_name(self): return self._pipeline_name @property def job_type(self): return self._job_type @property def solid_selection(self): return self._solid_selection @property def mode(self): return self._mode diff --git a/python_modules/dagster/dagster/core/definitions/sensor.py b/python_modules/dagster/dagster/core/definitions/sensor.py index a4e1a046b..9a030d170 100644 --- a/python_modules/dagster/dagster/core/definitions/sensor.py +++ b/python_modules/dagster/dagster/core/definitions/sensor.py @@ -1,116 +1,116 @@ from collections import namedtuple from dagster import check from dagster.core.definitions.job import JobContext, JobDefinition, JobType from dagster.core.instance import DagsterInstance from dagster.serdes import whitelist_for_serdes from dagster.utils import ensure_gen from dagster.utils.backcompat import experimental_class_warning -@whitelist_for_serdes -class SensorSkipData(namedtuple("_SensorSkipData", "skip_message")): - def __new__(cls, skip_message=None): - return super(SensorSkipData, cls).__new__( - cls, skip_message=check.opt_str_param(skip_message, "skip_message") - ) - - -@whitelist_for_serdes -class SensorRunParams(namedtuple("_SensorRunParams", "execution_key run_config tags")): - """ - Represents all the information required to launch a single run instigated by a sensor body. - Must be returned by a SensorDefinition's evaluation function for a run to be launched. - - Attributes: - execution_key (str | None): A string key to identify this launched run. The sensor will - ensure that exactly one run is created for each execution key, and will not create - another run if the same execution key is requested in a later evaluation. Passing in - a `None` value means that the sensor will attempt to create and launch every run - requested for every sensor evaluation. - run_config (Optional[Dict]): The environment config that parameterizes the run execution to - be launched, as a dict. - tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach - to the launched run. - """ - - def __new__(cls, execution_key, run_config=None, tags=None): - return super(SensorRunParams, cls).__new__( - cls, - run_config=check.opt_dict_param(run_config, "run_config"), - tags=check.opt_dict_param(tags, "tags"), - execution_key=check.opt_str_param(execution_key, "execution_key"), - ) - - class SensorExecutionContext(JobContext): """Sensor execution context. An instance of this class is made available as the first argument to the evaluation function on SensorDefinition. Attributes: instance (DagsterInstance): The instance configured to run the schedule last_completion_time (float): The last time that the sensor was evaluated (UTC). 
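The evaluation context described above can be built by hand for local experimentation; a sketch using an ephemeral instance (the sensor machinery, not user code, normally constructs this):

```python
from dagster.core.definitions.sensor import SensorExecutionContext
from dagster.core.instance import DagsterInstance

instance = DagsterInstance.ephemeral()
# last_completion_time is None on the first evaluation, a UTC float timestamp afterwards.
context = SensorExecutionContext(instance, last_completion_time=None)
assert context.instance is instance
assert context.last_completion_time is None
instance.dispose()
```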
""" __slots__ = ["_last_completion_time"] def __init__(self, instance, last_completion_time): super(SensorExecutionContext, self).__init__( check.inst_param(instance, "instance", DagsterInstance), ) self._last_completion_time = check.opt_float_param( last_completion_time, "last_completion_time" ) @property def last_completion_time(self): return self._last_completion_time +@whitelist_for_serdes +class SkipReason(namedtuple("_SkipReason", "skip_message")): + def __new__(cls, skip_message=None): + return super(SkipReason, cls).__new__( + cls, skip_message=check.opt_str_param(skip_message, "skip_message") + ) + + +@whitelist_for_serdes +class RunRequest(namedtuple("_RunRequest", "run_key run_config tags")): + """ + Represents all the information required to launch a single run instigated by a sensor body. + Must be returned by a SensorDefinition's evaluation function for a run to be launched. + + Attributes: + run_key (str | None): A string key to identify this launched run. The sensor will + ensure that exactly one run is created for each run key, and will not create + another run if the same run key is requested in a later evaluation. Passing in + a `None` value means that the sensor will attempt to create and launch every run + requested for every sensor evaluation. + run_config (Optional[Dict]): The environment config that parameterizes the run execution to + be launched, as a dict. + tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach + to the launched run. + """ + + def __new__(cls, run_key, run_config=None, tags=None): + return super(RunRequest, cls).__new__( + cls, + run_key=check.opt_str_param(run_key, "run_key"), + run_config=check.opt_dict_param(run_config, "run_config"), + tags=check.opt_dict_param(tags, "tags"), + ) + + class SensorDefinition(JobDefinition): """Define a sensor that initiates a set of job runs Args: name (str): The name of the sensor to create. pipeline_name (str): The name of the pipeline to execute when the sensor fires. evaluation_fn (Callable[[SensorExecutionContext]]): The core evaluation function for the sensor, which is run at an interval to determine whether a run should be launched or not. Takes a :py:class:`~dagster.SensorExecutionContext`. - This function must return a generator, which must yield either a single SensorSkipData - or one or more SensorRunParams objects. + This function must return a generator, which must yield either a single SkipReason + or one or more RunRequest objects. solid_selection (Optional[List[str]]): A list of solid subselection (including single solid names) to execute when the sensor runs. e.g. ``['*some_solid+', 'other_solid']`` mode (Optional[str]): The mode to apply when executing this sensor. 
(default: 'default') """ __slots__ = [ "_evaluation_fn", ] def __init__( self, name, pipeline_name, evaluation_fn, solid_selection=None, mode=None, ): experimental_class_warning("SensorDefinition") super(SensorDefinition, self).__init__( name, job_type=JobType.SENSOR, pipeline_name=pipeline_name, mode=mode, solid_selection=solid_selection, ) self._evaluation_fn = check.callable_param(evaluation_fn, "evaluation_fn") def get_tick_data(self, context): check.inst_param(context, "context", SensorExecutionContext) result = list(ensure_gen(self._evaluation_fn(context))) if not result or result == [None]: return [] if len(result) == 1: - return check.is_list(result, of_type=(SensorRunParams, SensorSkipData)) + return check.is_list(result, of_type=(RunRequest, SkipReason)) - return check.is_list(result, of_type=SensorRunParams) + return check.is_list(result, of_type=RunRequest) diff --git a/python_modules/dagster/dagster/core/host_representation/external_data.py b/python_modules/dagster/dagster/core/host_representation/external_data.py index da4bd609b..0bbfba8c7 100644 --- a/python_modules/dagster/dagster/core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/core/host_representation/external_data.py @@ -1,434 +1,434 @@ """ This module contains data objects meant to be serialized between host processes and user processes. They should contain no business logic or clever indexing. Use the classes in external.py for that. """ from collections import namedtuple from dagster import check from dagster.core.definitions import ( JobDefinition, JobType, PartitionSetDefinition, PipelineDefinition, PresetDefinition, RepositoryDefinition, ScheduleDefinition, ) from dagster.core.definitions.partition import PartitionScheduleDefinition -from dagster.core.definitions.sensor import SensorRunParams +from dagster.core.definitions.sensor import RunRequest from dagster.core.snap import PipelineSnapshot from dagster.serdes import whitelist_for_serdes from dagster.utils.error import SerializableErrorInfo @whitelist_for_serdes class ExternalRepositoryData( namedtuple( "_ExternalRepositoryData", "name external_pipeline_datas external_schedule_datas external_partition_set_datas external_executable_datas external_job_datas", ) ): def __new__( cls, name, external_pipeline_datas, external_schedule_datas, external_partition_set_datas, external_executable_datas=None, external_job_datas=None, ): return super(ExternalRepositoryData, cls).__new__( cls, name=check.str_param(name, "name"), external_pipeline_datas=check.list_param( external_pipeline_datas, "external_pipeline_datas", of_type=ExternalPipelineData ), external_schedule_datas=check.list_param( external_schedule_datas, "external_schedule_datas", of_type=ExternalScheduleData ), external_partition_set_datas=check.list_param( external_partition_set_datas, "external_partition_set_datas", of_type=ExternalPartitionSetData, ), external_executable_datas=check.opt_list_param( external_executable_datas, "external_executable_datas" ), external_job_datas=check.opt_list_param( external_job_datas, "external_job_datas", of_type=ExternalJobData, ), ) def get_pipeline_snapshot(self, name): check.str_param(name, "name") for external_pipeline_data in self.external_pipeline_datas: if external_pipeline_data.name == name: return external_pipeline_data.pipeline_snapshot check.failed("Could not find pipeline snapshot named " + name) def get_external_pipeline_data(self, name): check.str_param(name, "name") for external_pipeline_data in self.external_pipeline_datas: if 
external_pipeline_data.name == name: return external_pipeline_data check.failed("Could not find external pipeline data named " + name) def get_external_schedule_data(self, name): check.str_param(name, "name") for external_schedule_data in self.external_schedule_datas: if external_schedule_data.name == name: return external_schedule_data check.failed("Could not find external schedule data named " + name) def get_external_partition_set_data(self, name): check.str_param(name, "name") for external_partition_set_data in self.external_partition_set_datas: if external_partition_set_data.name == name: return external_partition_set_data check.failed("Could not find external partition set data named " + name) def get_external_job_data(self, name): check.str_param(name, "name") for external_job_data in self.external_job_datas: if external_job_data.name == name: return external_job_data check.failed("Could not find job data named " + name) @whitelist_for_serdes class ExternalPipelineSubsetResult( namedtuple("_ExternalPipelineSubsetResult", "success error external_pipeline_data") ): def __new__(cls, success, error=None, external_pipeline_data=None): return super(ExternalPipelineSubsetResult, cls).__new__( cls, success=check.bool_param(success, "success"), error=check.opt_inst_param(error, "error", SerializableErrorInfo), external_pipeline_data=check.opt_inst_param( external_pipeline_data, "external_pipeline_data", ExternalPipelineData ), ) @whitelist_for_serdes class ExternalPipelineData( namedtuple( "_ExternalPipelineData", "name pipeline_snapshot active_presets parent_pipeline_snapshot" ) ): def __new__(cls, name, pipeline_snapshot, active_presets, parent_pipeline_snapshot): return super(ExternalPipelineData, cls).__new__( cls, name=check.str_param(name, "name"), pipeline_snapshot=check.inst_param( pipeline_snapshot, "pipeline_snapshot", PipelineSnapshot ), parent_pipeline_snapshot=check.opt_inst_param( parent_pipeline_snapshot, "parent_pipeline_snapshot", PipelineSnapshot ), active_presets=check.list_param( active_presets, "active_presets", of_type=ExternalPresetData ), ) @whitelist_for_serdes class ExternalPresetData( namedtuple("_ExternalPresetData", "name run_config solid_selection mode tags") ): def __new__(cls, name, run_config, solid_selection, mode, tags): return super(ExternalPresetData, cls).__new__( cls, name=check.str_param(name, "name"), run_config=check.opt_dict_param(run_config, "run_config"), solid_selection=check.opt_nullable_list_param( solid_selection, "solid_selection", of_type=str ), mode=check.str_param(mode, "mode"), tags=check.opt_dict_param(tags, "tags"), ) @whitelist_for_serdes class ExternalScheduleData( namedtuple( "_ExternalScheduleData", "name cron_schedule pipeline_name solid_selection mode environment_vars partition_set_name execution_timezone", ) ): def __new__( cls, name, cron_schedule, pipeline_name, solid_selection, mode, environment_vars, partition_set_name, execution_timezone, ): return super(ExternalScheduleData, cls).__new__( cls, name=check.str_param(name, "name"), cron_schedule=check.str_param(cron_schedule, "cron_schedule"), pipeline_name=check.str_param(pipeline_name, "pipeline_name"), solid_selection=check.opt_nullable_list_param(solid_selection, "solid_selection", str), mode=check.opt_str_param(mode, "mode"), environment_vars=check.opt_dict_param(environment_vars, "environment_vars"), partition_set_name=check.opt_str_param(partition_set_name, "partition_set_name"), execution_timezone=check.opt_str_param(execution_timezone, "execution_timezone"), ) 
@whitelist_for_serdes class ExternalScheduleExecutionData( namedtuple("_ExternalScheduleExecutionData", "run_config tags should_execute") ): def __new__(cls, run_config=None, tags=None, should_execute=None): return super(ExternalScheduleExecutionData, cls).__new__( cls, run_config=check.opt_dict_param(run_config, "run_config"), tags=check.opt_dict_param(tags, "tags", key_type=str, value_type=str), should_execute=check.opt_bool_param(should_execute, "should_execute"), ) @whitelist_for_serdes class ExternalScheduleExecutionErrorData( namedtuple("_ExternalScheduleExecutionErrorData", "error") ): def __new__(cls, error): return super(ExternalScheduleExecutionErrorData, cls).__new__( cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo), ) @whitelist_for_serdes class ExternalJobData( namedtuple("_ExternalJobData", "name job_type pipeline_name solid_selection mode") ): def __new__( cls, name, job_type, pipeline_name, solid_selection, mode, ): return super(ExternalJobData, cls).__new__( cls, name=check.str_param(name, "name"), job_type=check.inst_param(job_type, "job_type", JobType), pipeline_name=check.str_param(pipeline_name, "pipeline_name"), solid_selection=check.opt_nullable_list_param(solid_selection, "solid_selection", str), mode=check.opt_str_param(mode, "mode"), ) @whitelist_for_serdes class ExternalSensorExecutionData( - namedtuple("_ExternalSensorExecutionData", "run_params skip_message") + namedtuple("_ExternalSensorExecutionData", "run_requests skip_message") ): - def __new__(cls, run_params=None, skip_message=None): - check.opt_list_param(run_params, "run_params", SensorRunParams) + def __new__(cls, run_requests=None, skip_message=None): + check.opt_list_param(run_requests, "run_requests", RunRequest) check.opt_str_param(skip_message, "skip_message") check.invariant( - not (run_params and skip_message), "Found both skip data and run request data" + not (run_requests and skip_message), "Found both skip data and run request data" ) return super(ExternalSensorExecutionData, cls).__new__( - cls, run_params=run_params, skip_message=skip_message, + cls, run_requests=run_requests, skip_message=skip_message, ) @whitelist_for_serdes class ExternalSensorExecutionErrorData(namedtuple("_ExternalSensorExecutionErrorData", "error")): def __new__(cls, error): return super(ExternalSensorExecutionErrorData, cls).__new__( cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo), ) @whitelist_for_serdes class ExternalExecutionParamsData(namedtuple("_ExternalExecutionParamsData", "run_config tags")): def __new__(cls, run_config=None, tags=None): return super(ExternalExecutionParamsData, cls).__new__( cls, run_config=check.opt_dict_param(run_config, "run_config"), tags=check.opt_dict_param(tags, "tags", key_type=str, value_type=str), ) @whitelist_for_serdes class ExternalExecutionParamsErrorData(namedtuple("_ExternalExecutionParamsErrorData", "error")): def __new__(cls, error): return super(ExternalExecutionParamsErrorData, cls).__new__( cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo), ) @whitelist_for_serdes class ExternalPartitionSetData( namedtuple("_ExternalPartitionSetData", "name pipeline_name solid_selection mode") ): def __new__(cls, name, pipeline_name, solid_selection, mode): return super(ExternalPartitionSetData, cls).__new__( cls, name=check.str_param(name, "name"), pipeline_name=check.str_param(pipeline_name, "pipeline_name"), solid_selection=check.opt_nullable_list_param(solid_selection, "solid_selection", str), 
mode=check.opt_str_param(mode, "mode"), ) @whitelist_for_serdes class ExternalPartitionNamesData(namedtuple("_ExternalPartitionNamesData", "partition_names")): def __new__(cls, partition_names=None): return super(ExternalPartitionNamesData, cls).__new__( cls, partition_names=check.opt_list_param(partition_names, "partition_names", str), ) @whitelist_for_serdes class ExternalPartitionConfigData(namedtuple("_ExternalPartitionConfigData", "name run_config")): def __new__(cls, name, run_config=None): return super(ExternalPartitionConfigData, cls).__new__( cls, name=check.str_param(name, "name"), run_config=check.opt_dict_param(run_config, "run_config"), ) @whitelist_for_serdes class ExternalPartitionTagsData(namedtuple("_ExternalPartitionTagsData", "name tags")): def __new__(cls, name, tags=None): return super(ExternalPartitionTagsData, cls).__new__( cls, name=check.str_param(name, "name"), tags=check.opt_dict_param(tags, "tags"), ) @whitelist_for_serdes class ExternalPartitionExecutionParamData( namedtuple("_ExternalPartitionExecutionParamData", "name tags run_config") ): def __new__(cls, name, tags, run_config): return super(ExternalPartitionExecutionParamData, cls).__new__( cls, name=check.str_param(name, "name"), tags=check.dict_param(tags, "tags"), run_config=check.opt_dict_param(run_config, "run_config"), ) @whitelist_for_serdes class ExternalPartitionSetExecutionParamData( namedtuple("_ExternalPartitionSetExecutionParamData", "partition_data") ): def __new__(cls, partition_data): return super(ExternalPartitionSetExecutionParamData, cls).__new__( cls, partition_data=check.list_param( partition_data, "partition_data", of_type=ExternalPartitionExecutionParamData ), ) @whitelist_for_serdes class ExternalPartitionExecutionErrorData( namedtuple("_ExternalPartitionExecutionErrorData", "error") ): def __new__(cls, error): return super(ExternalPartitionExecutionErrorData, cls).__new__( cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo), ) def external_repository_data_from_def(repository_def): check.inst_param(repository_def, "repository_def", RepositoryDefinition) return ExternalRepositoryData( name=repository_def.name, external_pipeline_datas=sorted( list(map(external_pipeline_data_from_def, repository_def.get_all_pipelines())), key=lambda pd: pd.name, ), external_schedule_datas=sorted( list(map(external_schedule_data_from_def, repository_def.schedule_defs)), key=lambda sd: sd.name, ), external_partition_set_datas=sorted( list(map(external_partition_set_data_from_def, repository_def.partition_set_defs)), key=lambda psd: psd.name, ), external_job_datas=sorted( list(map(external_job_from_def, repository_def.job_defs)), key=lambda job: job.name, ), ) def external_pipeline_data_from_def(pipeline_def): check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) return ExternalPipelineData( name=pipeline_def.name, pipeline_snapshot=pipeline_def.get_pipeline_snapshot(), parent_pipeline_snapshot=pipeline_def.get_parent_pipeline_snapshot(), active_presets=sorted( list(map(external_preset_data_from_def, pipeline_def.preset_defs)), key=lambda pd: pd.name, ), ) def external_schedule_data_from_def(schedule_def): check.inst_param(schedule_def, "schedule_def", ScheduleDefinition) return ExternalScheduleData( name=schedule_def.name, cron_schedule=schedule_def.cron_schedule, pipeline_name=schedule_def.pipeline_name, solid_selection=schedule_def.solid_selection, mode=schedule_def.mode, environment_vars=schedule_def.environment_vars, 
partition_set_name=schedule_def.get_partition_set().name if isinstance(schedule_def, PartitionScheduleDefinition) else None, execution_timezone=schedule_def.execution_timezone, ) def external_partition_set_data_from_def(partition_set_def): check.inst_param(partition_set_def, "partition_set_def", PartitionSetDefinition) return ExternalPartitionSetData( name=partition_set_def.name, pipeline_name=partition_set_def.pipeline_name, solid_selection=partition_set_def.solid_selection, mode=partition_set_def.mode, ) def external_job_from_def(job_def): check.inst_param(job_def, "job_def", JobDefinition) return ExternalJobData( name=job_def.name, job_type=job_def.job_type, pipeline_name=job_def.pipeline_name, solid_selection=job_def.solid_selection, mode=job_def.mode, ) def external_preset_data_from_def(preset_def): check.inst_param(preset_def, "preset_def", PresetDefinition) return ExternalPresetData( name=preset_def.name, run_config=preset_def.run_config, solid_selection=preset_def.solid_selection, mode=preset_def.mode, tags=preset_def.tags, ) diff --git a/python_modules/dagster/dagster/core/instance/__init__.py b/python_modules/dagster/dagster/core/instance/__init__.py index e0dcf998e..eed4d6863 100644 --- a/python_modules/dagster/dagster/core/instance/__init__.py +++ b/python_modules/dagster/dagster/core/instance/__init__.py @@ -1,1294 +1,1294 @@ import logging import os import sys import time import warnings from collections import defaultdict from enum import Enum import yaml from dagster import check, seven from dagster.core.definitions.events import AssetKey from dagster.core.definitions.pipeline import PipelineDefinition, PipelineSubsetDefinition from dagster.core.errors import ( DagsterInvariantViolationError, DagsterRunAlreadyExists, DagsterRunConflict, ) from dagster.core.execution.resolve_versions import resolve_step_output_versions from dagster.core.storage.migration.utils import upgrading_instance from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus from dagster.core.storage.tags import MEMOIZED_RUN_TAG from dagster.core.system_config.objects import EnvironmentConfig from dagster.core.utils import str_format_list from dagster.serdes import ConfigurableClass from dagster.seven import get_current_datetime_in_utc from dagster.utils.error import serializable_error_info_from_exc_info from .config import DAGSTER_CONFIG_YAML_FILENAME from .ref import InstanceRef # 'airflow_execution_date' and 'is_airflow_ingest_pipeline' are hardcoded tags used in the # airflow ingestion logic (see: dagster_pipeline_factory.py). 'airflow_execution_date' stores the # 'execution_date' used in Airflow operator execution and 'is_airflow_ingest_pipeline' determines # whether 'airflow_execution_date' is needed. # https://github.com/dagster-io/dagster/issues/2403 AIRFLOW_EXECUTION_DATE_STR = "airflow_execution_date" IS_AIRFLOW_INGEST_PIPELINE_STR = "is_airflow_ingest_pipeline" def _is_dagster_home_set(): return bool(os.getenv("DAGSTER_HOME")) def is_memoized_run(tags): return tags is not None and MEMOIZED_RUN_TAG in tags and tags.get(MEMOIZED_RUN_TAG) == "true" def _dagster_home(): dagster_home_path = os.getenv("DAGSTER_HOME") if not dagster_home_path: raise DagsterInvariantViolationError( ( "The environment variable $DAGSTER_HOME is not set. Dagster requires this " "environment variable to be set to an existing directory in your filesystem " "that contains your dagster instance configuration file (dagster.yaml).\n" "You can resolve this error by exporting the environment variable." 
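Stepping back to `ExternalSensorExecutionData` in the diff above: the payload now carries `run_requests`, and the invariant keeps run requests and a skip message mutually exclusive. A minimal sketch:

```python
from dagster.core.definitions.sensor import RunRequest
from dagster.core.host_representation.external_data import ExternalSensorExecutionData

requested = ExternalSensorExecutionData(
    run_requests=[RunRequest(run_key="a", run_config={})]
)
assert requested.skip_message is None

skipped = ExternalSensorExecutionData(skip_message="nothing to do")
assert not skipped.run_requests

# Passing both would trip the invariant:
#   ExternalSensorExecutionData(run_requests=[...], skip_message="...")  -> CheckError
```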
"For example, you can run the following command in your shell or " "include it in your shell configuration file:\n" '\texport DAGSTER_HOME="~/dagster_home"' ) ) dagster_home_path = os.path.expanduser(dagster_home_path) if not os.path.isabs(dagster_home_path): raise DagsterInvariantViolationError( ( '$DAGSTER_HOME "{}" must be an absolute path. Dagster requires this ' "environment variable to be set to an existing directory in your filesystem that" "contains your dagster instance configuration file (dagster.yaml)." ).format(dagster_home_path) ) if not (os.path.exists(dagster_home_path) and os.path.isdir(dagster_home_path)): raise DagsterInvariantViolationError( ( '$DAGSTER_HOME "{}" is not a directory or does not exist. Dagster requires this ' "environment variable to be set to an existing directory in your filesystem that " "contains your dagster instance configuration file (dagster.yaml)." ).format(dagster_home_path) ) return dagster_home_path def _check_run_equality(pipeline_run, candidate_run): check.inst_param(pipeline_run, "pipeline_run", PipelineRun) check.inst_param(candidate_run, "candidate_run", PipelineRun) field_diff = {} for field in pipeline_run._fields: expected_value = getattr(pipeline_run, field) candidate_value = getattr(candidate_run, field) if expected_value != candidate_value: field_diff[field] = (expected_value, candidate_value) return field_diff def _format_field_diff(field_diff): return "\n".join( [ ( " {field_name}:\n" + " Expected: {expected_value}\n" + " Received: {candidate_value}" ).format( field_name=field_name, expected_value=expected_value, candidate_value=candidate_value, ) for field_name, (expected_value, candidate_value,) in field_diff.items() ] ) class _EventListenerLogHandler(logging.Handler): def __init__(self, instance): self._instance = instance super(_EventListenerLogHandler, self).__init__() def emit(self, record): from dagster.core.events.log import construct_event_record, StructuredLoggerMessage try: event = construct_event_record( StructuredLoggerMessage( name=record.name, message=record.msg, level=record.levelno, meta=record.dagster_meta, record=record, ) ) self._instance.handle_new_event(event) except Exception as e: # pylint: disable=W0703 logging.critical("Error during instance event listen") logging.exception(str(e)) raise class InstanceType(Enum): PERSISTENT = "PERSISTENT" EPHEMERAL = "EPHEMERAL" class DagsterInstance: """Core abstraction for managing Dagster's access to storage and other resources. Use DagsterInstance.get() to grab the current DagsterInstance which will load based on the values in the ``dagster.yaml`` file in ``$DAGSTER_HOME`` if set, otherwise fallback to using an ephemeral in-memory set of components. Configuration of this class should be done by setting values in ``$DAGSTER_HOME/dagster.yaml``. For example, to use Postgres for run and event log storage, you can write a ``dagster.yaml`` such as the following: .. literalinclude:: ../../../../docs/sections/deploying/postgres_dagster.yaml :caption: dagster.yaml :language: YAML Args: instance_type (InstanceType): Indicates whether the instance is ephemeral or persistent. Users should not attempt to set this value directly or in their ``dagster.yaml`` files. local_artifact_storage (LocalArtifactStorage): The local artifact storage is used to configure storage for any artifacts that require a local disk, such as schedules, or when using the filesystem system storage to manage files and intermediates. 
By default, this will be a :py:class:`dagster.core.storage.root.LocalArtifactStorage`. Configurable in ``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass` machinery. run_storage (RunStorage): The run storage is used to store metadata about ongoing and past pipeline runs. By default, this will be a :py:class:`dagster.core.storage.runs.SqliteRunStorage`. Configurable in ``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass` machinery. event_storage (EventLogStorage): Used to store the structured event logs generated by pipeline runs. By default, this will be a :py:class:`dagster.core.storage.event_log.SqliteEventLogStorage`. Configurable in ``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass` machinery. compute_log_manager (ComputeLogManager): The compute log manager handles stdout and stderr logging for solid compute functions. By default, this will be a :py:class:`dagster.core.storage.local_compute_log_manager.LocalComputeLogManager`. Configurable in ``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass` machinery. run_coordinator (RunCoordinator): A runs coordinator may be used to manage the execution of pipeline runs. run_launcher (Optional[RunLauncher]): Optionally, a run launcher may be used to enable a Dagster instance to launch pipeline runs, e.g. on a remote Kubernetes cluster, in addition to running them locally. settings (Optional[Dict]): Specifies certain per-instance settings, such as feature flags. These are set in the ``dagster.yaml`` under a set of whitelisted keys. ref (Optional[InstanceRef]): Used by internal machinery to pass instances across process boundaries. """ _PROCESS_TEMPDIR = None def __init__( self, instance_type, local_artifact_storage, run_storage, event_storage, compute_log_manager, schedule_storage=None, scheduler=None, run_coordinator=None, run_launcher=None, settings=None, ref=None, ): from dagster.core.storage.compute_log_manager import ComputeLogManager from dagster.core.storage.event_log import EventLogStorage from dagster.core.storage.root import LocalArtifactStorage from dagster.core.storage.runs import RunStorage from dagster.core.storage.schedules import ScheduleStorage from dagster.core.scheduler import Scheduler from dagster.core.run_coordinator import RunCoordinator from dagster.core.launcher import RunLauncher self._instance_type = check.inst_param(instance_type, "instance_type", InstanceType) self._local_artifact_storage = check.inst_param( local_artifact_storage, "local_artifact_storage", LocalArtifactStorage ) self._event_storage = check.inst_param(event_storage, "event_storage", EventLogStorage) self._run_storage = check.inst_param(run_storage, "run_storage", RunStorage) self._compute_log_manager = check.inst_param( compute_log_manager, "compute_log_manager", ComputeLogManager ) self._schedule_storage = check.opt_inst_param( schedule_storage, "schedule_storage", ScheduleStorage ) self._scheduler = check.opt_inst_param(scheduler, "scheduler", Scheduler) self._run_coordinator = check.inst_param(run_coordinator, "run_coordinator", RunCoordinator) self._run_coordinator.initialize(self) self._run_launcher = check.inst_param(run_launcher, "run_launcher", RunLauncher) self._run_launcher.initialize(self) self._settings = check.opt_dict_param(settings, "settings") self._ref = check.opt_inst_param(ref, "ref", InstanceRef) self._subscribers = defaultdict(list) # ctors @staticmethod def ephemeral(tempdir=None, preload=None): from dagster.core.run_coordinator import 
DefaultRunCoordinator from dagster.core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher from dagster.core.storage.event_log import InMemoryEventLogStorage from dagster.core.storage.root import LocalArtifactStorage from dagster.core.storage.runs import InMemoryRunStorage from dagster.core.storage.noop_compute_log_manager import NoOpComputeLogManager if tempdir is None: tempdir = DagsterInstance.temp_storage() return DagsterInstance( InstanceType.EPHEMERAL, local_artifact_storage=LocalArtifactStorage(tempdir), run_storage=InMemoryRunStorage(preload=preload), event_storage=InMemoryEventLogStorage(preload=preload), compute_log_manager=NoOpComputeLogManager(), run_coordinator=DefaultRunCoordinator(), run_launcher=SyncInMemoryRunLauncher(), ) @staticmethod def get(fallback_storage=None): # 1. Use $DAGSTER_HOME to determine instance if set. if _is_dagster_home_set(): return DagsterInstance.from_config(_dagster_home()) # 2. If that is not set use the fallback storage directory if provided. # This allows us to have a nice out of the box dagit experience where runs are persisted # across restarts in a tempdir that gets cleaned up when the dagit watchdog process exits. elif fallback_storage is not None: return DagsterInstance.from_config(fallback_storage) # 3. If all else fails create an ephemeral in memory instance. else: return DagsterInstance.ephemeral(fallback_storage) @staticmethod def local_temp(tempdir=None, overrides=None): warnings.warn( "To create a local DagsterInstance for a test, use the instance_for_test " "context manager instead, which ensures that resoures are cleaned up afterwards" ) if tempdir is None: tempdir = DagsterInstance.temp_storage() return DagsterInstance.from_ref(InstanceRef.from_dir(tempdir, overrides=overrides)) @staticmethod def from_config(config_dir, config_filename=DAGSTER_CONFIG_YAML_FILENAME): instance_ref = InstanceRef.from_dir(config_dir, config_filename=config_filename) return DagsterInstance.from_ref(instance_ref) @staticmethod def from_ref(instance_ref): check.inst_param(instance_ref, "instance_ref", InstanceRef) return DagsterInstance( instance_type=InstanceType.PERSISTENT, local_artifact_storage=instance_ref.local_artifact_storage, run_storage=instance_ref.run_storage, event_storage=instance_ref.event_storage, compute_log_manager=instance_ref.compute_log_manager, schedule_storage=instance_ref.schedule_storage, scheduler=instance_ref.scheduler, run_coordinator=instance_ref.run_coordinator, run_launcher=instance_ref.run_launcher, settings=instance_ref.settings, ref=instance_ref, ) # flags @property def is_persistent(self): return self._instance_type == InstanceType.PERSISTENT @property def is_ephemeral(self): return self._instance_type == InstanceType.EPHEMERAL def get_ref(self): if self._ref: return self._ref check.failed( "Attempted to prepare an ineligible DagsterInstance ({inst_type}) for cross " "process communication.{dagster_home_msg}".format( inst_type=self._instance_type, dagster_home_msg="\nDAGSTER_HOME environment variable is not set, set it to " "a directory on the filesystem for dagster to use for storage and cross " "process coordination." 
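The resolution order implemented in `DagsterInstance.get()` above ($DAGSTER_HOME config if set, else a provided fallback storage directory, else an ephemeral in-memory instance) can be exercised directly; a sketch:

```python
import os

from dagster.core.instance import DagsterInstance

if os.getenv("DAGSTER_HOME"):
    # Loads $DAGSTER_HOME/dagster.yaml; the directory must exist and resolve to an
    # absolute path, per _dagster_home() above.
    instance = DagsterInstance.get()
else:
    # Without a fallback directory, get() would also end up ephemeral.
    instance = DagsterInstance.ephemeral()

print(instance.root_directory)
instance.dispose()
```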
if os.getenv("DAGSTER_HOME") is None else "", ) ) @property def root_directory(self): return self._local_artifact_storage.base_dir @staticmethod def temp_storage(): if DagsterInstance._PROCESS_TEMPDIR is None: DagsterInstance._PROCESS_TEMPDIR = seven.TemporaryDirectory() return DagsterInstance._PROCESS_TEMPDIR.name def _info(self, component): prefix = " " # ConfigurableClass may not have inst_data if it's a direct instantiation # which happens for ephemeral instances if isinstance(component, ConfigurableClass) and component.inst_data: return component.inst_data.info_str(prefix) if type(component) is dict: return prefix + yaml.dump(component, default_flow_style=False).replace( "\n", "\n" + prefix ) return "{}{}\n".format(prefix, component.__class__.__name__) def info_str_for_component(self, component_name, component): return "{component_name}:\n{component}\n".format( component_name=component_name, component=self._info(component) ) def info_str(self): settings = self._settings if self._settings else {} return ( "local_artifact_storage:\n{artifact}\n" "run_storage:\n{run}\n" "event_log_storage:\n{event}\n" "compute_logs:\n{compute}\n" "schedule_storage:\n{schedule_storage}\n" "scheduler:\n{scheduler}\n" "run_coordinator:\n{run_coordinator}\n" "run_launcher:\n{run_launcher}\n" "".format( artifact=self._info(self._local_artifact_storage), run=self._info(self._run_storage), event=self._info(self._event_storage), compute=self._info(self._compute_log_manager), schedule_storage=self._info(self._schedule_storage), scheduler=self._info(self._scheduler), run_coordinator=self._info(self._run_coordinator), run_launcher=self._info(self._run_launcher), ) + "\n".join( [ "{settings_key}:\n{settings_value}".format( settings_key=settings_key, settings_value=self._info(settings_value) ) for settings_key, settings_value in settings.items() ] ) ) # schedule storage @property def schedule_storage(self): return self._schedule_storage # schedule storage @property def scheduler(self): return self._scheduler # run coordinator @property def run_coordinator(self): return self._run_coordinator # run launcher @property def run_launcher(self): return self._run_launcher # compute logs @property def compute_log_manager(self): return self._compute_log_manager def get_settings(self, settings_key): check.str_param(settings_key, "settings_key") if self._settings and settings_key in self._settings: return self._settings.get(settings_key) return {} @property def telemetry_enabled(self): if self.is_ephemeral: return False dagster_telemetry_enabled_default = True telemetry_settings = self.get_settings("telemetry") if not telemetry_settings: return dagster_telemetry_enabled_default if "enabled" in telemetry_settings: return telemetry_settings["enabled"] else: return dagster_telemetry_enabled_default def upgrade(self, print_fn=lambda _: None): with upgrading_instance(self): print_fn("Updating run storage...") self._run_storage.upgrade() print_fn("Updating event storage...") self._event_storage.upgrade() print_fn("Updating schedule storage...") self._schedule_storage.upgrade() def optimize_for_dagit(self, statement_timeout): self._run_storage.optimize_for_dagit(statement_timeout=statement_timeout) self._event_storage.optimize_for_dagit(statement_timeout=statement_timeout) if self._schedule_storage: self._schedule_storage.optimize_for_dagit(statement_timeout=statement_timeout) def reindex(self, print_fn=lambda _: None): print_fn("Checking for reindexing...") self._event_storage.reindex(print_fn) print_fn("Done.") def dispose(self): 
self._run_storage.dispose() self.run_coordinator.dispose() self._run_launcher.dispose() self._event_storage.dispose() self._compute_log_manager.dispose() # run storage def get_run_by_id(self, run_id): return self._run_storage.get_run_by_id(run_id) def get_pipeline_snapshot(self, snapshot_id): return self._run_storage.get_pipeline_snapshot(snapshot_id) def has_pipeline_snapshot(self, snapshot_id): return self._run_storage.has_pipeline_snapshot(snapshot_id) def get_historical_pipeline(self, snapshot_id): from dagster.core.host_representation import HistoricalPipeline snapshot = self._run_storage.get_pipeline_snapshot(snapshot_id) parent_snapshot = ( self._run_storage.get_pipeline_snapshot(snapshot.lineage_snapshot.parent_snapshot_id) if snapshot.lineage_snapshot else None ) return HistoricalPipeline( self._run_storage.get_pipeline_snapshot(snapshot_id), snapshot_id, parent_snapshot ) def has_historical_pipeline(self, snapshot_id): return self._run_storage.has_pipeline_snapshot(snapshot_id) def get_execution_plan_snapshot(self, snapshot_id): return self._run_storage.get_execution_plan_snapshot(snapshot_id) def get_run_stats(self, run_id): return self._event_storage.get_stats_for_run(run_id) def get_run_step_stats(self, run_id, step_keys=None): return self._event_storage.get_step_stats_for_run(run_id, step_keys) def get_run_tags(self): return self._run_storage.get_run_tags() def get_run_group(self, run_id): return self._run_storage.get_run_group(run_id) def resolve_memoized_execution_plan(self, execution_plan, run_config, mode): """ Returns: ExecutionPlan: Execution plan configured to only run unmemoized steps. """ pipeline_def = execution_plan.pipeline.get_definition() pipeline_name = pipeline_def.name step_output_versions = resolve_step_output_versions( execution_plan, EnvironmentConfig.build(pipeline_def, run_config, mode), pipeline_def.get_mode_definition(mode), ) if all(version is None for version in step_output_versions.values()): raise DagsterInvariantViolationError( "While creating a memoized pipeline run, no steps have versions. At least one step " "must have a version." ) step_output_addresses = self.get_addresses_for_step_output_versions( { (pipeline_name, step_output_handle): version for step_output_handle, version in step_output_versions.items() if version } ) step_keys_to_execute = list( { step_output_handle.step_key for step_output_handle in step_output_versions.keys() if (pipeline_name, step_output_handle) not in step_output_addresses } ) return execution_plan.build_memoized_plan(step_keys_to_execute, step_output_addresses) def create_run_for_pipeline( self, pipeline_def, execution_plan=None, run_id=None, run_config=None, mode=None, solids_to_execute=None, step_keys_to_execute=None, status=None, tags=None, root_run_id=None, parent_run_id=None, solid_selection=None, ): from dagster.core.execution.api import create_execution_plan from dagster.core.execution.plan.plan import ExecutionPlan from dagster.core.snap import snapshot_from_execution_plan check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) check.opt_inst_param(execution_plan, "execution_plan", ExecutionPlan) # note that solids_to_execute is required to execute the solid subset, which is the # frozenset version of the previous solid_subset. # solid_selection is not required and will not be converted to solids_to_execute here. # i.e. this function doesn't handle solid queries. # solid_selection is only used to pass the user queries further down. 
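For completeness, the run-storage accessors above can be poked at on an ephemeral instance; a sketch (a run id that was never stored is expected to come back empty):

```python
from dagster.core.instance import DagsterInstance

instance = DagsterInstance.ephemeral()
# No runs have been created yet, so these lookups return nothing of interest.
print(instance.get_run_by_id("no-such-run-id"))
print(instance.get_run_tags())
instance.dispose()
```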
check.opt_set_param(solids_to_execute, "solids_to_execute", of_type=str) check.opt_list_param(solid_selection, "solid_selection", of_type=str) if solids_to_execute: if isinstance(pipeline_def, PipelineSubsetDefinition): # for the case when pipeline_def is created by IPipeline or ExternalPipeline check.invariant( solids_to_execute == pipeline_def.solids_to_execute, "Cannot create a PipelineRun from pipeline subset {pipeline_solids_to_execute} " "that conflicts with solids_to_execute arg {solids_to_execute}".format( pipeline_solids_to_execute=str_format_list(pipeline_def.solids_to_execute), solids_to_execute=str_format_list(solids_to_execute), ), ) else: # for cases when `create_run_for_pipeline` is directly called pipeline_def = pipeline_def.get_pipeline_subset_def( solids_to_execute=solids_to_execute ) full_execution_plan = execution_plan or create_execution_plan( pipeline_def, run_config=run_config, mode=mode, ) check.invariant( len(full_execution_plan.step_keys_to_execute) == len(full_execution_plan.steps) ) if is_memoized_run(tags): if step_keys_to_execute: raise DagsterInvariantViolationError( "step_keys_to_execute parameter cannot be used in conjunction with memoized " "pipeline runs." ) subsetted_execution_plan = self.resolve_memoized_execution_plan( full_execution_plan, run_config=run_config, mode=mode, ) # TODO: tighter integration with existing step_keys_to_execute functionality step_keys_to_execute = subsetted_execution_plan.step_keys_to_execute else: subsetted_execution_plan = ( full_execution_plan.build_subset_plan(step_keys_to_execute) if step_keys_to_execute else full_execution_plan ) return self.create_run( pipeline_name=pipeline_def.name, run_id=run_id, run_config=run_config, mode=check.opt_str_param(mode, "mode", default=pipeline_def.get_default_mode_name()), solid_selection=solid_selection, solids_to_execute=solids_to_execute, step_keys_to_execute=step_keys_to_execute, status=status, tags=tags, root_run_id=root_run_id, parent_run_id=parent_run_id, pipeline_snapshot=pipeline_def.get_pipeline_snapshot(), execution_plan_snapshot=snapshot_from_execution_plan( subsetted_execution_plan, pipeline_def.get_pipeline_snapshot_id() ), parent_pipeline_snapshot=pipeline_def.get_parent_pipeline_snapshot(), ) def _construct_run_with_snapshots( self, pipeline_name, run_id, run_config, mode, solids_to_execute, step_keys_to_execute, status, tags, root_run_id, parent_run_id, pipeline_snapshot, execution_plan_snapshot, parent_pipeline_snapshot, solid_selection=None, external_pipeline_origin=None, ): # https://github.com/dagster-io/dagster/issues/2403 if tags and IS_AIRFLOW_INGEST_PIPELINE_STR in tags: if AIRFLOW_EXECUTION_DATE_STR not in tags: tags[AIRFLOW_EXECUTION_DATE_STR] = get_current_datetime_in_utc().isoformat() check.invariant( not (not pipeline_snapshot and execution_plan_snapshot), "It is illegal to have an execution plan snapshot and not have a pipeline snapshot. 
" "It is possible to have no execution plan snapshot since we persist runs " "that do not successfully compile execution plans in the scheduled case.", ) pipeline_snapshot_id = ( self._ensure_persisted_pipeline_snapshot(pipeline_snapshot, parent_pipeline_snapshot) if pipeline_snapshot else None ) execution_plan_snapshot_id = ( self._ensure_persisted_execution_plan_snapshot( execution_plan_snapshot, pipeline_snapshot_id, step_keys_to_execute ) if execution_plan_snapshot and pipeline_snapshot_id else None ) return PipelineRun( pipeline_name=pipeline_name, run_id=run_id, run_config=run_config, mode=mode, solid_selection=solid_selection, solids_to_execute=solids_to_execute, step_keys_to_execute=step_keys_to_execute, status=status, tags=tags, root_run_id=root_run_id, parent_run_id=parent_run_id, pipeline_snapshot_id=pipeline_snapshot_id, execution_plan_snapshot_id=execution_plan_snapshot_id, external_pipeline_origin=external_pipeline_origin, ) def _ensure_persisted_pipeline_snapshot(self, pipeline_snapshot, parent_pipeline_snapshot): from dagster.core.snap import create_pipeline_snapshot_id, PipelineSnapshot check.inst_param(pipeline_snapshot, "pipeline_snapshot", PipelineSnapshot) check.opt_inst_param(parent_pipeline_snapshot, "parent_pipeline_snapshot", PipelineSnapshot) if pipeline_snapshot.lineage_snapshot: if not self._run_storage.has_pipeline_snapshot( pipeline_snapshot.lineage_snapshot.parent_snapshot_id ): check.invariant( create_pipeline_snapshot_id(parent_pipeline_snapshot) == pipeline_snapshot.lineage_snapshot.parent_snapshot_id, "Parent pipeline snapshot id out of sync with passed parent pipeline snapshot", ) returned_pipeline_snapshot_id = self._run_storage.add_pipeline_snapshot( parent_pipeline_snapshot ) check.invariant( pipeline_snapshot.lineage_snapshot.parent_snapshot_id == returned_pipeline_snapshot_id ) pipeline_snapshot_id = create_pipeline_snapshot_id(pipeline_snapshot) if not self._run_storage.has_pipeline_snapshot(pipeline_snapshot_id): returned_pipeline_snapshot_id = self._run_storage.add_pipeline_snapshot( pipeline_snapshot ) check.invariant(pipeline_snapshot_id == returned_pipeline_snapshot_id) return pipeline_snapshot_id def _ensure_persisted_execution_plan_snapshot( self, execution_plan_snapshot, pipeline_snapshot_id, step_keys_to_execute ): from dagster.core.snap.execution_plan_snapshot import ( ExecutionPlanSnapshot, create_execution_plan_snapshot_id, ) check.inst_param(execution_plan_snapshot, "execution_plan_snapshot", ExecutionPlanSnapshot) check.str_param(pipeline_snapshot_id, "pipeline_snapshot_id") check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str) check.invariant( execution_plan_snapshot.pipeline_snapshot_id == pipeline_snapshot_id, ( "Snapshot mismatch: Snapshot ID in execution plan snapshot is " '"{ep_pipeline_snapshot_id}" and snapshot_id created in memory is ' '"{pipeline_snapshot_id}"' ).format( ep_pipeline_snapshot_id=execution_plan_snapshot.pipeline_snapshot_id, pipeline_snapshot_id=pipeline_snapshot_id, ), ) check.invariant( set(step_keys_to_execute) == set(execution_plan_snapshot.step_keys_to_execute) if step_keys_to_execute else set(execution_plan_snapshot.step_keys_to_execute) == set([step.key for step in execution_plan_snapshot.steps]), "We encode step_keys_to_execute twice in our stack, unfortunately. This check " "ensures that they are consistent. We check that step_keys_to_execute in the plan " "matches the step_keys_to_execute params if it is set. 
If it is not, this indicates " "a full execution plan, and so we verify that.", ) execution_plan_snapshot_id = create_execution_plan_snapshot_id(execution_plan_snapshot) if not self._run_storage.has_execution_plan_snapshot(execution_plan_snapshot_id): returned_execution_plan_snapshot_id = self._run_storage.add_execution_plan_snapshot( execution_plan_snapshot ) check.invariant(execution_plan_snapshot_id == returned_execution_plan_snapshot_id) return execution_plan_snapshot_id def create_run( self, pipeline_name, run_id, run_config, mode, solids_to_execute, step_keys_to_execute, status, tags, root_run_id, parent_run_id, pipeline_snapshot, execution_plan_snapshot, parent_pipeline_snapshot, solid_selection=None, external_pipeline_origin=None, ): pipeline_run = self._construct_run_with_snapshots( pipeline_name=pipeline_name, run_id=run_id, run_config=run_config, mode=mode, solid_selection=solid_selection, solids_to_execute=solids_to_execute, step_keys_to_execute=step_keys_to_execute, status=status, tags=tags, root_run_id=root_run_id, parent_run_id=parent_run_id, pipeline_snapshot=pipeline_snapshot, execution_plan_snapshot=execution_plan_snapshot, parent_pipeline_snapshot=parent_pipeline_snapshot, external_pipeline_origin=external_pipeline_origin, ) return self._run_storage.add_run(pipeline_run) def register_managed_run( self, pipeline_name, run_id, run_config, mode, solids_to_execute, step_keys_to_execute, tags, root_run_id, parent_run_id, pipeline_snapshot, execution_plan_snapshot, parent_pipeline_snapshot, solid_selection=None, ): # The usage of this method is limited to dagster-airflow, specifically in Dagster # Operators that are executed in Airflow. Because a common workflow in Airflow is to # retry dags from arbitrary tasks, we need any node to be capable of creating a # PipelineRun. # # The try-except DagsterRunAlreadyExists block handles the race when multiple "root" tasks # simultaneously execute self._run_storage.add_run(pipeline_run). When this happens, only # one task succeeds in creating the run, while the others get DagsterRunAlreadyExists # error; at this point, the failed tasks try again to fetch the existing run. # https://github.com/dagster-io/dagster/issues/2412 pipeline_run = self._construct_run_with_snapshots( pipeline_name=pipeline_name, run_id=run_id, run_config=run_config, mode=mode, solid_selection=solid_selection, solids_to_execute=solids_to_execute, step_keys_to_execute=step_keys_to_execute, status=PipelineRunStatus.MANAGED, tags=tags, root_run_id=root_run_id, parent_run_id=parent_run_id, pipeline_snapshot=pipeline_snapshot, execution_plan_snapshot=execution_plan_snapshot, parent_pipeline_snapshot=parent_pipeline_snapshot, ) def get_run(): candidate_run = self.get_run_by_id(pipeline_run.run_id) field_diff = _check_run_equality(pipeline_run, candidate_run) if field_diff: raise DagsterRunConflict( "Found conflicting existing run with same id {run_id}. 
Runs differ in:" "\n{field_diff}".format( run_id=pipeline_run.run_id, field_diff=_format_field_diff(field_diff), ), ) return candidate_run if self.has_run(pipeline_run.run_id): return get_run() try: return self._run_storage.add_run(pipeline_run) except DagsterRunAlreadyExists: return get_run() def add_run(self, pipeline_run): return self._run_storage.add_run(pipeline_run) def handle_run_event(self, run_id, event): return self._run_storage.handle_run_event(run_id, event) def add_run_tags(self, run_id, new_tags): return self._run_storage.add_run_tags(run_id, new_tags) def has_run(self, run_id): return self._run_storage.has_run(run_id) def get_runs(self, filters=None, cursor=None, limit=None): return self._run_storage.get_runs(filters, cursor, limit) def get_runs_count(self, filters=None): return self._run_storage.get_runs_count(filters) def get_run_groups(self, filters=None, cursor=None, limit=None): return self._run_storage.get_run_groups(filters=filters, cursor=cursor, limit=limit) def wipe(self): self._run_storage.wipe() self._event_storage.wipe() def delete_run(self, run_id): self._run_storage.delete_run(run_id) self._event_storage.delete_events(run_id) # event storage def logs_after(self, run_id, cursor): return self._event_storage.get_logs_for_run(run_id, cursor=cursor) def all_logs(self, run_id): return self._event_storage.get_logs_for_run(run_id) def watch_event_logs(self, run_id, cursor, cb): return self._event_storage.watch(run_id, cursor, cb) # asset storage @property def is_asset_aware(self): return self._event_storage.is_asset_aware def check_asset_aware(self): check.invariant( self.is_asset_aware, ( "Asset queries can only be performed on instances with asset-aware event log " "storage. Use `instance.is_asset_aware` to verify that the instance is configured " "with an EventLogStorage that implements `AssetAwareEventLogStorage`" ), ) def all_asset_keys(self, prefix_path=None): self.check_asset_aware() return self._event_storage.get_all_asset_keys(prefix_path) def has_asset_key(self, asset_key): self.check_asset_aware() return self._event_storage.has_asset_key(asset_key) def events_for_asset_key(self, asset_key, cursor=None, limit=None): check.inst_param(asset_key, "asset_key", AssetKey) self.check_asset_aware() return self._event_storage.get_asset_events(asset_key, cursor, limit) def run_ids_for_asset_key(self, asset_key): check.inst_param(asset_key, "asset_key", AssetKey) self.check_asset_aware() return self._event_storage.get_asset_run_ids(asset_key) def wipe_assets(self, asset_keys): check.list_param(asset_keys, "asset_keys", of_type=AssetKey) self.check_asset_aware() for asset_key in asset_keys: self._event_storage.wipe_asset(asset_key) # event subscriptions def get_logger(self): logger = logging.Logger("__event_listener") logger.addHandler(_EventListenerLogHandler(self)) logger.setLevel(10) return logger def handle_new_event(self, event): run_id = event.run_id self._event_storage.store_event(event) if event.is_dagster_event and event.dagster_event.is_pipeline_event: self._run_storage.handle_run_event(run_id, event.dagster_event) for sub in self._subscribers[run_id]: sub(event) def add_event_listener(self, run_id, cb): self._subscribers[run_id].append(cb) def report_engine_event( self, message, pipeline_run, engine_event_data=None, cls=None, step_key=None, ): """ Report a EngineEvent that occurred outside of a pipeline execution context. 
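Args:
    message (str): The message describing the event.
    pipeline_run (PipelineRun): The run to which the event applies.
    engine_event_data (Optional[EngineEventData]): Structured payload for the event; if it
        carries an error, the event is logged at ERROR level.
    cls (class): The class reporting the event; its name is prefixed to the message.
    step_key (Optional[str]): The step associated with the event, if any.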
""" from dagster.core.events import EngineEventData, DagsterEvent, DagsterEventType from dagster.core.events.log import DagsterEventRecord check.class_param(cls, "cls") check.str_param(message, "message") check.inst_param(pipeline_run, "pipeline_run", PipelineRun) engine_event_data = check.opt_inst_param( engine_event_data, "engine_event_data", EngineEventData, EngineEventData([]), ) if cls: message = "[{}] {}".format(cls.__name__, message) log_level = logging.INFO if engine_event_data and engine_event_data.error: log_level = logging.ERROR dagster_event = DagsterEvent( event_type_value=DagsterEventType.ENGINE_EVENT.value, pipeline_name=pipeline_run.pipeline_name, message=message, event_specific_data=engine_event_data, ) event_record = DagsterEventRecord( message=message, user_message=message, level=log_level, pipeline_name=pipeline_run.pipeline_name, run_id=pipeline_run.run_id, error_info=None, timestamp=time.time(), step_key=step_key, dagster_event=dagster_event, ) self.handle_new_event(event_record) return dagster_event def report_run_failed(self, pipeline_run): from dagster.core.events import DagsterEvent, DagsterEventType from dagster.core.events.log import DagsterEventRecord check.inst_param(pipeline_run, "pipeline_run", PipelineRun) message = "This pipeline run has been marked as failed from outside the execution context." dagster_event = DagsterEvent( event_type_value=DagsterEventType.PIPELINE_FAILURE.value, pipeline_name=pipeline_run.pipeline_name, message=message, ) event_record = DagsterEventRecord( message=message, user_message=message, level=logging.ERROR, pipeline_name=pipeline_run.pipeline_name, run_id=pipeline_run.run_id, error_info=None, timestamp=time.time(), dagster_event=dagster_event, ) self.handle_new_event(event_record) return dagster_event # directories def file_manager_directory(self, run_id): return self._local_artifact_storage.file_manager_dir(run_id) def intermediates_directory(self, run_id): return self._local_artifact_storage.intermediates_dir(run_id) def schedules_directory(self): return self._local_artifact_storage.schedules_dir # Runs coordinator def submit_run(self, run_id, external_pipeline): """Submit a pipeline run to the coordinator. This method delegates to the ``RunCoordinator``, configured on the instance, and will call its implementation of ``RunCoordinator.submit_run()`` to send the run to the coordinator for execution. Runs should be created in the instance (e.g., by calling ``DagsterInstance.create_run()``) *before* this method is called, and should be in the ``PipelineRunStatus.NOT_STARTED`` state. They also must have a non-null ExternalPipelineOrigin. Args: run_id (str): The id of the run. """ from dagster.core.host_representation import ExternalPipelineOrigin run = self.get_run_by_id(run_id) check.inst( run.external_pipeline_origin, ExternalPipelineOrigin, "External pipeline origin must be set for submitted runs", ) try: submitted_run = self._run_coordinator.submit_run( run, external_pipeline=external_pipeline ) except: from dagster.core.events import EngineEventData error = serializable_error_info_from_exc_info(sys.exc_info()) self.report_engine_event( error.message, run, EngineEventData.engine_error(error), ) self.report_run_failed(run) raise return submitted_run # Run launcher def launch_run(self, run_id, external_pipeline): """Launch a pipeline run. This method is typically called using `instance.submit_run` rather than being invoked directly. 
This method delegates to the ``RunLauncher``, if any, configured on the instance, and will call its implementation of ``RunLauncher.launch_run()`` to begin the execution of the specified run. Runs should be created in the instance (e.g., by calling ``DagsterInstance.create_run()``) *before* this method is called, and should be in the ``PipelineRunStatus.NOT_STARTED`` state. Args: run_id (str): The id of the run the launch. """ run = self.get_run_by_id(run_id) try: launched_run = self._run_launcher.launch_run( self, run, external_pipeline=external_pipeline ) except: from dagster.core.events import EngineEventData error = serializable_error_info_from_exc_info(sys.exc_info()) self.report_engine_event( error.message, run, EngineEventData.engine_error(error), ) self.report_run_failed(run) raise return launched_run # Scheduler def reconcile_scheduler_state(self, external_repository): return self._scheduler.reconcile_scheduler_state(self, external_repository) def start_schedule_and_update_storage_state(self, external_schedule): return self._scheduler.start_schedule_and_update_storage_state(self, external_schedule) def stop_schedule_and_update_storage_state(self, schedule_origin_id): return self._scheduler.stop_schedule_and_update_storage_state(self, schedule_origin_id) def stop_schedule_and_delete_from_storage(self, schedule_origin_id): return self._scheduler.stop_schedule_and_delete_from_storage(self, schedule_origin_id) def running_schedule_count(self, schedule_origin_id): if self._scheduler: return self._scheduler.running_schedule_count(self, schedule_origin_id) return 0 def scheduler_debug_info(self): from dagster.core.scheduler import SchedulerDebugInfo from dagster.core.definitions.job import JobType from dagster.core.scheduler.job import JobStatus errors = [] schedules = [] for schedule_state in self.all_stored_job_state(job_type=JobType.SCHEDULE): if schedule_state.status == JobStatus.RUNNING and not self.running_schedule_count( schedule_state.job_origin_id ): errors.append( "Schedule {schedule_name} is set to be running, but the scheduler is not " "running the schedule.".format(schedule_name=schedule_state.job_name) ) elif schedule_state.status == JobStatus.STOPPED and self.running_schedule_count( schedule_state.job_origin_id ): errors.append( "Schedule {schedule_name} is set to be stopped, but the scheduler is still running " "the schedule.".format(schedule_name=schedule_state.job_name) ) if self.running_schedule_count(schedule_state.job_origin_id) > 1: errors.append( "Duplicate jobs found: More than one job for schedule {schedule_name} are " "running on the scheduler.".format(schedule_name=schedule_state.job_name) ) schedule_info = { schedule_state.job_name: { "status": schedule_state.status.value, "cron_schedule": schedule_state.job_specific_data.cron_schedule, "repository_pointer": schedule_state.origin.get_repo_cli_args(), "schedule_origin_id": schedule_state.job_origin_id, "repository_origin_id": schedule_state.repository_origin_id, } } schedules.append(yaml.safe_dump(schedule_info, default_flow_style=False)) return SchedulerDebugInfo( scheduler_config_info=self.info_str_for_component("Scheduler", self.scheduler), scheduler_info=self.scheduler.debug_info(), schedule_storage=schedules, errors=errors, ) # Schedule Storage def all_stored_job_state(self, repository_origin_id=None, job_type=None): return self._schedule_storage.all_stored_job_state(repository_origin_id, job_type) def get_job_state(self, job_origin_id): return self._schedule_storage.get_job_state(job_origin_id) def 
add_job_state(self, job_state): return self._schedule_storage.add_job_state(job_state) def update_job_state(self, job_state): return self._schedule_storage.update_job_state(job_state) def delete_job_state(self, job_origin_id): return self._schedule_storage.delete_job_state(job_origin_id) def get_job_ticks(self, job_origin_id): return self._schedule_storage.get_job_ticks(job_origin_id) def get_latest_job_tick(self, job_origin_id): return self._schedule_storage.get_latest_job_tick(job_origin_id) - def has_job_tick(self, job_origin_id, execution_key, statuses=None): - return self._schedule_storage.has_job_tick(job_origin_id, execution_key, statuses) + def has_job_tick(self, job_origin_id, run_key, statuses=None): + return self._schedule_storage.has_job_tick(job_origin_id, run_key, statuses) def create_job_tick(self, job_tick_data): return self._schedule_storage.create_job_tick(job_tick_data) def update_job_tick(self, tick): return self._schedule_storage.update_job_tick(tick) def get_job_tick_stats(self, job_origin_id): return self._schedule_storage.get_job_tick_stats(job_origin_id) def wipe_all_schedules(self): if self._scheduler: self._scheduler.wipe(self) self._schedule_storage.wipe() def logs_path_for_schedule(self, schedule_origin_id): return self._scheduler.get_logs_path(self, schedule_origin_id) def __enter__(self): return self def __exit__(self, exception_type, exception_value, traceback): self.dispose() def get_addresses_for_step_output_versions(self, step_output_versions): """ For each given step output, finds whether an output exists with the given version, and returns its address if it does. Args: step_output_versions (Dict[(str, StepOutputHandle), str]): (pipeline name, step output handle) -> version. Returns: Dict[(str, StepOutputHandle), str]: (pipeline name, step output handle) -> address. For each step output, an address if there is one and None otherwise. 
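Example (illustrative shapes only; the handle, version, and address are hypothetical):
    input:  {("my_pipeline", step_output_handle): "version_1"}
    output: {("my_pipeline", step_output_handle): "/address/of/stored/output"}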
""" return self._event_storage.get_addresses_for_step_output_versions(step_output_versions) diff --git a/python_modules/dagster/dagster/core/scheduler/job.py b/python_modules/dagster/dagster/core/scheduler/job.py index b6e8349de..c57dffbf9 100644 --- a/python_modules/dagster/dagster/core/scheduler/job.py +++ b/python_modules/dagster/dagster/core/scheduler/job.py @@ -1,245 +1,244 @@ from collections import namedtuple from enum import Enum from dagster import check from dagster.core.definitions.job import JobType from dagster.core.host_representation.origin import ExternalJobOrigin from dagster.serdes import whitelist_for_serdes from dagster.utils.error import SerializableErrorInfo @whitelist_for_serdes class JobStatus(Enum): RUNNING = "RUNNING" STOPPED = "STOPPED" @whitelist_for_serdes class SensorJobData(namedtuple("_SensorJobData", "last_completed_timestamp")): def __new__(cls, last_completed_timestamp=None): return super(SensorJobData, cls).__new__( cls, check.opt_float_param(last_completed_timestamp, "last_completed_timestamp"), ) @whitelist_for_serdes class ScheduleJobData(namedtuple("_ScheduleJobData", "cron_schedule start_timestamp")): def __new__(cls, cron_schedule, start_timestamp=None): return super(ScheduleJobData, cls).__new__( cls, check.str_param(cron_schedule, "cron_schedule"), # Time in UTC at which the user started running the schedule (distinct from # `start_date` on partition-based schedules, which is used to define # the range of partitions) check.opt_float_param(start_timestamp, "start_timestamp"), ) def check_job_data(job_type, job_specific_data): check.inst_param(job_type, "job_type", JobType) if job_type == JobType.SCHEDULE: check.inst_param(job_specific_data, "job_specific_data", ScheduleJobData) elif job_type == JobType.SENSOR: check.opt_inst_param(job_specific_data, "job_specific_data", SensorJobData) else: check.failed( "Unexpected job type {}, expected one of JobType.SENSOR, JobType.SCHEDULE".format( job_type ) ) return job_specific_data @whitelist_for_serdes class JobState(namedtuple("_JobState", "origin job_type status job_specific_data")): def __new__(cls, origin, job_type, status, job_specific_data=None): return super(JobState, cls).__new__( cls, check.inst_param(origin, "origin", ExternalJobOrigin), check.inst_param(job_type, "job_type", JobType), check.inst_param(status, "status", JobStatus), check_job_data(job_type, job_specific_data), ) @property def name(self): return self.origin.job_name @property def job_name(self): return self.origin.job_name @property def repository_origin_id(self): return self.origin.external_repository_origin.get_id() @property def job_origin_id(self): return self.origin.get_id() def with_status(self, status): check.inst_param(status, "status", JobStatus) return JobState( self.origin, job_type=self.job_type, status=status, job_specific_data=self.job_specific_data, ) def with_data(self, job_specific_data): check_job_data(self.job_type, job_specific_data) return JobState( self.origin, job_type=self.job_type, status=self.status, job_specific_data=job_specific_data, ) @whitelist_for_serdes class JobTickStatus(Enum): STARTED = "STARTED" SKIPPED = "SKIPPED" SUCCESS = "SUCCESS" FAILURE = "FAILURE" class JobTick(namedtuple("_JobTick", "tick_id job_tick_data")): def __new__(cls, tick_id, job_tick_data): return super(JobTick, cls).__new__( cls, check.int_param(tick_id, "tick_id"), check.inst_param(job_tick_data, "job_tick_data", JobTickData), ) def with_status(self, status, **kwargs): check.inst_param(status, "status", JobTickStatus) return 
self._replace(job_tick_data=self.job_tick_data.with_status(status, **kwargs)) @property def job_origin_id(self): return self.job_tick_data.job_origin_id @property def job_name(self): return self.job_tick_data.job_name @property def job_type(self): return self.job_tick_data.job_type @property def timestamp(self): return self.job_tick_data.timestamp @property - def execution_key(self): - return self.job_tick_data.execution_key + def run_key(self): + return self.job_tick_data.run_key @property def status(self): return self.job_tick_data.status @property def run_id(self): return self.job_tick_data.run_id @property def error(self): return self.job_tick_data.error @whitelist_for_serdes class JobTickData( namedtuple( - "_JobTickData", - "job_origin_id job_name job_type status timestamp run_id error execution_key", + "_JobTickData", "job_origin_id job_name job_type status timestamp run_id error run_key", ) ): def __new__( cls, job_origin_id, job_name, job_type, status, timestamp, run_id=None, error=None, - execution_key=None, + run_key=None, ): """ This class defines the data that is serialized and stored in ``JobStorage``. We depend on the job storage implementation to provide job tick ids, and therefore separate all other data into this serializable class that can be stored independently of the id Arguments: job_origin_id (str): The id of the job target for this tick job_name (str): The name of the job for this tick job_type (JobType): The type of this job for this tick status (JobTickStatus): The status of the tick, which can be updated timestamp (float): The timestamp at which this job evaluation started Keyword Arguments: run_id (str): The run created by the tick. error (SerializableErrorInfo): The error caught during job execution. This is set only when the status is ``JobTickStatus.Failure`` - execution_key (Optional[str]): A string that uniquely identifies a pipeline execution + run_key (Optional[str]): A string that uniquely identifies a pipeline execution """ _validate_job_tick_args(job_type, status, run_id, error) return super(JobTickData, cls).__new__( cls, check.str_param(job_origin_id, "job_origin_id"), check.str_param(job_name, "job_name"), check.inst_param(job_type, "job_type", JobType), check.inst_param(status, "status", JobTickStatus), check.float_param(timestamp, "timestamp"), run_id, # validated in _validate_job_tick_args error, # validated in _validate_job_tick_args - check.opt_str_param(execution_key, "execution_key"), + check.opt_str_param(run_key, "run_key"), ) - def with_status(self, status, run_id=None, error=None, timestamp=None, execution_key=None): + def with_status(self, status, run_id=None, error=None, timestamp=None, run_key=None): check.inst_param(status, "status", JobTickStatus) return JobTickData( job_origin_id=self.job_origin_id, job_name=self.job_name, job_type=self.job_type, status=status, timestamp=timestamp if timestamp is not None else self.timestamp, run_id=run_id if run_id is not None else self.run_id, error=error if error is not None else self.error, - execution_key=execution_key if execution_key is not None else self.execution_key, + run_key=run_key if run_key is not None else self.run_key, ) def _validate_job_tick_args(job_type, status, run_id=None, error=None): check.inst_param(job_type, "job_type", JobType) check.inst_param(status, "status", JobTickStatus) if status == JobTickStatus.SUCCESS: check.str_param(run_id, "run_id") check.invariant(error is None, desc="Job tick status is SUCCESS, but error was provided") elif status == JobTickStatus.FAILURE: 
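# Only a FAILURE tick carries an error, and it must be a SerializableErrorInfo.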
check.inst_param(error, "error", SerializableErrorInfo) else: check.invariant(error is None, "Job tick status was not FAILURE but error was provided") class JobTickStatsSnapshot( namedtuple( "JobTickStatsSnapshot", ("ticks_started ticks_succeeded ticks_skipped ticks_failed"), ) ): def __new__( cls, ticks_started, ticks_succeeded, ticks_skipped, ticks_failed, ): return super(JobTickStatsSnapshot, cls).__new__( cls, ticks_started=check.int_param(ticks_started, "ticks_started"), ticks_succeeded=check.int_param(ticks_succeeded, "ticks_succeeded"), ticks_skipped=check.int_param(ticks_skipped, "ticks_skipped"), ticks_failed=check.int_param(ticks_failed, "ticks_failed"), ) diff --git a/python_modules/dagster/dagster/core/storage/schedules/base.py b/python_modules/dagster/dagster/core/storage/schedules/base.py index c43e9430a..f0437fd8f 100644 --- a/python_modules/dagster/dagster/core/storage/schedules/base.py +++ b/python_modules/dagster/dagster/core/storage/schedules/base.py @@ -1,113 +1,113 @@ import abc import six class ScheduleStorage(six.with_metaclass(abc.ABCMeta)): """Abstract class for managing persistance of scheduler artifacts """ @abc.abstractmethod def wipe(self): """Delete all schedules from storage """ @abc.abstractmethod def all_stored_job_state(self, repository_origin_id=None, job_type=None): """Return all JobStates present in storage Args: repository_origin_id (Optional[str]): The ExternalRepository target id to scope results to job_type (Optional[JobType]): The JobType to scope results to """ @abc.abstractmethod def get_job_state(self, job_origin_id): """Return the unique job with the given id Args: job_origin_id (str): The unique job identifier """ @abc.abstractmethod def add_job_state(self, job): """Add a job to storage. Args: job (JobState): The job to add """ @abc.abstractmethod def update_job_state(self, job): """Update a job in storage. Args: job (JobState): The job to update """ @abc.abstractmethod def delete_job_state(self, job_origin_id): """Delete a job in storage. Args: job_origin_id (str): The id of the ExternalJob target to delete """ @abc.abstractmethod def get_job_ticks(self, job_origin_id): """Get the ticks for a given job. Args: job_origin_id (str): The id of the ExternalJob target """ @abc.abstractmethod - def has_job_tick(self, job_origin_id, execution_key, statuses=None): - """Checks if there is a job tick for a given job / execution_key in storage. + def has_job_tick(self, job_origin_id, run_key, statuses=None): + """Checks if there is a job tick for a given job / run_key in storage. Args: job_origin_id (str): The id of the ExternalJob target - execution_key (str): User-provided key that identifies a given job execution + run_key (str): User-provided key that identifies a given job execution statuses (Optional[List[JobTickStatus]]): List of statuses to filter ticks by """ @abc.abstractmethod def get_latest_job_tick(self, job_origin_id): """Get the most recent tick for a given job. Args: job_origin_id (str): The id of the ExternalJob target """ @abc.abstractmethod def create_job_tick(self, job_tick_data): """Add a job tick to storage. Args: repository_name (str): The repository the schedule belongs to job_tick_data (JobTickData): The job tick to add """ @abc.abstractmethod def update_job_tick(self, tick): """Update a job tick already in storage. Args: tick (ScheduleTick): The job tick to update """ @abc.abstractmethod def get_job_tick_stats(self, job_origin_id): """Get tick stats for a given job. 
Args: job_origin_id (str): The id of the ExternalJob target """ @abc.abstractmethod def upgrade(self): """Perform any needed migrations """ def optimize_for_dagit(self, statement_timeout): """Allows for optimizing database connection / use in the context of a long lived dagit process""" diff --git a/python_modules/dagster/dagster/core/storage/schedules/schema.py b/python_modules/dagster/dagster/core/storage/schedules/schema.py index e4c8b4d23..58f641c13 100644 --- a/python_modules/dagster/dagster/core/storage/schedules/schema.py +++ b/python_modules/dagster/dagster/core/storage/schedules/schema.py @@ -1,62 +1,62 @@ import sqlalchemy as db ScheduleStorageSqlMetadata = db.MetaData() ScheduleTable = db.Table( "schedules", ScheduleStorageSqlMetadata, db.Column("id", db.Integer, primary_key=True, autoincrement=True), db.Column("schedule_origin_id", db.String(255), unique=True), db.Column("repository_origin_id", db.String(255)), db.Column("status", db.String(63)), db.Column("schedule_body", db.String), db.Column("create_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")), db.Column("update_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")), ) ScheduleTickTable = db.Table( "schedule_ticks", ScheduleStorageSqlMetadata, db.Column("id", db.Integer, primary_key=True, autoincrement=True), db.Column("schedule_origin_id", db.String(255), index=True), db.Column("status", db.String(63)), # utc timezone - make an index as a breaking change for 0.10.0 # (https://github.com/dagster-io/dagster/issues/2956) db.Column("timestamp", db.types.TIMESTAMP), db.Column("tick_body", db.String), # The create and update timestamps are not used in framework code, are are simply # present for debugging purposes. db.Column("create_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")), db.Column("update_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")), ) JobTable = db.Table( "jobs", ScheduleStorageSqlMetadata, db.Column("id", db.Integer, primary_key=True, autoincrement=True), db.Column("job_origin_id", db.String(255), unique=True), db.Column("repository_origin_id", db.String(255)), db.Column("status", db.String(63)), db.Column("job_type", db.String(63), index=True), db.Column("job_body", db.String), db.Column("create_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")), db.Column("update_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")), ) JobTickTable = db.Table( "job_ticks", ScheduleStorageSqlMetadata, db.Column("id", db.Integer, primary_key=True, autoincrement=True), db.Column("job_origin_id", db.String(255), index=True), db.Column("status", db.String(63)), db.Column("type", db.String(63)), - db.Column("execution_key", db.String), + db.Column("run_key", db.String), db.Column("timestamp", db.types.TIMESTAMP), db.Column("tick_body", db.String), db.Column("create_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")), db.Column("update_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")), ) -db.Index("idx_job_tick_execution_key", JobTickTable.c.job_origin_id, JobTickTable.c.execution_key) +db.Index("idx_job_tick_run_key", JobTickTable.c.job_origin_id, JobTickTable.c.run_key) db.Index("idx_job_tick_status", JobTickTable.c.job_origin_id, JobTickTable.c.status) db.Index("idx_job_tick_timestamp", JobTickTable.c.job_origin_id, JobTickTable.c.timestamp) diff --git a/python_modules/dagster/dagster/core/storage/schedules/sql_schedule_storage.py 
b/python_modules/dagster/dagster/core/storage/schedules/sql_schedule_storage.py index 206359fb0..ff33d2240 100644 --- a/python_modules/dagster/dagster/core/storage/schedules/sql_schedule_storage.py +++ b/python_modules/dagster/dagster/core/storage/schedules/sql_schedule_storage.py @@ -1,247 +1,247 @@ from abc import abstractmethod import six import sqlalchemy as db from dagster import check from dagster.core.definitions.job import JobType from dagster.core.errors import DagsterInvariantViolationError from dagster.core.scheduler.job import ( JobState, JobTick, JobTickData, JobTickStatsSnapshot, JobTickStatus, ) from dagster.serdes import deserialize_json_to_dagster_namedtuple, serialize_dagster_namedtuple from dagster.utils import utc_datetime_from_timestamp from .base import ScheduleStorage from .schema import JobTable, JobTickTable, ScheduleTable, ScheduleTickTable class SqlScheduleStorage(ScheduleStorage): """Base class for SQL backed schedule storage """ @abstractmethod def connect(self): """Context manager yielding a sqlalchemy.engine.Connection.""" def execute(self, query): with self.connect() as conn: result_proxy = conn.execute(query) res = result_proxy.fetchall() result_proxy.close() return res def _deserialize_rows(self, rows): return list(map(lambda r: deserialize_json_to_dagster_namedtuple(r[0]), rows)) def all_stored_job_state(self, repository_origin_id=None, job_type=None): check.opt_inst_param(job_type, "job_type", JobType) base_query = db.select([JobTable.c.job_body, JobTable.c.job_origin_id]).select_from( JobTable ) if repository_origin_id: query = base_query.where(JobTable.c.repository_origin_id == repository_origin_id) else: query = base_query if job_type: query = query.where(JobTable.c.job_type == job_type.value) rows = self.execute(query) return self._deserialize_rows(rows) def get_job_state(self, job_origin_id): check.str_param(job_origin_id, "job_origin_id") query = ( db.select([JobTable.c.job_body]) .select_from(JobTable) .where(JobTable.c.job_origin_id == job_origin_id) ) rows = self.execute(query) return self._deserialize_rows(rows[:1])[0] if len(rows) else None def add_job_state(self, job): check.inst_param(job, "job", JobState) with self.connect() as conn: try: conn.execute( JobTable.insert().values( # pylint: disable=no-value-for-parameter job_origin_id=job.job_origin_id, repository_origin_id=job.repository_origin_id, status=job.status.value, job_type=job.job_type.value, job_body=serialize_dagster_namedtuple(job), ) ) except db.exc.IntegrityError as exc: six.raise_from( DagsterInvariantViolationError( "JobState {id} is already present in storage".format(id=job.job_origin_id,) ), exc, ) return job def update_job_state(self, job): check.inst_param(job, "job", JobState) if not self.get_job_state(job.job_origin_id): raise DagsterInvariantViolationError( "JobState {id} is not present in storage".format(id=job.job_origin_id) ) with self.connect() as conn: conn.execute( JobTable.update() # pylint: disable=no-value-for-parameter .where(JobTable.c.job_origin_id == job.job_origin_id) .values(status=job.status.value, job_body=serialize_dagster_namedtuple(job),) ) def delete_job_state(self, job_origin_id): check.str_param(job_origin_id, "job_origin_id") if not self.get_job_state(job_origin_id): raise DagsterInvariantViolationError( "JobState {id} is not present in storage".format(id=job_origin_id) ) with self.connect() as conn: conn.execute( JobTable.delete().where( # pylint: disable=no-value-for-parameter JobTable.c.job_origin_id == job_origin_id ) ) def 
get_latest_job_tick(self, job_origin_id): check.str_param(job_origin_id, "job_origin_id") query = ( db.select([JobTickTable.c.id, JobTickTable.c.tick_body]) .select_from(JobTickTable) .where(JobTickTable.c.job_origin_id == job_origin_id) .order_by(JobTickTable.c.timestamp.desc()) .limit(1) ) rows = self.execute(query) if len(rows) == 0: return None return JobTick(rows[0][0], deserialize_json_to_dagster_namedtuple(rows[0][1])) def get_job_ticks(self, job_origin_id): check.str_param(job_origin_id, "job_origin_id") query = ( db.select([JobTickTable.c.id, JobTickTable.c.tick_body]) .select_from(JobTickTable) .where(JobTickTable.c.job_origin_id == job_origin_id) .order_by(JobTickTable.c.id.desc()) ) rows = self.execute(query) return list( map(lambda r: JobTick(r[0], deserialize_json_to_dagster_namedtuple(r[1])), rows) ) - def has_job_tick(self, job_origin_id, execution_key, statuses=None): + def has_job_tick(self, job_origin_id, run_key, statuses=None): check.str_param(job_origin_id, "job_origin_id") - check.str_param(execution_key, "execution_key") + check.str_param(run_key, "run_key") check.opt_list_param(statuses, "statuses", of_type=JobTickStatus) query = ( db.select([1]) .select_from(JobTickTable) .where(JobTickTable.c.job_origin_id == job_origin_id) - .where(JobTickTable.c.execution_key == execution_key) + .where(JobTickTable.c.run_key == run_key) ) if statuses: query = query.where(JobTickTable.c.status.in_([status.value for status in statuses])) query = query.limit(1) rows = self.execute(query) return len(rows) > 0 def create_job_tick(self, job_tick_data): check.inst_param(job_tick_data, "job_tick_data", JobTickData) with self.connect() as conn: try: tick_insert = JobTickTable.insert().values( # pylint: disable=no-value-for-parameter job_origin_id=job_tick_data.job_origin_id, status=job_tick_data.status.value, type=job_tick_data.job_type.value, - execution_key=job_tick_data.execution_key, + run_key=job_tick_data.run_key, timestamp=utc_datetime_from_timestamp(job_tick_data.timestamp), tick_body=serialize_dagster_namedtuple(job_tick_data), ) result = conn.execute(tick_insert) tick_id = result.inserted_primary_key[0] return JobTick(tick_id, job_tick_data) except db.exc.IntegrityError as exc: six.raise_from( DagsterInvariantViolationError( "Unable to insert JobTick for job {job_name} in storage".format( job_name=job_tick_data.job_name, ) ), exc, ) def update_job_tick(self, tick): check.inst_param(tick, "tick", JobTick) with self.connect() as conn: conn.execute( JobTickTable.update() # pylint: disable=no-value-for-parameter .where(JobTickTable.c.id == tick.tick_id) .values( status=tick.status.value, type=tick.job_type.value, - execution_key=tick.execution_key, + run_key=tick.run_key, timestamp=utc_datetime_from_timestamp(tick.timestamp), tick_body=serialize_dagster_namedtuple(tick.job_tick_data), ) ) return tick def get_job_tick_stats(self, job_origin_id): check.str_param(job_origin_id, "job_origin_id") query = ( db.select([JobTickTable.c.status, db.func.count()]) .select_from(JobTickTable) .where(JobTickTable.c.job_origin_id == job_origin_id) .group_by(JobTickTable.c.status) ) rows = self.execute(query) counts = {} for status, count in rows: counts[status] = count return JobTickStatsSnapshot( ticks_started=counts.get(JobTickStatus.STARTED.value, 0), ticks_succeeded=counts.get(JobTickStatus.SUCCESS.value, 0), ticks_skipped=counts.get(JobTickStatus.SKIPPED.value, 0), ticks_failed=counts.get(JobTickStatus.FAILURE.value, 0), ) def wipe(self): """Clears the schedule storage.""" with 
self.connect() as conn: # https://stackoverflow.com/a/54386260/324449 conn.execute(ScheduleTable.delete()) # pylint: disable=no-value-for-parameter conn.execute(ScheduleTickTable.delete()) # pylint: disable=no-value-for-parameter conn.execute(JobTable.delete()) # pylint: disable=no-value-for-parameter conn.execute(JobTickTable.delete()) # pylint: disable=no-value-for-parameter diff --git a/python_modules/dagster/dagster/core/storage/tags.py b/python_modules/dagster/dagster/core/storage/tags.py index eaa3106b1..62da663d5 100644 --- a/python_modules/dagster/dagster/core/storage/tags.py +++ b/python_modules/dagster/dagster/core/storage/tags.py @@ -1,67 +1,67 @@ from enum import Enum from dagster import check SYSTEM_TAG_PREFIX = "dagster/" HIDDEN_TAG_PREFIX = ".dagster/" SCHEDULE_NAME_TAG = "{prefix}schedule_name".format(prefix=SYSTEM_TAG_PREFIX) SENSOR_NAME_TAG = "{prefix}sensor_name".format(prefix=SYSTEM_TAG_PREFIX) BACKFILL_ID_TAG = "{prefix}backfill".format(prefix=SYSTEM_TAG_PREFIX) PARTITION_NAME_TAG = "{prefix}partition".format(prefix=SYSTEM_TAG_PREFIX) PARTITION_SET_TAG = "{prefix}partition_set".format(prefix=SYSTEM_TAG_PREFIX) PARENT_RUN_ID_TAG = "{prefix}parent_run_id".format(prefix=SYSTEM_TAG_PREFIX) ROOT_RUN_ID_TAG = "{prefix}root_run_id".format(prefix=SYSTEM_TAG_PREFIX) RESUME_RETRY_TAG = "{prefix}is_resume_retry".format(prefix=SYSTEM_TAG_PREFIX) MEMOIZED_RUN_TAG = "{prefix}is_memoized_run".format(prefix=SYSTEM_TAG_PREFIX) STEP_SELECTION_TAG = "{prefix}step_selection".format(prefix=SYSTEM_TAG_PREFIX) SOLID_SELECTION_TAG = "{prefix}solid_selection".format(prefix=SYSTEM_TAG_PREFIX) PRESET_NAME_TAG = "{prefix}preset_name".format(prefix=SYSTEM_TAG_PREFIX) GRPC_INFO_TAG = "{prefix}grpc_info".format(prefix=HIDDEN_TAG_PREFIX) SCHEDULED_EXECUTION_TIME_TAG = "{prefix}scheduled_execution_time".format(prefix=HIDDEN_TAG_PREFIX) -EXECUTION_KEY_TAG = "{prefix}execution_key".format(prefix=HIDDEN_TAG_PREFIX) +RUN_KEY_TAG = "{prefix}run_key".format(prefix=HIDDEN_TAG_PREFIX) class TagType(Enum): # Custom tag provided by a user USER_PROVIDED = "USER_PROVIDED" # Tags used by Dagster to manage execution that should be surfaced to users. SYSTEM = "SYSTEM" # Metadata used by Dagster for execution but isn't useful for users to see. # For example, metadata about the gRPC server that executed a run. 
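# The run key tag defined above (RUN_KEY_TAG, ".dagster/run_key") also falls in this category.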
HIDDEN = "HIDDEN" def get_tag_type(tag): if tag.startswith(SYSTEM_TAG_PREFIX): return TagType.SYSTEM elif tag.startswith(HIDDEN_TAG_PREFIX): return TagType.HIDDEN else: return TagType.USER_PROVIDED def check_tags(obj, name): check.opt_dict_param(obj, name, key_type=str, value_type=str) for tag in obj.keys(): check.invariant( not tag.startswith(SYSTEM_TAG_PREFIX), desc="User attempted to set tag with reserved system prefix: {tag}".format(tag=tag), ) diff --git a/python_modules/dagster/dagster/grpc/impl.py b/python_modules/dagster/dagster/grpc/impl.py index 42734dc03..38a5f9bed 100644 --- a/python_modules/dagster/dagster/grpc/impl.py +++ b/python_modules/dagster/dagster/grpc/impl.py @@ -1,413 +1,411 @@ """Workhorse functions for individual API requests.""" import os import sys import pendulum from dagster import check from dagster.core.definitions import ScheduleExecutionContext from dagster.core.definitions.reconstructable import ( ReconstructablePipeline, ReconstructableRepository, ) -from dagster.core.definitions.sensor import SensorExecutionContext, SensorRunParams, SensorSkipData +from dagster.core.definitions.sensor import RunRequest, SensorExecutionContext, SkipReason from dagster.core.errors import ( DagsterInvalidSubsetError, DagsterRunNotFoundError, DagsterSubprocessError, PartitionExecutionError, ScheduleExecutionError, SensorExecutionError, user_code_error_boundary, ) from dagster.core.events import EngineEventData from dagster.core.execution.api import create_execution_plan, execute_run_iterator from dagster.core.host_representation import external_pipeline_data_from_def from dagster.core.host_representation.external_data import ( ExternalPartitionConfigData, ExternalPartitionExecutionErrorData, ExternalPartitionExecutionParamData, ExternalPartitionNamesData, ExternalPartitionSetExecutionParamData, ExternalPartitionTagsData, ExternalPipelineSubsetResult, ExternalScheduleExecutionData, ExternalScheduleExecutionErrorData, ExternalSensorExecutionData, ExternalSensorExecutionErrorData, ) from dagster.core.instance import DagsterInstance from dagster.core.snap.execution_plan_snapshot import ( ExecutionPlanSnapshotErrorData, snapshot_from_execution_plan, ) from dagster.core.storage.pipeline_run import PipelineRun from dagster.grpc.types import ExecutionPlanSnapshotArgs from dagster.serdes import deserialize_json_to_dagster_namedtuple from dagster.serdes.ipc import IPCErrorMessage from dagster.utils import delay_interrupts, start_termination_thread from dagster.utils.error import serializable_error_info_from_exc_info from .types import ExecuteExternalPipelineArgs class RunInSubprocessComplete: """Sentinel passed over multiprocessing Queue when subprocess is complete""" class StartRunInSubprocessSuccessful: """Sentinel passed over multiprocessing Queue when launch is successful in subprocess.""" def _core_execute_run(recon_pipeline, pipeline_run, instance): check.inst_param(recon_pipeline, "recon_pipeline", ReconstructablePipeline) check.inst_param(pipeline_run, "pipeline_run", PipelineRun) check.inst_param(instance, "instance", DagsterInstance) try: yield from execute_run_iterator(recon_pipeline, pipeline_run, instance) except DagsterSubprocessError as err: if not all( [err_info.cls_name == "KeyboardInterrupt" for err_info in err.subprocess_error_infos] ): yield instance.report_engine_event( "An exception was thrown during execution that is likely a framework error, " "rather than an error in user code.", pipeline_run, 
EngineEventData.engine_error(serializable_error_info_from_exc_info(sys.exc_info())), ) instance.report_run_failed(pipeline_run) except Exception: # pylint: disable=broad-except yield instance.report_engine_event( "An exception was thrown during execution that is likely a framework error, " "rather than an error in user code.", pipeline_run, EngineEventData.engine_error(serializable_error_info_from_exc_info(sys.exc_info())), ) instance.report_run_failed(pipeline_run) def _run_in_subprocess( serialized_execute_run_args, recon_pipeline, termination_event, subprocess_status_handler, run_event_handler, ): start_termination_thread(termination_event) try: execute_run_args = deserialize_json_to_dagster_namedtuple(serialized_execute_run_args) check.inst_param(execute_run_args, "execute_run_args", ExecuteExternalPipelineArgs) instance = DagsterInstance.from_ref(execute_run_args.instance_ref) pipeline_run = instance.get_run_by_id(execute_run_args.pipeline_run_id) if not pipeline_run: raise DagsterRunNotFoundError( "gRPC server could not load run {run_id} in order to execute it. Make sure that the gRPC server has access to your run storage.".format( run_id=execute_run_args.pipeline_run_id ), invalid_run_id=execute_run_args.pipeline_run_id, ) pid = os.getpid() except: # pylint: disable=bare-except serializable_error_info = serializable_error_info_from_exc_info(sys.exc_info()) event = IPCErrorMessage( serializable_error_info=serializable_error_info, message="Error during RPC setup for executing run: {message}".format( message=serializable_error_info.message ), ) subprocess_status_handler(event) subprocess_status_handler(RunInSubprocessComplete()) if instance: instance.dispose() return subprocess_status_handler(StartRunInSubprocessSuccessful()) run_event_handler( instance.report_engine_event( "Started process for pipeline (pid: {pid}).".format(pid=pid), pipeline_run, EngineEventData.in_process(pid, marker_end="cli_api_subprocess_init"), ) ) # This is so nasty but seemingly unavoidable # https://amir.rachum.com/blog/2017/03/03/generator-cleanup/ closed = False try: for event in _core_execute_run(recon_pipeline, pipeline_run, instance): run_event_handler(event) except KeyboardInterrupt: run_event_handler( instance.report_engine_event( message="Pipeline execution terminated by interrupt", pipeline_run=pipeline_run, ) ) raise except GeneratorExit: closed = True raise finally: if not closed: run_event_handler( instance.report_engine_event( "Process for pipeline exited (pid: {pid}).".format(pid=pid), pipeline_run, ) ) subprocess_status_handler(RunInSubprocessComplete()) instance.dispose() def start_run_in_subprocess( serialized_execute_run_args, recon_pipeline, event_queue, termination_event ): with delay_interrupts(): _run_in_subprocess( serialized_execute_run_args, recon_pipeline, termination_event, subprocess_status_handler=event_queue.put, run_event_handler=lambda x: None, ) def get_external_pipeline_subset_result(recon_pipeline, solid_selection): check.inst_param(recon_pipeline, "recon_pipeline", ReconstructablePipeline) if solid_selection: try: sub_pipeline = recon_pipeline.subset_for_execution(solid_selection) definition = sub_pipeline.get_definition() except DagsterInvalidSubsetError: return ExternalPipelineSubsetResult( success=False, error=serializable_error_info_from_exc_info(sys.exc_info()) ) else: definition = recon_pipeline.get_definition() external_pipeline_data = external_pipeline_data_from_def(definition) return ExternalPipelineSubsetResult(success=True, 
external_pipeline_data=external_pipeline_data) def get_external_schedule_execution( recon_repo, instance_ref, schedule_name, scheduled_execution_timestamp, scheduled_execution_timezone, ): check.inst_param( recon_repo, "recon_repo", ReconstructableRepository, ) definition = recon_repo.get_definition() schedule_def = definition.get_schedule_def(schedule_name) with DagsterInstance.from_ref(instance_ref) as instance: scheduled_execution_time = ( pendulum.from_timestamp(scheduled_execution_timestamp, tz=scheduled_execution_timezone,) if scheduled_execution_timestamp else None ) schedule_context = ScheduleExecutionContext(instance, scheduled_execution_time) try: with user_code_error_boundary( ScheduleExecutionError, lambda: "Error occurred during the execution of should_execute for schedule " "{schedule_name}".format(schedule_name=schedule_def.name), ): if not schedule_def.should_execute(schedule_context): return ExternalScheduleExecutionData( should_execute=False, run_config=None, tags=None ) with user_code_error_boundary( ScheduleExecutionError, lambda: "Error occurred during the execution of run_config_fn for schedule " "{schedule_name}".format(schedule_name=schedule_def.name), ): run_config = schedule_def.get_run_config(schedule_context) with user_code_error_boundary( ScheduleExecutionError, lambda: "Error occurred during the execution of tags_fn for schedule " "{schedule_name}".format(schedule_name=schedule_def.name), ): tags = schedule_def.get_tags(schedule_context) return ExternalScheduleExecutionData( run_config=run_config, tags=tags, should_execute=True ) except ScheduleExecutionError: return ExternalScheduleExecutionErrorData( serializable_error_info_from_exc_info(sys.exc_info()) ) def get_external_sensor_execution(recon_repo, instance_ref, sensor_name, last_completion_timestamp): check.inst_param( recon_repo, "recon_repo", ReconstructableRepository, ) definition = recon_repo.get_definition() sensor_def = definition.get_sensor_def(sensor_name) with DagsterInstance.from_ref(instance_ref) as instance: sensor_context = SensorExecutionContext( instance, last_completion_time=last_completion_timestamp ) try: with user_code_error_boundary( SensorExecutionError, lambda: "Error occurred during the execution of evaluation_fn for sensor " "{sensor_name}".format(sensor_name=sensor_def.name), ): tick_data_list = sensor_def.get_tick_data(sensor_context) return ExternalSensorExecutionData( - run_params=[ - tick for tick in tick_data_list if isinstance(tick, SensorRunParams) - ], + run_requests=[tick for tick in tick_data_list if isinstance(tick, RunRequest)], skip_message=tick_data_list[0].skip_message - if tick_data_list and isinstance(tick_data_list[0], SensorSkipData) + if tick_data_list and isinstance(tick_data_list[0], SkipReason) else None, ) except SensorExecutionError: return ExternalSensorExecutionErrorData( serializable_error_info_from_exc_info(sys.exc_info()) ) def get_partition_config(recon_repo, partition_set_name, partition_name): definition = recon_repo.get_definition() partition_set_def = definition.get_partition_set_def(partition_set_name) partition = partition_set_def.get_partition(partition_name) try: with user_code_error_boundary( PartitionExecutionError, lambda: "Error occurred during the evaluation of the `run_config_for_partition` " "function for partition set {partition_set_name}".format( partition_set_name=partition_set_def.name ), ): run_config = partition_set_def.run_config_for_partition(partition) return ExternalPartitionConfigData(name=partition.name, 
run_config=run_config) except PartitionExecutionError: return ExternalPartitionExecutionErrorData( serializable_error_info_from_exc_info(sys.exc_info()) ) def get_partition_names(recon_repo, partition_set_name): definition = recon_repo.get_definition() partition_set_def = definition.get_partition_set_def(partition_set_name) try: with user_code_error_boundary( PartitionExecutionError, lambda: "Error occurred during the execution of the partition generation function for " "partition set {partition_set_name}".format(partition_set_name=partition_set_def.name), ): return ExternalPartitionNamesData( partition_names=partition_set_def.get_partition_names() ) except PartitionExecutionError: return ExternalPartitionExecutionErrorData( serializable_error_info_from_exc_info(sys.exc_info()) ) def get_partition_tags(recon_repo, partition_set_name, partition_name): definition = recon_repo.get_definition() partition_set_def = definition.get_partition_set_def(partition_set_name) partition = partition_set_def.get_partition(partition_name) try: with user_code_error_boundary( PartitionExecutionError, lambda: "Error occurred during the evaluation of the `tags_for_partition` function for " "partition set {partition_set_name}".format(partition_set_name=partition_set_def.name), ): tags = partition_set_def.tags_for_partition(partition) return ExternalPartitionTagsData(name=partition.name, tags=tags) except PartitionExecutionError: return ExternalPartitionExecutionErrorData( serializable_error_info_from_exc_info(sys.exc_info()) ) def get_external_execution_plan_snapshot(recon_pipeline, args): check.inst_param(recon_pipeline, "recon_pipeline", ReconstructablePipeline) check.inst_param(args, "args", ExecutionPlanSnapshotArgs) try: pipeline = ( recon_pipeline.subset_for_execution(args.solid_selection) if args.solid_selection else recon_pipeline ) return snapshot_from_execution_plan( create_execution_plan( pipeline=pipeline, run_config=args.run_config, mode=args.mode, step_keys_to_execute=args.step_keys_to_execute, ), args.pipeline_snapshot_id, ) except: # pylint: disable=bare-except return ExecutionPlanSnapshotErrorData( error=serializable_error_info_from_exc_info(sys.exc_info()) ) def get_partition_set_execution_param_data(recon_repo, partition_set_name, partition_names): repo_definition = recon_repo.get_definition() partition_set_def = repo_definition.get_partition_set_def(partition_set_name) try: with user_code_error_boundary( PartitionExecutionError, lambda: "Error occurred during the partition generation for partition set " "{partition_set_name}".format(partition_set_name=partition_set_def.name), ): all_partitions = partition_set_def.get_partitions() partitions = [ partition for partition in all_partitions if partition.name in partition_names ] partition_data = [] for partition in partitions: def _error_message_fn(partition_set_name, partition_name): return lambda: ( "Error occurred during the partition config and tag generation for " "partition set {partition_set_name}::{partition_name}".format( partition_set_name=partition_set_name, partition_name=partition_name ) ) with user_code_error_boundary( PartitionExecutionError, _error_message_fn(partition_set_def.name, partition.name) ): run_config = partition_set_def.run_config_for_partition(partition) tags = partition_set_def.tags_for_partition(partition) partition_data.append( ExternalPartitionExecutionParamData( name=partition.name, tags=tags, run_config=run_config, ) ) return ExternalPartitionSetExecutionParamData(partition_data=partition_data) except 
PartitionExecutionError: return ExternalPartitionExecutionErrorData( serializable_error_info_from_exc_info(sys.exc_info()) ) diff --git a/python_modules/dagster/dagster/scheduler/sensor.py b/python_modules/dagster/dagster/scheduler/sensor.py index d043459af..8a133eb29 100644 --- a/python_modules/dagster/dagster/scheduler/sensor.py +++ b/python_modules/dagster/dagster/scheduler/sensor.py @@ -1,371 +1,368 @@ import os import sys import time import pendulum from dagster import check from dagster.core.definitions.job import JobType from dagster.core.errors import DagsterSubprocessError from dagster.core.events import EngineEventData from dagster.core.host_representation import ( ExternalPipeline, PipelineSelector, RepositoryLocation, RepositoryLocationHandle, ) from dagster.core.host_representation.external_data import ( ExternalSensorExecutionData, ExternalSensorExecutionErrorData, ) from dagster.core.instance import DagsterInstance from dagster.core.scheduler.job import JobStatus, JobTickData, JobTickStatus, SensorJobData from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter -from dagster.core.storage.tags import EXECUTION_KEY_TAG, check_tags +from dagster.core.storage.tags import RUN_KEY_TAG, check_tags from dagster.utils import merge_dicts from dagster.utils.error import serializable_error_info_from_exc_info RECORDED_TICK_STATES = [JobTickStatus.SUCCESS, JobTickStatus.FAILURE] FULFILLED_TICK_STATES = [JobTickStatus.SKIPPED, JobTickStatus.SUCCESS] class SensorLaunchContext: def __init__(self, job_state, tick, instance, logger): self._job_state = job_state self._tick = tick self._instance = instance self._logger = logger self._to_resolve = [] @property def status(self): return self._tick.status @property def logger(self): return self._logger def add_state(self, status, **kwargs): self._to_resolve.append(self._tick.with_status(status=status, **kwargs)) def _write(self): to_update = self._to_resolve[0] if self._to_resolve else self._tick self._instance.update_job_tick(to_update) for tick in self._to_resolve[1:]: self._instance.create_job_tick(tick.job_tick_data) if any([tick.status in FULFILLED_TICK_STATES for tick in self._to_resolve]): self._instance.update_job_state( self._job_state.with_data(SensorJobData(self._tick.timestamp)) ) def __enter__(self): return self def __exit__(self, exception_type, exception_value, traceback): if exception_value and not isinstance(exception_value, KeyboardInterrupt): error_data = serializable_error_info_from_exc_info(sys.exc_info()) self.add_state(JobTickStatus.FAILURE, error=error_data) self._write() self._logger.error( "Error launching sensor run: {error_info}".format( error_info=error_data.to_string() ), ) return True # Swallow the exception after logging in the tick DB self._write() def _check_for_debug_crash(debug_crash_flags, key): if not debug_crash_flags: return kill_signal = debug_crash_flags.get(key) if not kill_signal: return os.kill(os.getpid(), kill_signal) time.sleep(10) raise Exception("Process didn't terminate after sending crash signal") def execute_sensor_iteration(instance, logger, debug_crash_flags=None): check.inst_param(instance, "instance", DagsterInstance) sensor_jobs = [ s for s in instance.all_stored_job_state(job_type=JobType.SENSOR) if s.status == JobStatus.RUNNING ] if not sensor_jobs: logger.info("Not checking for any runs since no sensors have been started.") return logger.info( "Checking for new runs for the following sensors: {sensor_names}".format( sensor_names=", 
".join([job.job_name for job in sensor_jobs]), ) ) for job_state in sensor_jobs: sensor_debug_crash_flags = ( debug_crash_flags.get(job_state.job_name) if debug_crash_flags else None ) try: with RepositoryLocationHandle.create_from_repository_location_origin( job_state.origin.external_repository_origin.repository_location_origin ) as repo_location_handle: repo_location = RepositoryLocation.from_handle(repo_location_handle) repo_dict = repo_location.get_repositories() check.invariant( len(repo_dict) == 1, "Reconstructed repository location should have exactly one repository", ) external_repo = next(iter(repo_dict.values())) if not external_repo.has_external_job(job_state.job_name): continue now = pendulum.now() latest_tick = instance.get_latest_job_tick(job_state.job_origin_id) if not latest_tick or latest_tick.status in RECORDED_TICK_STATES: tick = instance.create_job_tick( JobTickData( job_origin_id=job_state.job_origin_id, job_name=job_state.job_name, job_type=JobType.SENSOR, status=JobTickStatus.STARTED, timestamp=now.timestamp(), ) ) else: tick = latest_tick.with_status( - JobTickStatus.STARTED, timestamp=now.timestamp(), execution_key=None, + JobTickStatus.STARTED, timestamp=now.timestamp(), run_key=None, ) instance.update_job_tick(tick) _check_for_debug_crash(sensor_debug_crash_flags, "TICK_CREATED") external_sensor = external_repo.get_external_sensor(job_state.job_name) with SensorLaunchContext(job_state, tick, instance, logger) as tick_context: _check_for_debug_crash(sensor_debug_crash_flags, "TICK_HELD") _evaluate_sensor( tick_context, instance, repo_location, external_repo, external_sensor, job_state, sensor_debug_crash_flags, ) except Exception: # pylint: disable=broad-except logger.error( "Sensor failed for {sensor_name} : {error_info}".format( sensor_name=job_state.job_name, error_info=serializable_error_info_from_exc_info(sys.exc_info()).to_string(), ) ) def _evaluate_sensor( context, instance, repo_location, external_repo, external_sensor, job_state, sensor_debug_crash_flags=None, ): sensor_runtime_data = repo_location.get_external_sensor_execution_data( instance, external_repo.handle, external_sensor.name, job_state.job_specific_data.last_completed_timestamp if job_state.job_specific_data else None, ) if isinstance(sensor_runtime_data, ExternalSensorExecutionErrorData): context.logger.error( "Failed to resolve sensor for {sensor_name} : {error_info}".format( sensor_name=external_sensor.name, error_info=sensor_runtime_data.error.to_string(), ) ) context.add_state(JobTickStatus.FAILURE, error=sensor_runtime_data.error) return assert isinstance(sensor_runtime_data, ExternalSensorExecutionData) - if not sensor_runtime_data.run_params: + if not sensor_runtime_data.run_requests: if sensor_runtime_data.skip_message: context.logger.info( f"Sensor returned false for {external_sensor.name}, skipping: " f"{sensor_runtime_data.skip_message}" ) else: context.logger.info(f"Sensor returned false for {external_sensor.name}, skipping") context.add_state(JobTickStatus.SKIPPED) return pipeline_selector = PipelineSelector( location_name=repo_location.name, repository_name=external_repo.name, pipeline_name=external_sensor.pipeline_name, solid_selection=external_sensor.solid_selection, ) subset_pipeline_result = repo_location.get_subset_external_pipeline_result(pipeline_selector) external_pipeline = ExternalPipeline( subset_pipeline_result.external_pipeline_data, external_repo.handle, ) - for run_params in sensor_runtime_data.run_params: - if run_params.execution_key and instance.has_job_tick( - 
external_sensor.get_external_origin_id(), - run_params.execution_key, - [JobTickStatus.SUCCESS], + for run_request in sensor_runtime_data.run_requests: + if run_request.run_key and instance.has_job_tick( + external_sensor.get_external_origin_id(), run_request.run_key, [JobTickStatus.SUCCESS], ): context.logger.info( - "Found existing run for sensor {sensor_name} with execution_key `{execution_key}`, skipping.".format( - sensor_name=external_sensor.name, execution_key=run_params.execution_key + "Found existing run for sensor {sensor_name} with run_key `{run_key}`, skipping.".format( + sensor_name=external_sensor.name, run_key=run_request.run_key ) ) - context.add_state(JobTickStatus.SKIPPED, execution_key=run_params.execution_key) + context.add_state(JobTickStatus.SKIPPED, run_key=run_request.run_key) continue run = _get_or_create_sensor_run( - context, instance, repo_location, external_sensor, external_pipeline, run_params + context, instance, repo_location, external_sensor, external_pipeline, run_request ) if not run: # we already found and resolved a run continue _check_for_debug_crash(sensor_debug_crash_flags, "RUN_CREATED") try: context.logger.info( "Launching run for {sensor_name}".format(sensor_name=external_sensor.name) ) instance.submit_run(run.run_id, external_pipeline) context.logger.info( "Completed launch of run {run_id} for {sensor_name}".format( run_id=run.run_id, sensor_name=external_sensor.name ) ) except Exception: # pylint: disable=broad-except context.logger.error( "Run {run_id} created successfully but failed to launch.".format(run_id=run.run_id) ) _check_for_debug_crash(sensor_debug_crash_flags, "RUN_LAUNCHED") context.add_state( - JobTickStatus.SUCCESS, run_id=run.run_id, execution_key=run_params.execution_key, + JobTickStatus.SUCCESS, run_id=run.run_id, run_key=run_request.run_key, ) def _get_or_create_sensor_run( - context, instance, repo_location, external_sensor, external_pipeline, run_params + context, instance, repo_location, external_sensor, external_pipeline, run_request ): - if not run_params.execution_key: + if not run_request.run_key: return _create_sensor_run( - context, instance, repo_location, external_sensor, external_pipeline, run_params + context, instance, repo_location, external_sensor, external_pipeline, run_request ) existing_runs = instance.get_runs( PipelineRunsFilter( tags=merge_dicts( - PipelineRun.tags_for_sensor(external_sensor), - {EXECUTION_KEY_TAG: run_params.execution_key}, + PipelineRun.tags_for_sensor(external_sensor), {RUN_KEY_TAG: run_request.run_key}, ) ) ) if len(existing_runs): check.invariant(len(existing_runs) == 1) run = existing_runs[0] if run.status != PipelineRunStatus.NOT_STARTED: # A run already exists and was launched for this time period, # but the scheduler must have crashed before the tick could be put # into a SUCCESS state context.logger.info( - f"Run {run.run_id} already completed with the execution key " - f"`{run_params.execution_key}` for {external_sensor.name}" + f"Run {run.run_id} already completed with the run key " + f"`{run_request.run_key}` for {external_sensor.name}" ) context.add_state( - JobTickStatus.SUCCESS, run_id=run.run_id, execution_key=run_params.execution_key, + JobTickStatus.SUCCESS, run_id=run.run_id, run_key=run_request.run_key, ) return None else: context.logger.info( - f"Run {run.run_id} already created with the execution key " - f"`{run_params.execution_key}` for {external_sensor.name}" + f"Run {run.run_id} already created with the run key " + f"`{run_request.run_key}` for 
{external_sensor.name}" ) return run context.logger.info(f"Creating new run for {external_sensor.name}") return _create_sensor_run( - context, instance, repo_location, external_sensor, external_pipeline, run_params + context, instance, repo_location, external_sensor, external_pipeline, run_request ) def _create_sensor_run( - context, instance, repo_location, external_sensor, external_pipeline, run_params + context, instance, repo_location, external_sensor, external_pipeline, run_request ): execution_plan_errors = [] execution_plan_snapshot = None try: external_execution_plan = repo_location.get_external_execution_plan( external_pipeline, - run_params.run_config, + run_request.run_config, external_sensor.mode, step_keys_to_execute=None, ) execution_plan_snapshot = external_execution_plan.execution_plan_snapshot except DagsterSubprocessError as e: execution_plan_errors.extend(e.subprocess_error_infos) except Exception as e: # pylint: disable=broad-except execution_plan_errors.append(serializable_error_info_from_exc_info(sys.exc_info())) pipeline_tags = external_pipeline.tags or {} check_tags(pipeline_tags, "pipeline_tags") tags = merge_dicts( - merge_dicts(pipeline_tags, run_params.tags), PipelineRun.tags_for_sensor(external_sensor) + merge_dicts(pipeline_tags, run_request.tags), PipelineRun.tags_for_sensor(external_sensor) ) - if run_params.execution_key: - tags[EXECUTION_KEY_TAG] = run_params.execution_key + if run_request.run_key: + tags[RUN_KEY_TAG] = run_request.run_key run = instance.create_run( pipeline_name=external_sensor.pipeline_name, run_id=None, - run_config=run_params.run_config, + run_config=run_request.run_config, mode=external_sensor.mode, solids_to_execute=external_pipeline.solids_to_execute, step_keys_to_execute=None, solid_selection=external_sensor.solid_selection, status=( PipelineRunStatus.FAILURE if len(execution_plan_errors) > 0 else PipelineRunStatus.NOT_STARTED ), root_run_id=None, parent_run_id=None, tags=tags, pipeline_snapshot=external_pipeline.pipeline_snapshot, execution_plan_snapshot=execution_plan_snapshot, parent_pipeline_snapshot=external_pipeline.parent_pipeline_snapshot, external_pipeline_origin=external_pipeline.get_external_origin(), ) if len(execution_plan_errors) > 0: for error in execution_plan_errors: instance.report_engine_event( error.message, run, EngineEventData.engine_error(error), ) instance.report_run_failed(run) context.logger.error( "Failed to fetch execution plan for {sensor_name}: {error_string}".format( sensor_name=external_sensor.name, error_string="\n".join([error.to_string() for error in execution_plan_errors]), ), ) return run diff --git a/python_modules/dagster/dagster/utils/test/schedule_storage.py b/python_modules/dagster/dagster/utils/test/schedule_storage.py index 9196eb499..7b427fd97 100644 --- a/python_modules/dagster/dagster/utils/test/schedule_storage.py +++ b/python_modules/dagster/dagster/utils/test/schedule_storage.py @@ -1,512 +1,500 @@ import sys import time import pytest from dagster import DagsterInvariantViolationError from dagster.core.host_representation import ( ExternalRepositoryOrigin, ManagedGrpcPythonEnvRepositoryLocationOrigin, ) from dagster.core.scheduler.job import ( JobState, JobStatus, JobTickData, JobTickStatus, JobType, ScheduleJobData, ) from dagster.core.types.loadable_target_origin import LoadableTargetOrigin from dagster.seven import get_current_datetime_in_utc from dagster.utils.error import SerializableErrorInfo class TestScheduleStorage: """ You can extend this class to easily run these set of 
tests on any schedule storage. When extending, you simply need to override the `schedule_storage` fixture and return your implementation of `ScheduleStorage`. For example: ``` TestScheduleStorage.__test__ = False class TestMyStorageImplementation(TestScheduleStorage): __test__ = True @pytest.fixture(scope='function', name='storage') def schedule_storage(self): # pylint: disable=arguments-differ return MyStorageImplementation() ``` """ @pytest.fixture(name="storage", params=[]) def schedule_storage(self, request): with request.param() as s: yield s @staticmethod def fake_repo_target(): return ExternalRepositoryOrigin( ManagedGrpcPythonEnvRepositoryLocationOrigin( LoadableTargetOrigin( executable_path=sys.executable, module_name="fake", attribute="fake" ), ), "fake_repo_name", ) @classmethod def build_schedule( cls, schedule_name, cron_schedule, status=JobStatus.STOPPED, ): return JobState( cls.fake_repo_target().get_job_origin(schedule_name), JobType.SCHEDULE, status, ScheduleJobData(cron_schedule, start_timestamp=None), ) @classmethod def build_sensor(cls, sensor_name, status=JobStatus.STOPPED): external_job_origin = cls.fake_repo_target().get_job_origin(sensor_name) return JobState(external_job_origin, JobType.SENSOR, status) def test_basic_schedule_storage(self, storage): assert storage schedule = self.build_schedule("my_schedule", "* * * * *") storage.add_job_state(schedule) schedules = storage.all_stored_job_state( self.fake_repo_target().get_id(), JobType.SCHEDULE, ) assert len(schedules) == 1 schedule = schedules[0] assert schedule.job_name == "my_schedule" assert schedule.job_specific_data.cron_schedule == "* * * * *" assert schedule.job_specific_data.start_timestamp == None def test_add_multiple_schedules(self, storage): assert storage schedule = self.build_schedule("my_schedule", "* * * * *") schedule_2 = self.build_schedule("my_schedule_2", "* * * * *") schedule_3 = self.build_schedule("my_schedule_3", "* * * * *") storage.add_job_state(schedule) storage.add_job_state(schedule_2) storage.add_job_state(schedule_3) schedules = storage.all_stored_job_state(self.fake_repo_target().get_id(), JobType.SCHEDULE) assert len(schedules) == 3 assert any(s.job_name == "my_schedule" for s in schedules) assert any(s.job_name == "my_schedule_2" for s in schedules) assert any(s.job_name == "my_schedule_3" for s in schedules) def test_get_schedule_state(self, storage): assert storage state = self.build_schedule("my_schedule", "* * * * *") storage.add_job_state(state) schedule = storage.get_job_state(state.job_origin_id) assert schedule.job_name == "my_schedule" assert schedule.job_specific_data.start_timestamp == None def test_get_schedule_state_not_found(self, storage): assert storage storage.add_job_state(self.build_schedule("my_schedule", "* * * * *")) schedule = storage.get_job_state("fake_id") assert schedule is None def test_update_schedule(self, storage): assert storage schedule = self.build_schedule("my_schedule", "* * * * *") storage.add_job_state(schedule) now_time = get_current_datetime_in_utc().timestamp() new_schedule = schedule.with_status(JobStatus.RUNNING).with_data( ScheduleJobData( cron_schedule=schedule.job_specific_data.cron_schedule, start_timestamp=now_time, ) ) storage.update_job_state(new_schedule) schedules = storage.all_stored_job_state(self.fake_repo_target().get_id(), JobType.SCHEDULE) assert len(schedules) == 1 schedule = schedules[0] assert schedule.job_name == "my_schedule" assert schedule.status == JobStatus.RUNNING assert schedule.job_specific_data.start_timestamp 
== now_time stopped_schedule = schedule.with_status(JobStatus.STOPPED).with_data( ScheduleJobData(schedule.job_specific_data.cron_schedule) ) storage.update_job_state(stopped_schedule) schedules = storage.all_stored_job_state(self.fake_repo_target().get_id(), JobType.SCHEDULE) assert len(schedules) == 1 schedule = schedules[0] assert schedule.job_name == "my_schedule" assert schedule.status == JobStatus.STOPPED assert schedule.job_specific_data.start_timestamp == None def test_update_schedule_not_found(self, storage): assert storage schedule = self.build_schedule("my_schedule", "* * * * *") with pytest.raises(DagsterInvariantViolationError): storage.update_job_state(schedule) def test_delete_schedule_state(self, storage): assert storage schedule = self.build_schedule("my_schedule", "* * * * *") storage.add_job_state(schedule) storage.delete_job_state(schedule.job_origin_id) schedules = storage.all_stored_job_state(self.fake_repo_target().get_id(), JobType.SCHEDULE) assert len(schedules) == 0 def test_delete_schedule_not_found(self, storage): assert storage schedule = self.build_schedule("my_schedule", "* * * * *") with pytest.raises(DagsterInvariantViolationError): storage.delete_job_state(schedule.job_origin_id) def test_add_schedule_with_same_name(self, storage): assert storage schedule = self.build_schedule("my_schedule", "* * * * *") storage.add_job_state(schedule) with pytest.raises(DagsterInvariantViolationError): storage.add_job_state(schedule) def build_tick(self, current_time, status=JobTickStatus.STARTED, run_id=None, error=None): return JobTickData( "my_schedule", "my_schedule", JobType.SCHEDULE, status, current_time, run_id, error ) def test_create_tick(self, storage): assert storage current_time = time.time() tick = storage.create_job_tick(self.build_tick(current_time)) ticks = storage.get_job_ticks("my_schedule") assert len(ticks) == 1 tick = ticks[0] assert tick.job_name == "my_schedule" assert tick.timestamp == current_time assert tick.status == JobTickStatus.STARTED assert tick.run_id == None assert tick.error == None def test_update_tick_to_success(self, storage): assert storage current_time = time.time() tick = storage.create_job_tick(self.build_tick(current_time)) updated_tick = tick.with_status(JobTickStatus.SUCCESS, run_id="1234") assert updated_tick.status == JobTickStatus.SUCCESS storage.update_job_tick(updated_tick) ticks = storage.get_job_ticks("my_schedule") assert len(ticks) == 1 tick = ticks[0] assert tick.job_name == "my_schedule" assert tick.timestamp == current_time assert tick.status == JobTickStatus.SUCCESS assert tick.run_id == "1234" assert tick.error == None def test_update_tick_to_skip(self, storage): assert storage current_time = time.time() tick = storage.create_job_tick(self.build_tick(current_time)) updated_tick = tick.with_status(JobTickStatus.SKIPPED) assert updated_tick.status == JobTickStatus.SKIPPED storage.update_job_tick(updated_tick) ticks = storage.get_job_ticks("my_schedule") assert len(ticks) == 1 tick = ticks[0] assert tick.job_name == "my_schedule" assert tick.timestamp == current_time assert tick.status == JobTickStatus.SKIPPED assert tick.run_id == None assert tick.error == None def test_update_tick_to_failure(self, storage): assert storage current_time = time.time() tick = storage.create_job_tick(self.build_tick(current_time)) updated_tick = tick.with_status( JobTickStatus.FAILURE, error=SerializableErrorInfo(message="Error", stack=[], cls_name="TestError"), ) assert updated_tick.status == JobTickStatus.FAILURE 
storage.update_job_tick(updated_tick) ticks = storage.get_job_ticks("my_schedule") assert len(ticks) == 1 tick = ticks[0] assert tick.tick_id == 1 assert tick.job_name == "my_schedule" assert tick.timestamp == current_time assert tick.status == JobTickStatus.FAILURE assert tick.run_id == None assert tick.error == SerializableErrorInfo(message="Error", stack=[], cls_name="TestError") def test_get_tick_stats(self, storage): assert storage current_time = time.time() error = SerializableErrorInfo(message="Error", stack=[], cls_name="TestError") # Create ticks for x in range(2): storage.create_job_tick(self.build_tick(current_time)) for x in range(3): storage.create_job_tick( self.build_tick(current_time, JobTickStatus.SUCCESS, run_id=str(x)), ) for x in range(4): storage.create_job_tick(self.build_tick(current_time, JobTickStatus.SKIPPED),) for x in range(5): storage.create_job_tick( self.build_tick(current_time, JobTickStatus.FAILURE, error=error), ) stats = storage.get_job_tick_stats("my_schedule") assert stats.ticks_started == 2 assert stats.ticks_succeeded == 3 assert stats.ticks_skipped == 4 assert stats.ticks_failed == 5 def test_basic_job_storage(self, storage): assert storage job = self.build_sensor("my_sensor") storage.add_job_state(job) jobs = storage.all_stored_job_state(self.fake_repo_target().get_id()) assert len(jobs) == 1 job = jobs[0] assert job.job_name == "my_sensor" def test_add_multiple_jobs(self, storage): assert storage job = self.build_sensor("my_sensor") job_2 = self.build_sensor("my_sensor_2") job_3 = self.build_sensor("my_sensor_3") storage.add_job_state(job) storage.add_job_state(job_2) storage.add_job_state(job_3) jobs = storage.all_stored_job_state(self.fake_repo_target().get_id()) assert len(jobs) == 3 assert any(s.job_name == "my_sensor" for s in jobs) assert any(s.job_name == "my_sensor_2" for s in jobs) assert any(s.job_name == "my_sensor_3" for s in jobs) def test_get_job_state(self, storage): assert storage state = self.build_sensor("my_sensor") storage.add_job_state(state) job = storage.get_job_state(state.job_origin_id) assert job.job_name == "my_sensor" def test_get_job_state_not_found(self, storage): assert storage storage.add_job_state(self.build_sensor("my_sensor")) job_state = storage.get_job_state("fake_id") assert job_state is None def test_update_job(self, storage): assert storage job = self.build_sensor("my_sensor") storage.add_job_state(job) new_job = job.with_status(JobStatus.RUNNING) storage.update_job_state(new_job) jobs = storage.all_stored_job_state(self.fake_repo_target().get_id()) assert len(jobs) == 1 job = jobs[0] assert job.job_name == "my_sensor" assert job.status == JobStatus.RUNNING stopped_job = job.with_status(JobStatus.STOPPED) storage.update_job_state(stopped_job) jobs = storage.all_stored_job_state(self.fake_repo_target().get_id()) assert len(jobs) == 1 job = jobs[0] assert job.job_name == "my_sensor" assert job.status == JobStatus.STOPPED def test_update_job_not_found(self, storage): assert storage job = self.build_sensor("my_sensor") with pytest.raises(DagsterInvariantViolationError): storage.update_job_state(job) def test_delete_job_state(self, storage): assert storage job = self.build_sensor("my_sensor") storage.add_job_state(job) storage.delete_job_state(job.job_origin_id) jobs = storage.all_stored_job_state(self.fake_repo_target().get_id()) assert len(jobs) == 0 def test_delete_job_not_found(self, storage): assert storage job = self.build_sensor("my_sensor") with pytest.raises(DagsterInvariantViolationError): 
storage.delete_job_state(job.job_origin_id) def test_add_job_with_same_name(self, storage): assert storage job = self.build_sensor("my_sensor") storage.add_job_state(job) with pytest.raises(DagsterInvariantViolationError): storage.add_job_state(job) def build_job_tick_data( - self, - current_time, - status=JobTickStatus.STARTED, - run_id=None, - error=None, - execution_key=None, + self, current_time, status=JobTickStatus.STARTED, run_id=None, error=None, run_key=None, ): return JobTickData( - "my_sensor", - "my_sensor", - JobType.SENSOR, - status, - current_time, - run_id, - error, - execution_key, + "my_sensor", "my_sensor", JobType.SENSOR, status, current_time, run_id, error, run_key, ) def test_create_job_tick(self, storage): assert storage current_time = time.time() tick = storage.create_job_tick(self.build_job_tick_data(current_time)) assert tick.tick_id == 1 ticks = storage.get_job_ticks("my_sensor") assert len(ticks) == 1 tick = ticks[0] assert tick.tick_id == 1 assert tick.job_name == "my_sensor" assert tick.timestamp == current_time assert tick.status == JobTickStatus.STARTED assert tick.run_id == None assert tick.error == None def test_update_job_tick_to_success(self, storage): assert storage current_time = time.time() tick = storage.create_job_tick(self.build_job_tick_data(current_time)) updated_tick = tick.with_status(JobTickStatus.SUCCESS, run_id="1234") assert updated_tick.status == JobTickStatus.SUCCESS storage.update_job_tick(updated_tick) ticks = storage.get_job_ticks("my_sensor") assert len(ticks) == 1 tick = ticks[0] assert tick.tick_id == 1 assert tick.job_name == "my_sensor" assert tick.timestamp == current_time assert tick.status == JobTickStatus.SUCCESS assert tick.run_id == "1234" assert tick.error == None def test_update_job_tick_to_skip(self, storage): assert storage current_time = time.time() tick = storage.create_job_tick(self.build_job_tick_data(current_time)) updated_tick = tick.with_status(JobTickStatus.SKIPPED) assert updated_tick.status == JobTickStatus.SKIPPED storage.update_job_tick(updated_tick) ticks = storage.get_job_ticks("my_sensor") assert len(ticks) == 1 tick = ticks[0] assert tick.tick_id == 1 assert tick.job_name == "my_sensor" assert tick.timestamp == current_time assert tick.status == JobTickStatus.SKIPPED assert tick.run_id == None assert tick.error == None def test_update_job_tick_to_failure(self, storage): assert storage current_time = time.time() tick = storage.create_job_tick(self.build_job_tick_data(current_time)) error = SerializableErrorInfo(message="Error", stack=[], cls_name="TestError") updated_tick = tick.with_status(JobTickStatus.FAILURE, error=error) assert updated_tick.status == JobTickStatus.FAILURE storage.update_job_tick(updated_tick) ticks = storage.get_job_ticks("my_sensor") assert len(ticks) == 1 tick = ticks[0] assert tick.tick_id == 1 assert tick.job_name == "my_sensor" assert tick.timestamp == current_time assert tick.status == JobTickStatus.FAILURE assert tick.run_id == None assert tick.error == error diff --git a/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py b/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py index 412880051..8756f75d5 100644 --- a/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py +++ b/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py @@ -1,158 +1,158 @@ import string from dagster import ( InputDefinition, Int, OutputDefinition, PartitionSetDefinition, ScheduleDefinition, lambda_solid, pipeline, repository, solid, usable_as_dagster_type, 
) from dagster.core.definitions.decorators.sensor import sensor -from dagster.core.definitions.sensor import SensorRunParams +from dagster.core.definitions.sensor import RunRequest @lambda_solid def do_something(): return 1 @lambda_solid def do_input(x): return x @pipeline(name="foo") def foo_pipeline(): do_input(do_something()) @pipeline(name="baz", description="Not much tbh") def baz_pipeline(): do_input() def define_foo_pipeline(): return foo_pipeline @pipeline(name="bar") def bar_pipeline(): @usable_as_dagster_type(name="InputTypeWithoutHydration") class InputTypeWithoutHydration(int): pass @solid(output_defs=[OutputDefinition(InputTypeWithoutHydration)]) def one(_): return 1 @solid( input_defs=[InputDefinition("some_input", InputTypeWithoutHydration)], output_defs=[OutputDefinition(Int)], ) def fail_subset(_, some_input): return some_input return fail_subset(one()) def define_bar_schedules(): return { "foo_schedule": ScheduleDefinition( "foo_schedule", cron_schedule="* * * * *", pipeline_name="test_pipeline", run_config={"fizz": "buzz"}, ), "foo_schedule_never_execute": ScheduleDefinition( "foo_schedule_never_execute", cron_schedule="* * * * *", pipeline_name="test_pipeline", run_config={"fizz": "buzz"}, should_execute=lambda _context: False, ), "foo_schedule_echo_time": ScheduleDefinition( "foo_schedule_echo_time", cron_schedule="* * * * *", pipeline_name="test_pipeline", run_config_fn=lambda context: { "passed_in_time": context.scheduled_execution_time.isoformat() if context.scheduled_execution_time else "" }, ), } def error_partition_fn(): raise Exception("womp womp") def error_partition_config_fn(): raise Exception("womp womp") def error_partition_tags_fn(_partition): raise Exception("womp womp") def define_baz_partitions(): return { "baz_partitions": PartitionSetDefinition( name="baz_partitions", pipeline_name="baz", partition_fn=lambda: string.ascii_lowercase, run_config_fn_for_partition=lambda partition: { "solids": {"do_input": {"inputs": {"x": {"value": partition.value}}}} }, tags_fn_for_partition=lambda _partition: {"foo": "bar"}, ), "error_partitions": PartitionSetDefinition( name="error_partitions", pipeline_name="baz", partition_fn=error_partition_fn, run_config_fn_for_partition=lambda partition: {}, ), "error_partition_config": PartitionSetDefinition( name="error_partition_config", pipeline_name="baz", partition_fn=lambda: string.ascii_lowercase, run_config_fn_for_partition=error_partition_config_fn, ), "error_partition_tags": PartitionSetDefinition( name="error_partition_tags", pipeline_name="baz", partition_fn=lambda: string.ascii_lowercase, run_config_fn_for_partition=lambda partition: {}, tags_fn_for_partition=error_partition_tags_fn, ), } @sensor(pipeline_name="foo_pipeline") def sensor_foo(_): - yield SensorRunParams(execution_key=None, run_config={"foo": "FOO"}, tags={"foo": "foo_tag"}) - yield SensorRunParams(execution_key=None, run_config={"foo": "FOO"}) + yield RunRequest(run_key=None, run_config={"foo": "FOO"}, tags={"foo": "foo_tag"}) + yield RunRequest(run_key=None, run_config={"foo": "FOO"}) @sensor(pipeline_name="foo_pipeline") def sensor_error(_): raise Exception("womp womp") @repository def bar_repo(): return { "pipelines": { "foo": define_foo_pipeline, "bar": lambda: bar_pipeline, "baz": lambda: baz_pipeline, }, "schedules": define_bar_schedules(), "partition_sets": define_baz_partitions(), "jobs": {"sensor_foo": sensor_foo, "sensor_error": lambda: sensor_error}, } diff --git a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py 
b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py index a8d86aa26..e2d4a2cfa 100644 --- a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py +++ b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py @@ -1,31 +1,31 @@ from dagster.api.snapshot_sensor import sync_get_external_sensor_execution_data_ephemeral_grpc from dagster.core.host_representation.external_data import ( ExternalSensorExecutionData, ExternalSensorExecutionErrorData, ) from dagster.core.test_utils import instance_for_test from .utils import get_bar_repo_handle def test_external_sensor_grpc(): with get_bar_repo_handle() as repository_handle: with instance_for_test() as instance: result = sync_get_external_sensor_execution_data_ephemeral_grpc( instance, repository_handle, "sensor_foo", None ) assert isinstance(result, ExternalSensorExecutionData) - assert len(result.run_params) == 2 - run_params = result.run_params[0] - assert run_params.run_config == {"foo": "FOO"} - assert run_params.tags == {"foo": "foo_tag"} + assert len(result.run_requests) == 2 + run_request = result.run_requests[0] + assert run_request.run_config == {"foo": "FOO"} + assert run_request.tags == {"foo": "foo_tag"} def test_external_sensor_error(): with get_bar_repo_handle() as repository_handle: with instance_for_test() as instance: result = sync_get_external_sensor_execution_data_ephemeral_grpc( instance, repository_handle, "sensor_error", None ) assert isinstance(result, ExternalSensorExecutionErrorData) assert "womp womp" in result.error.to_string() diff --git a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py index b91d4f022..184ca0a6c 100644 --- a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py +++ b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py @@ -1,612 +1,612 @@ from __future__ import print_function import json import os import string import sys from contextlib import contextmanager import mock import pytest from click.testing import CliRunner from dagster import ( PartitionSetDefinition, PresetDefinition, ScheduleDefinition, lambda_solid, pipeline, repository, seven, solid, ) from dagster.cli import ENV_PREFIX, cli from dagster.cli.pipeline import pipeline_execute_command from dagster.cli.run import run_list_command, run_wipe_command from dagster.core.definitions.decorators.sensor import sensor -from dagster.core.definitions.sensor import SensorRunParams +from dagster.core.definitions.sensor import RunRequest from dagster.core.test_utils import instance_for_test, instance_for_test_tempdir from dagster.core.types.loadable_target_origin import LoadableTargetOrigin from dagster.grpc.server import GrpcServerProcess from dagster.utils import file_relative_path, merge_dicts from dagster.version import __version__ def no_print(_): return None @lambda_solid def do_something(): return 1 @lambda_solid def do_input(x): return x @pipeline( name="foo", preset_defs=[PresetDefinition(name="test", tags={"foo": "bar"}),], ) def foo_pipeline(): do_input(do_something()) def define_foo_pipeline(): return foo_pipeline @pipeline(name="baz", description="Not much tbh") def baz_pipeline(): do_input() def not_a_repo_or_pipeline_fn(): return "kdjfkjdf" not_a_repo_or_pipeline = 123 @pipeline def partitioned_scheduled_pipeline(): do_something() def define_bar_schedules(): partition_set = PartitionSetDefinition( 
name="scheduled_partitions", pipeline_name="partitioned_scheduled_pipeline", partition_fn=lambda: string.digits, ) return { "foo_schedule": ScheduleDefinition( "foo_schedule", cron_schedule="* * * * *", pipeline_name="test_pipeline", run_config={}, ), "partitioned_schedule": partition_set.create_schedule_definition( schedule_name="partitioned_schedule", cron_schedule="* * * * *" ), } def define_bar_partitions(): def error_name(): raise Exception("womp womp") def error_config(_): raise Exception("womp womp") return { "baz_partitions": PartitionSetDefinition( name="baz_partitions", pipeline_name="baz", partition_fn=lambda: string.digits, run_config_fn_for_partition=lambda partition: { "solids": {"do_input": {"inputs": {"x": {"value": partition.value}}}} }, ), "error_name_partitions": PartitionSetDefinition( name="error_name_partitions", pipeline_name="baz", partition_fn=error_name, ), "error_config_partitions": PartitionSetDefinition( name="error_config_partitions", pipeline_name="baz", partition_fn=error_config, ), } def define_bar_sensors(): @sensor(pipeline_name="baz") def foo_sensor(context): run_config = {"foo": "FOO"} if context.last_completion_time: run_config["since"] = context.last_completion_time - return SensorRunParams(execution_key=None, run_config=run_config) + return RunRequest(run_key=None, run_config=run_config) return {"foo_sensor": foo_sensor} @repository def bar(): return { "pipelines": { "foo": foo_pipeline, "baz": baz_pipeline, "partitioned_scheduled_pipeline": partitioned_scheduled_pipeline, }, "schedules": define_bar_schedules(), "partition_sets": define_bar_partitions(), "jobs": define_bar_sensors(), } @solid def spew(context): context.log.info("HELLO WORLD") @solid def fail(context): raise Exception("I AM SUPPOSED TO FAIL") @pipeline def stdout_pipeline(): spew() @pipeline def stderr_pipeline(): fail() @contextmanager def _default_cli_test_instance_tempdir(temp_dir, overrides=None): default_overrides = { "run_launcher": {"module": "dagster.core.test_utils", "class": "MockedRunLauncher",} } with instance_for_test_tempdir( temp_dir, overrides=merge_dicts(default_overrides, (overrides if overrides else {})) ) as instance: with mock.patch("dagster.core.instance.DagsterInstance.get") as _instance: _instance.return_value = instance yield instance @contextmanager def default_cli_test_instance(overrides=None): with seven.TemporaryDirectory() as temp_dir: with _default_cli_test_instance_tempdir(temp_dir, overrides) as instance: yield instance @contextmanager def args_with_instance(gen_instance, *args): with gen_instance as instance: yield args + (instance,) def args_with_default_cli_test_instance(*args): return args_with_instance(default_cli_test_instance(), *args) @contextmanager def grpc_server_bar_kwargs(pipeline_name=None): server_process = GrpcServerProcess( loadable_target_origin=LoadableTargetOrigin( executable_path=sys.executable, python_file=file_relative_path(__file__, "test_cli_commands.py"), attribute="bar", ), ) with server_process.create_ephemeral_client() as client: args = {"grpc_host": client.host} if pipeline_name: args["pipeline"] = "foo" if client.port: args["grpc_port"] = client.port if client.socket: args["grpc_socket"] = client.socket yield args server_process.wait() @contextmanager def python_bar_cli_args(pipeline_name=None): args = [ "-m", "dagster_tests.cli_tests.command_tests.test_cli_commands", "-a", "bar", ] if pipeline_name: args.append("-p") args.append(pipeline_name) yield args @contextmanager def grpc_server_bar_cli_args(pipeline_name=None): 
server_process = GrpcServerProcess( loadable_target_origin=LoadableTargetOrigin( executable_path=sys.executable, python_file=file_relative_path(__file__, "test_cli_commands.py"), attribute="bar", ), ) with server_process.create_ephemeral_client() as client: args = ["--grpc-host", client.host] if client.port: args.append("--grpc-port") args.append(client.port) if client.socket: args.append("--grpc-socket") args.append(client.socket) if pipeline_name: args.append("--pipeline") args.append(pipeline_name) yield args server_process.wait() @contextmanager def grpc_server_bar_pipeline_args(): with default_cli_test_instance() as instance: with grpc_server_bar_kwargs(pipeline_name="foo") as kwargs: yield kwargs, instance # This iterates over a list of contextmanagers that can be used to construct # (cli_args, instance) tuples def launch_command_contexts(): for pipeline_target_args in valid_external_pipeline_target_args(): yield args_with_default_cli_test_instance(pipeline_target_args) yield pytest.param(grpc_server_bar_pipeline_args()) def pipeline_python_origin_contexts(): return [ args_with_default_cli_test_instance(pipeline_target_args) for pipeline_target_args in valid_pipeline_python_origin_target_args() ] @contextmanager def scheduler_instance(overrides=None): with seven.TemporaryDirectory() as temp_dir: with _default_cli_test_instance_tempdir( temp_dir, overrides=merge_dicts( { "scheduler": { "module": "dagster.utils.test", "class": "FilesystemTestScheduler", "config": {"base_dir": temp_dir}, } }, overrides if overrides else {}, ), ) as instance: yield instance @contextmanager def grpc_server_scheduler_cli_args(overrides=None): with scheduler_instance(overrides) as instance: with grpc_server_bar_cli_args() as args: yield args, instance # Returns a list of contextmanagers that can be used to construct # (cli_args, instance) tuples for schedule calls def schedule_command_contexts(): return [ args_with_instance( scheduler_instance(), ["-w", file_relative_path(__file__, "workspace.yaml")] ), grpc_server_scheduler_cli_args(), ] def sensor_command_contexts(): return [ args_with_instance( scheduler_instance(), ["-w", file_relative_path(__file__, "workspace.yaml")], ), grpc_server_scheduler_cli_args(), ] # This iterates over a list of contextmanagers that can be used to construct # (cli_args, instance) tuples for backfill calls def backfill_command_contexts(): repo_args = { "noprompt": True, "workspace": (file_relative_path(__file__, "repository_file.yaml"),), } return [ args_with_instance(default_cli_test_instance(), repo_args), grpc_server_backfill_args(), ] @contextmanager def grpc_server_backfill_args(): with default_cli_test_instance() as instance: with grpc_server_bar_kwargs() as args: yield merge_dicts(args, {"noprompt": True}), instance def non_existant_python_origin_target_args(): return { "workspace": None, "pipeline": "foo", "python_file": file_relative_path(__file__, "made_up_file.py"), "module_name": None, "attribute": "bar", } def valid_pipeline_python_origin_target_args(): return [ { "workspace": None, "pipeline": "foo", "python_file": file_relative_path(__file__, "test_cli_commands.py"), "module_name": None, "attribute": "bar", }, { "workspace": None, "pipeline": "foo", "python_file": file_relative_path(__file__, "test_cli_commands.py"), "module_name": None, "attribute": "bar", "working_directory": os.path.dirname(__file__), }, { "workspace": None, "pipeline": "foo", "python_file": None, "module_name": "dagster_tests.cli_tests.command_tests.test_cli_commands", "attribute": "bar", }, { 
"workspace": None, "pipeline": "foo", "python_file": None, "package_name": "dagster_tests.cli_tests.command_tests.test_cli_commands", "attribute": "bar", }, { "workspace": None, "pipeline": None, "python_file": None, "module_name": "dagster_tests.cli_tests.command_tests.test_cli_commands", "attribute": "foo_pipeline", }, { "workspace": None, "pipeline": None, "python_file": None, "package_name": "dagster_tests.cli_tests.command_tests.test_cli_commands", "attribute": "foo_pipeline", }, { "workspace": None, "pipeline": None, "python_file": file_relative_path(__file__, "test_cli_commands.py"), "module_name": None, "attribute": "define_foo_pipeline", }, { "workspace": None, "pipeline": None, "python_file": file_relative_path(__file__, "test_cli_commands.py"), "module_name": None, "attribute": "define_foo_pipeline", "working_directory": os.path.dirname(__file__), }, { "workspace": None, "pipeline": None, "python_file": file_relative_path(__file__, "test_cli_commands.py"), "module_name": None, "attribute": "foo_pipeline", }, ] def valid_external_pipeline_target_args(): return [ { "workspace": (file_relative_path(__file__, "repository_file.yaml"),), "pipeline": "foo", "python_file": None, "module_name": None, "attribute": None, }, { "workspace": (file_relative_path(__file__, "repository_module.yaml"),), "pipeline": "foo", "python_file": None, "module_name": None, "attribute": None, }, ] + [args for args in valid_pipeline_python_origin_target_args()] def valid_pipeline_python_origin_target_cli_args(): return [ ["-f", file_relative_path(__file__, "test_cli_commands.py"), "-a", "bar", "-p", "foo"], [ "-f", file_relative_path(__file__, "test_cli_commands.py"), "-d", os.path.dirname(__file__), "-a", "bar", "-p", "foo", ], [ "-m", "dagster_tests.cli_tests.command_tests.test_cli_commands", "-a", "bar", "-p", "foo", ], ["-m", "dagster_tests.cli_tests.command_tests.test_cli_commands", "-a", "foo_pipeline"], ["-f", file_relative_path(__file__, "test_cli_commands.py"), "-a", "define_foo_pipeline",], [ "-f", file_relative_path(__file__, "test_cli_commands.py"), "-d", os.path.dirname(__file__), "-a", "define_foo_pipeline", ], ] def valid_external_pipeline_target_cli_args_no_preset(): return [ ["-w", file_relative_path(__file__, "repository_file.yaml"), "-p", "foo"], ["-w", file_relative_path(__file__, "repository_module.yaml"), "-p", "foo"], ["-w", file_relative_path(__file__, "workspace.yaml"), "-p", "foo"], [ "-w", file_relative_path(__file__, "override.yaml"), "-w", file_relative_path(__file__, "workspace.yaml"), "-p", "foo", ], ] + [args for args in valid_pipeline_python_origin_target_cli_args()] def valid_external_pipeline_target_cli_args_with_preset(): run_config = {"storage": {"filesystem": {"config": {"base_dir": "/tmp"}}}} return valid_external_pipeline_target_cli_args_no_preset() + [ [ "-f", file_relative_path(__file__, "test_cli_commands.py"), "-d", os.path.dirname(__file__), "-a", "define_foo_pipeline", "--preset", "test", ], [ "-f", file_relative_path(__file__, "test_cli_commands.py"), "-d", os.path.dirname(__file__), "-a", "define_foo_pipeline", "--config-json", json.dumps(run_config), ], ] def test_run_list(): with instance_for_test(): runner = CliRunner() result = runner.invoke(run_list_command) assert result.exit_code == 0 def test_run_wipe_correct_delete_message(): with instance_for_test(): runner = CliRunner() result = runner.invoke(run_wipe_command, input="DELETE\n") assert "Deleted all run history and event logs" in result.output assert result.exit_code == 0 def 
test_run_wipe_incorrect_delete_message(): with instance_for_test(): runner = CliRunner() result = runner.invoke(run_wipe_command, input="WRONG\n") assert "Exiting without deleting all run history and event logs" in result.output assert result.exit_code == 0 def test_run_list_limit(): with instance_for_test(): runner = CliRunner() runner_pipeline_execute( runner, [ "-f", file_relative_path(__file__, "../../general_tests/test_repository.py"), "-a", "dagster_test_repository", "--preset", "add", "-p", "multi_mode_with_resources", # pipeline name ], ) runner_pipeline_execute( runner, [ "-f", file_relative_path(__file__, "../../general_tests/test_repository.py"), "-a", "dagster_test_repository", "--preset", "add", "-p", "multi_mode_with_resources", # pipeline name ], ) # Should only show one run because of the limit argument result = runner.invoke(run_list_command, args="--limit 1") assert result.exit_code == 0 assert result.output.count("Run: ") == 1 assert result.output.count("Pipeline: multi_mode_with_resources") == 1 # Shows two runs because the limit argument is now 2 two_results = runner.invoke(run_list_command, args="--limit 2") assert two_results.exit_code == 0 assert two_results.output.count("Run: ") == 2 assert two_results.output.count("Pipeline: multi_mode_with_resources") == 2 # Should only show two runs although the limit argument is 3 because there are only 2 runs shows_two_results = runner.invoke(run_list_command, args="--limit 3") assert shows_two_results.exit_code == 0 assert shows_two_results.output.count("Run: ") == 2 assert shows_two_results.output.count("Pipeline: multi_mode_with_resources") == 2 def runner_pipeline_execute(runner, cli_args): result = runner.invoke(pipeline_execute_command, cli_args) if result.exit_code != 0: # CliRunner captures stdout so printing it out here raise Exception( ( "dagster pipeline execute commands with cli_args {cli_args} " 'returned exit_code {exit_code} with stdout:\n"{stdout}" and ' '\nresult as string: "{result}"' ).format( cli_args=cli_args, exit_code=result.exit_code, stdout=result.stdout, result=result ) ) return result def test_use_env_vars_for_cli_option(): env_key = "{}_VERSION".format(ENV_PREFIX) runner = CliRunner(env={env_key: "1"}) # use `debug` subcommand to trigger the cli group option flag `--version` # see issue: https://github.com/pallets/click/issues/1694 result = runner.invoke(cli, ["debug"], auto_envvar_prefix=ENV_PREFIX) assert __version__ in result.output assert result.exit_code == 0 diff --git a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_sensor_commands.py b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_sensor_commands.py index d0b73b0cd..f416fcd0d 100644 --- a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_sensor_commands.py +++ b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_sensor_commands.py @@ -1,127 +1,127 @@ from __future__ import print_function import re import click import mock import pytest from click.testing import CliRunner from dagster.cli.sensor import ( check_repo_and_scheduler, sensor_list_command, sensor_preview_command, sensor_start_command, sensor_stop_command, ) from dagster.core.host_representation import ExternalRepository from dagster.core.instance import DagsterInstance from dagster.core.test_utils import environ from .test_cli_commands import sensor_command_contexts @pytest.mark.parametrize("gen_sensor_args", sensor_command_contexts()) def test_sensors_list(gen_sensor_args): with gen_sensor_args as 
(cli_args, instance): runner = CliRunner() with mock.patch("dagster.core.instance.DagsterInstance.get") as _instance: _instance.return_value = instance result = runner.invoke(sensor_list_command, cli_args) assert result.exit_code == 0 assert result.output == "Repository bar\n**************\nSensor: foo_sensor [STOPPED]\n" @pytest.mark.parametrize("gen_sensor_args", sensor_command_contexts()) def test_sensors_start_and_stop(gen_sensor_args): with gen_sensor_args as (cli_args, instance): with mock.patch("dagster.core.instance.DagsterInstance.get") as _instance: _instance.return_value = instance runner = CliRunner() result = runner.invoke(sensor_start_command, cli_args + ["foo_sensor"],) assert result.exit_code == 0 assert "Started sensor foo_sensor\n" == result.output result = runner.invoke(sensor_stop_command, cli_args + ["foo_sensor"],) assert result.exit_code == 0 assert "Stopped sensor foo_sensor\n" == result.output @pytest.mark.parametrize("gen_sensor_args", sensor_command_contexts()) def test_sensors_start_empty(gen_sensor_args): with gen_sensor_args as (cli_args, instance): runner = CliRunner() with mock.patch("dagster.core.instance.DagsterInstance.get") as _instance: _instance.return_value = instance result = runner.invoke(sensor_start_command, cli_args,) assert result.exit_code == 0 assert "Noop: dagster sensor start was called without any arguments" in result.output @pytest.mark.parametrize("gen_sensor_args", sensor_command_contexts()) def test_sensors_start_all(gen_sensor_args): with gen_sensor_args as (cli_args, instance): runner = CliRunner() with mock.patch("dagster.core.instance.DagsterInstance.get") as _instance: _instance.return_value = instance result = runner.invoke(sensor_start_command, cli_args + ["--start-all"],) assert result.exit_code == 0 assert result.output == "Started all sensors for repository bar\n" def test_check_repo_and_sensorr_no_external_sensors(): repository = mock.MagicMock(spec=ExternalRepository) repository.get_external_sensors.return_value = [] instance = mock.MagicMock(spec=DagsterInstance) with pytest.raises(click.UsageError, match="There are no sensors defined for repository"): check_repo_and_scheduler(repository, instance) def test_check_repo_and_scheduler_dagster_home_not_set(): with environ({"DAGSTER_HOME": ""}): repository = mock.MagicMock(spec=ExternalRepository) repository.get_external_sensors.return_value = [mock.MagicMock()] instance = mock.MagicMock(spec=DagsterInstance) with pytest.raises( click.UsageError, match=re.escape("The environment variable $DAGSTER_HOME is not set.") ): check_repo_and_scheduler(repository, instance) @pytest.mark.parametrize("gen_sensor_args", sensor_command_contexts()) def test_sensor_preview(gen_sensor_args): with gen_sensor_args as (cli_args, instance): runner = CliRunner() with mock.patch("dagster.core.instance.DagsterInstance.get") as _instance: _instance.return_value = instance result = runner.invoke(sensor_preview_command, cli_args + ["foo_sensor"],) assert result.exit_code == 0 - assert result.output == "Sensor returning run parameters for 1 run(s):\n\nfoo: FOO\n\n" + assert result.output == "Sensor returning run requests for 1 run(s):\n\nfoo: FOO\n\n" @pytest.mark.parametrize("gen_sensor_args", sensor_command_contexts()) def test_sensor_preview_since(gen_sensor_args): with gen_sensor_args as (cli_args, instance): runner = CliRunner() with mock.patch("dagster.core.instance.DagsterInstance.get") as _instance: _instance.return_value = instance result = runner.invoke( sensor_preview_command, cli_args + 
["foo_sensor", "--since", 1.1] ) assert result.exit_code == 0 assert ( result.output - == "Sensor returning run parameters for 1 run(s):\n\nfoo: FOO\nsince: 1.1\n\n" + == "Sensor returning run requests for 1 run(s):\n\nfoo: FOO\nsince: 1.1\n\n" ) diff --git a/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_failure_recovery.py b/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_failure_recovery.py index 19d3e7d12..dfd34a584 100644 --- a/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_failure_recovery.py +++ b/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_failure_recovery.py @@ -1,248 +1,248 @@ import pendulum import pytest from dagster.core.definitions.job import JobType from dagster.core.instance import DagsterInstance from dagster.core.scheduler.job import JobState, JobStatus, JobTickStatus from dagster.core.storage.pipeline_run import PipelineRunStatus -from dagster.core.storage.tags import EXECUTION_KEY_TAG, SENSOR_NAME_TAG +from dagster.core.storage.tags import RUN_KEY_TAG, SENSOR_NAME_TAG from dagster.core.test_utils import cleanup_test_instance, get_crash_signals from dagster.daemon import get_default_daemon_logger from dagster.scheduler.sensor import execute_sensor_iteration from dagster.seven import IS_WINDOWS, multiprocessing from .test_sensor_run import instance_with_sensors, repos, wait_for_all_runs_to_start def _test_launch_sensor_runs_in_subprocess(instance_ref, execution_datetime, debug_crash_flags): with DagsterInstance.from_ref(instance_ref) as instance: try: with pendulum.test(execution_datetime): execute_sensor_iteration( instance, get_default_daemon_logger("SensorDaemon"), debug_crash_flags=debug_crash_flags, ) finally: cleanup_test_instance(instance) @pytest.mark.skipif( IS_WINDOWS, reason="Windows keeps resources open after termination in a flaky way" ) @pytest.mark.parametrize("external_repo_context", repos()) @pytest.mark.parametrize("crash_location", ["TICK_CREATED", "TICK_HELD"]) @pytest.mark.parametrize("crash_signal", get_crash_signals()) def test_failure_before_run_created(external_repo_context, crash_location, crash_signal, capfd): frozen_datetime = pendulum.datetime( year=2019, month=2, day=28, hour=0, minute=0, second=1, ).in_tz("US/Central") with instance_with_sensors(external_repo_context) as (instance, external_repo): with pendulum.test(frozen_datetime): external_sensor = external_repo.get_external_sensor("simple_sensor") instance.add_job_state( JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING) ) # create a tick launch_process = multiprocessing.Process( target=_test_launch_sensor_runs_in_subprocess, args=[instance.get_ref(), frozen_datetime, None], ) launch_process.start() launch_process.join(timeout=60) ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 1 assert ticks[0].status == JobTickStatus.SKIPPED captured = capfd.readouterr() assert ( captured.out.replace("\r\n", "\n") == """2019-02-27 18:00:01 - SensorDaemon - INFO - Checking for new runs for the following sensors: simple_sensor 2019-02-27 18:00:01 - SensorDaemon - INFO - Sensor returned false for simple_sensor, skipping """ ) # create a starting tick, but crash debug_crash_flags = {external_sensor.name: {crash_location: crash_signal}} launch_process = multiprocessing.Process( target=_test_launch_sensor_runs_in_subprocess, args=[instance.get_ref(), frozen_datetime.add(seconds=1), debug_crash_flags], ) launch_process.start() launch_process.join(timeout=60) 
assert launch_process.exitcode != 0 captured = capfd.readouterr() assert ( captured.out.replace("\r\n", "\n") == """2019-02-27 18:00:02 - SensorDaemon - INFO - Checking for new runs for the following sensors: simple_sensor """ ) ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) # reuses the skipped tick assert len(ticks) == 1 assert ticks[0].status == JobTickStatus.STARTED assert not int(ticks[0].timestamp) % 2 # skip condition for simple_sensor assert instance.get_runs_count() == 0 # create another tick, but ensure that the last evaluation time used is from the first, # successful tick rather than the failed tick launch_process = multiprocessing.Process( target=_test_launch_sensor_runs_in_subprocess, args=[instance.get_ref(), frozen_datetime.add(seconds=2), None], ) launch_process.start() launch_process.join(timeout=60) assert launch_process.exitcode == 0 wait_for_all_runs_to_start(instance) assert instance.get_runs_count() == 1 run = instance.get_runs()[0] captured = capfd.readouterr() assert ( captured.out.replace("\r\n", "\n") == f"""2019-02-27 18:00:03 - SensorDaemon - INFO - Checking for new runs for the following sensors: simple_sensor 2019-02-27 18:00:03 - SensorDaemon - INFO - Launching run for simple_sensor 2019-02-27 18:00:03 - SensorDaemon - INFO - Completed launch of run {run.run_id} for simple_sensor """ ) ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) # reuses the started tick assert len(ticks) == 1 assert ticks[0].status == JobTickStatus.SUCCESS @pytest.mark.skipif( IS_WINDOWS, reason="Windows keeps resources open after termination in a flaky way" ) @pytest.mark.parametrize("external_repo_context", repos()) @pytest.mark.parametrize("crash_location", ["RUN_CREATED"]) @pytest.mark.parametrize("crash_signal", get_crash_signals()) def test_failure_after_run_created_before_run_launched( external_repo_context, crash_location, crash_signal, capfd ): frozen_datetime = pendulum.datetime( year=2019, month=2, day=28, hour=0, minute=0, second=0, ).in_tz("US/Central") with instance_with_sensors(external_repo_context) as (instance, external_repo): with pendulum.test(frozen_datetime): - external_sensor = external_repo.get_external_sensor("execution_key_sensor") + external_sensor = external_repo.get_external_sensor("run_key_sensor") instance.add_job_state( JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING) ) # create a starting tick, but crash debug_crash_flags = {external_sensor.name: {crash_location: crash_signal}} launch_process = multiprocessing.Process( target=_test_launch_sensor_runs_in_subprocess, args=[instance.get_ref(), frozen_datetime, debug_crash_flags], ) launch_process.start() launch_process.join(timeout=60) assert launch_process.exitcode != 0 ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 1 assert ticks[0].status == JobTickStatus.STARTED assert instance.get_runs_count() == 1 run = instance.get_runs()[0] # Run was created, but hasn't launched yet assert run.status == PipelineRunStatus.NOT_STARTED - assert run.tags.get(SENSOR_NAME_TAG) == "execution_key_sensor" - assert run.tags.get(EXECUTION_KEY_TAG) == "only_once" + assert run.tags.get(SENSOR_NAME_TAG) == "run_key_sensor" + assert run.tags.get(RUN_KEY_TAG) == "only_once" # clear output capfd.readouterr() launch_process = multiprocessing.Process( target=_test_launch_sensor_runs_in_subprocess, args=[instance.get_ref(), frozen_datetime.add(seconds=1), None], ) launch_process.start() 
launch_process.join(timeout=60) assert launch_process.exitcode == 0 wait_for_all_runs_to_start(instance) assert instance.get_runs_count() == 1 run = instance.get_runs()[0] captured = capfd.readouterr() assert ( - f"Run {run.run_id} already created with the execution key `only_once` for execution_key_sensor" + f"Run {run.run_id} already created with the run key `only_once` for run_key_sensor" in captured.out ) ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 1 assert ticks[0].status == JobTickStatus.SUCCESS @pytest.mark.skipif( IS_WINDOWS, reason="Windows keeps resources open after termination in a flaky way" ) @pytest.mark.parametrize("external_repo_context", repos()) @pytest.mark.parametrize("crash_location", ["RUN_LAUNCHED"]) @pytest.mark.parametrize("crash_signal", get_crash_signals()) def test_failure_after_run_launched(external_repo_context, crash_location, crash_signal, capfd): frozen_datetime = pendulum.datetime( year=2019, month=2, day=28, hour=0, minute=0, second=0, ).in_tz("US/Central") with instance_with_sensors(external_repo_context) as (instance, external_repo): with pendulum.test(frozen_datetime): - external_sensor = external_repo.get_external_sensor("execution_key_sensor") + external_sensor = external_repo.get_external_sensor("run_key_sensor") instance.add_job_state( JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING) ) # create a run, launch but crash debug_crash_flags = {external_sensor.name: {crash_location: crash_signal}} launch_process = multiprocessing.Process( target=_test_launch_sensor_runs_in_subprocess, args=[instance.get_ref(), frozen_datetime, debug_crash_flags], ) launch_process.start() launch_process.join(timeout=60) assert launch_process.exitcode != 0 ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 1 assert ticks[0].status == JobTickStatus.STARTED assert instance.get_runs_count() == 1 run = instance.get_runs()[0] wait_for_all_runs_to_start(instance) - assert run.tags.get(SENSOR_NAME_TAG) == "execution_key_sensor" - assert run.tags.get(EXECUTION_KEY_TAG) == "only_once" + assert run.tags.get(SENSOR_NAME_TAG) == "run_key_sensor" + assert run.tags.get(RUN_KEY_TAG) == "only_once" capfd.readouterr() launch_process = multiprocessing.Process( target=_test_launch_sensor_runs_in_subprocess, args=[instance.get_ref(), frozen_datetime.add(seconds=1), None], ) launch_process.start() launch_process.join(timeout=60) assert launch_process.exitcode == 0 wait_for_all_runs_to_start(instance) assert instance.get_runs_count() == 1 run = instance.get_runs()[0] captured = capfd.readouterr() assert ( - f"Run {run.run_id} already completed with the execution key `only_once` for execution_key_sensor" + f"Run {run.run_id} already completed with the run key `only_once` for run_key_sensor" in captured.out ) ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 1 assert ticks[0].status == JobTickStatus.SUCCESS diff --git a/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py b/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py index 6633cb263..765bc7698 100644 --- a/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py +++ b/python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py @@ -1,316 +1,316 @@ import os import sys import time from contextlib import contextmanager import pendulum import pytest from dagster import pipeline, repository, solid from 
dagster.core.definitions.decorators.sensor import sensor from dagster.core.definitions.job import JobType -from dagster.core.definitions.sensor import SensorRunParams, SensorSkipData +from dagster.core.definitions.sensor import RunRequest, SkipReason from dagster.core.host_representation import ( ManagedGrpcPythonEnvRepositoryLocationOrigin, RepositoryLocation, RepositoryLocationHandle, ) from dagster.core.scheduler.job import JobState, JobStatus, JobTickStatus from dagster.core.storage.pipeline_run import PipelineRunStatus from dagster.core.test_utils import instance_for_test from dagster.core.types.loadable_target_origin import LoadableTargetOrigin from dagster.daemon import get_default_daemon_logger from dagster.scheduler.sensor import execute_sensor_iteration @solid def the_solid(_): return 1 @pipeline def the_pipeline(): the_solid() @sensor(pipeline_name="the_pipeline") def simple_sensor(context): if not context.last_completion_time or not int(context.last_completion_time) % 2: - return SensorSkipData() + return SkipReason() - return SensorRunParams(execution_key=None, run_config={}, tags={}) + return RunRequest(run_key=None, run_config={}, tags={}) @sensor(pipeline_name="the_pipeline") def always_on_sensor(_context): - return SensorRunParams(execution_key=None, run_config={}, tags={}) + return RunRequest(run_key=None, run_config={}, tags={}) @sensor(pipeline_name="the_pipeline") -def execution_key_sensor(_context): - return SensorRunParams(execution_key="only_once", run_config={}, tags={}) +def run_key_sensor(_context): + return RunRequest(run_key="only_once", run_config={}, tags={}) @sensor(pipeline_name="the_pipeline") def error_sensor(context): raise Exception("womp womp") @repository def the_repo(): - return [the_pipeline, simple_sensor, error_sensor, always_on_sensor, execution_key_sensor] + return [the_pipeline, simple_sensor, error_sensor, always_on_sensor, run_key_sensor] @contextmanager def instance_with_sensors(external_repo_context, overrides=None): with instance_for_test(overrides) as instance: with external_repo_context() as external_repo: yield (instance, external_repo) @contextmanager def default_repo(): loadable_target_origin = LoadableTargetOrigin( executable_path=sys.executable, python_file=__file__, attribute="the_repo", working_directory=os.getcwd(), ) with RepositoryLocationHandle.create_from_repository_location_origin( ManagedGrpcPythonEnvRepositoryLocationOrigin( loadable_target_origin=loadable_target_origin, location_name="test_location", ) ) as handle: yield RepositoryLocation.from_handle(handle).get_repository("the_repo") def repos(): return [default_repo] def validate_tick( tick, external_sensor, expected_datetime, expected_status, expected_run_id=None, expected_error=None, - expected_execution_key=None, + expected_run_key=None, ): tick_data = tick.job_tick_data assert tick_data.job_origin_id == external_sensor.get_external_origin_id() assert tick_data.job_name == external_sensor.name assert tick_data.job_type == JobType.SENSOR assert tick_data.status == expected_status assert tick_data.timestamp == expected_datetime.timestamp() assert tick_data.run_id == expected_run_id - assert tick_data.execution_key == expected_execution_key + assert tick_data.run_key == expected_run_key if expected_error: assert expected_error in tick_data.error.message def validate_run_started(run, expected_success=True): if expected_success: assert run.status == PipelineRunStatus.STARTED or run.status == PipelineRunStatus.SUCCESS else: assert run.status == PipelineRunStatus.FAILURE 
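# --- Illustrative sketch, not part of this change set: how a sensor body reads with the
# renamed return types (RunRequest / SkipReason) and the run_key parameter introduced in this
# diff. The trigger-file path and the sensor name below are hypothetical; the decorator,
# return types, and import paths mirror the ones used at the top of this file. The imports are
# repeated here only so the sketch stands alone.
import os

from dagster.core.definitions.decorators.sensor import sensor
from dagster.core.definitions.sensor import RunRequest, SkipReason


@sensor(pipeline_name="the_pipeline")
def illustrative_trigger_file_sensor(_context):
    trigger_file = "/tmp/trigger_file"  # hypothetical trigger condition, not used by these tests
    if not os.path.exists(trigger_file):
        # Returning SkipReason marks the tick as SKIPPED without launching a run
        return SkipReason()
    # Reusing the same run_key on later ticks is deduplicated by the daemon: only the first
    # RunRequest with a given key launches a run (see test_launch_once below).
    return RunRequest(run_key=trigger_file, run_config={}, tags={})


# Note: this sketch is deliberately not added to the list returned by the_repo(), so it has no
# effect on the tests in this file.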
def wait_for_all_runs_to_start(instance, timeout=10): start_time = time.time() while True: if time.time() - start_time > timeout: raise Exception("Timed out waiting for runs to start") time.sleep(0.5) not_started_runs = [ run for run in instance.get_runs() if run.status == PipelineRunStatus.NOT_STARTED ] if len(not_started_runs) == 0: break @pytest.mark.parametrize("external_repo_context", repos()) def test_simple_sensor(external_repo_context, capfd): freeze_datetime = pendulum.datetime( year=2019, month=2, day=27, hour=23, minute=59, second=59, ).in_tz("US/Central") with instance_with_sensors(external_repo_context) as (instance, external_repo): with pendulum.test(freeze_datetime): external_sensor = external_repo.get_external_sensor("simple_sensor") instance.add_job_state( JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING) ) assert instance.get_runs_count() == 0 ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 0 execute_sensor_iteration(instance, get_default_daemon_logger("SensorDaemon")) assert instance.get_runs_count() == 0 ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 1 validate_tick( ticks[0], external_sensor, freeze_datetime, JobTickStatus.SKIPPED, ) captured = capfd.readouterr() assert ( captured.out == """2019-02-27 17:59:59 - SensorDaemon - INFO - Checking for new runs for the following sensors: simple_sensor 2019-02-27 17:59:59 - SensorDaemon - INFO - Sensor returned false for simple_sensor, skipping """ ) freeze_datetime = freeze_datetime.add(seconds=1) with pendulum.test(freeze_datetime): execute_sensor_iteration(instance, get_default_daemon_logger("SensorDaemon")) wait_for_all_runs_to_start(instance) assert instance.get_runs_count() == 1 run = instance.get_runs()[0] validate_run_started(run) ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 1 # reuse the skipped tick expected_datetime = pendulum.datetime(year=2019, month=2, day=28) validate_tick( ticks[0], external_sensor, expected_datetime, JobTickStatus.SUCCESS, run.run_id, ) captured = capfd.readouterr() assert ( captured.out == """2019-02-27 18:00:00 - SensorDaemon - INFO - Checking for new runs for the following sensors: simple_sensor 2019-02-27 18:00:00 - SensorDaemon - INFO - Launching run for simple_sensor 2019-02-27 18:00:00 - SensorDaemon - INFO - Completed launch of run {run_id} for simple_sensor """.format( run_id=run.run_id ) ) @pytest.mark.parametrize("external_repo_context", repos()) def test_error_sensor(external_repo_context, capfd): freeze_datetime = pendulum.datetime( year=2019, month=2, day=27, hour=23, minute=59, second=59, ).in_tz("US/Central") with instance_with_sensors(external_repo_context) as (instance, external_repo): with pendulum.test(freeze_datetime): external_sensor = external_repo.get_external_sensor("error_sensor") instance.add_job_state( JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING) ) assert instance.get_runs_count() == 0 ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 0 execute_sensor_iteration(instance, get_default_daemon_logger("SensorDaemon")) assert instance.get_runs_count() == 0 ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 1 validate_tick( ticks[0], external_sensor, freeze_datetime, JobTickStatus.FAILURE, None, "Error occurred during the execution of evaluation_fn for sensor error_sensor", ) 
captured = capfd.readouterr() assert ("Failed to resolve sensor for error_sensor : ") in captured.out assert ( "Error occurred during the execution of evaluation_fn for sensor error_sensor" ) in captured.out @pytest.mark.parametrize("external_repo_context", repos()) def test_launch_failure(external_repo_context, capfd): freeze_datetime = pendulum.datetime( year=2019, month=2, day=27, hour=23, minute=59, second=59, ).in_tz("US/Central") with instance_with_sensors( external_repo_context, overrides={ "run_launcher": {"module": "dagster.core.test_utils", "class": "ExplodingRunLauncher",}, }, ) as (instance, external_repo): with pendulum.test(freeze_datetime): external_sensor = external_repo.get_external_sensor("always_on_sensor") instance.add_job_state( JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING) ) assert instance.get_runs_count() == 0 ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 0 execute_sensor_iteration(instance, get_default_daemon_logger("SensorDaemon")) assert instance.get_runs_count() == 1 run = instance.get_runs()[0] ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 1 validate_tick( ticks[0], external_sensor, freeze_datetime, JobTickStatus.SUCCESS, run.run_id ) captured = capfd.readouterr() assert ( "Run {run_id} created successfully but failed to launch.".format(run_id=run.run_id) ) in captured.out @pytest.mark.parametrize("external_repo_context", repos()) def test_launch_once(external_repo_context, capfd): freeze_datetime = pendulum.datetime( year=2019, month=2, day=27, hour=23, minute=59, second=59, ).in_tz("US/Central") with instance_with_sensors(external_repo_context) as (instance, external_repo): with pendulum.test(freeze_datetime): - external_sensor = external_repo.get_external_sensor("execution_key_sensor") + external_sensor = external_repo.get_external_sensor("run_key_sensor") instance.add_job_state( JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING) ) assert instance.get_runs_count() == 0 ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 0 execute_sensor_iteration(instance, get_default_daemon_logger("SensorDaemon")) wait_for_all_runs_to_start(instance) assert instance.get_runs_count() == 1 run = instance.get_runs()[0] ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 1 validate_tick( ticks[0], external_sensor, freeze_datetime, JobTickStatus.SUCCESS, expected_run_id=run.run_id, - expected_execution_key="only_once", + expected_run_key="only_once", ) # run again, ensure execute_sensor_iteration(instance, get_default_daemon_logger("SensorDaemon")) assert instance.get_runs_count() == 1 ticks = instance.get_job_ticks(external_sensor.get_external_origin_id()) assert len(ticks) == 2 validate_tick( ticks[0], external_sensor, freeze_datetime, JobTickStatus.SKIPPED, - expected_execution_key="only_once", + expected_run_key="only_once", ) captured = capfd.readouterr() assert ( - "Found existing run for sensor execution_key_sensor with execution_key `only_once`, skipping." + "Found existing run for sensor run_key_sensor with run_key `only_once`, skipping." in captured.out )
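# --- Hedged sketch, not part of this change set: a polling helper in the same style as
# wait_for_all_runs_to_start above, but for sensor ticks. It relies only on calls already
# exercised in these tests (instance.get_job_ticks and tick.status compared against
# JobTickStatus); the helper name and default timeout are made up for illustration.
import time


def wait_for_tick_status(instance, job_origin_id, expected_status, timeout=10):
    start_time = time.time()
    while True:
        if time.time() - start_time > timeout:
            raise Exception(
                "Timed out waiting for a {status} tick".format(status=expected_status)
            )
        ticks = instance.get_job_ticks(job_origin_id)
        # The assertions above treat ticks[0] as the most recent tick, so this helper does too
        if ticks and ticks[0].status == expected_status:
            return ticks[0]
        time.sleep(0.5)


# Example usage inside a test body (hypothetical):
#     wait_for_tick_status(
#         instance, external_sensor.get_external_origin_id(), JobTickStatus.SUCCESS
#     )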