diff --git a/examples/docs_snippets/docs_snippets_tests/overview_tests/schedules_partitions_tests/test_partition_definition.py b/examples/docs_snippets/docs_snippets_tests/overview_tests/schedules_partitions_tests/test_partition_definition.py --- a/examples/docs_snippets/docs_snippets_tests/overview_tests/schedules_partitions_tests/test_partition_definition.py +++ b/examples/docs_snippets/docs_snippets_tests/overview_tests/schedules_partitions_tests/test_partition_definition.py @@ -2,6 +2,7 @@ from docs_snippets.overview.schedules_partitions.pipeline import my_pipeline from dagster import execute_pipeline +from dagster.core.test_utils import instance_for_test def test_pipeline(): @@ -12,7 +13,8 @@ def test_partition_set(): - partitions = day_partition_set.get_partitions() - assert len(partitions) == 7 - for partition in partitions: - assert day_partition_set.run_config_for_partition(partition) + with instance_for_test() as instance: + partitions = day_partition_set.get_partitions(instance) + assert len(partitions) == 7 + for partition in partitions: + assert day_partition_set.run_config_for_partition(partition) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/context.py b/python_modules/dagster-graphql/dagster_graphql/implementation/context.py --- a/python_modules/dagster-graphql/dagster_graphql/implementation/context.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/context.py @@ -106,6 +106,7 @@ return self._repository_locations[ repository_handle.repository_location_handle.location_name ].get_external_partition_config( + instance=self.instance, repository_handle=repository_handle, partition_set_name=partition_set_name, partition_name=partition_name, @@ -115,6 +116,7 @@ return self._repository_locations[ repository_handle.repository_location_handle.location_name ].get_external_partition_tags( + instance=self.instance, repository_handle=repository_handle, partition_set_name=partition_set_name, partition_name=partition_name, @@ -123,7 +125,7 @@ def get_external_partition_names(self, repository_handle, partition_set_name): return self._repository_locations[ repository_handle.repository_location_handle.location_name - ].get_external_partition_names(repository_handle, partition_set_name) + ].get_external_partition_names(self.instance, repository_handle, partition_set_name) def get_external_schedule_execution_data(self, repository_handle, schedule_name): return self._repository_locations[ @@ -143,6 +145,7 @@ return self._repository_locations[ repository_handle.repository_location_handle.location_name ].get_external_partition_set_execution_param_data( + instance=self.instance, repository_handle=repository_handle, partition_set_name=partition_set_name, partition_names=partition_names, diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/graphql_context_test_suite.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/graphql_context_test_suite.py --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/graphql_context_test_suite.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/graphql_context_test_suite.py @@ -742,6 +742,16 @@ """ return _variants_with_mark(GraphQLContextVariant.all_variants(), pytest.mark.readonly) + @staticmethod + def all_persistent_readonly_variants(): + """ + Return all readonly variants with instances that can be persisted and reloaded. 
+ """ + return _variants_without_marks( + _variants_with_mark(GraphQLContextVariant.all_variants(), pytest.mark.readonly), + [Marks.in_memory_instance], + ) + def _variants_with_mark(variants, mark): def _yield_all(): @@ -860,6 +870,10 @@ context_variants=GraphQLContextVariant.all_readonly_variants() ) +PersistentReadonlyGraphQLContextTestMatrix = make_graphql_context_test_suite( + context_variants=GraphQLContextVariant.all_persistent_readonly_variants() +) + ExecutingGraphQLContextTestMatrix = make_graphql_context_test_suite( context_variants=GraphQLContextVariant.all_executing_variants() ) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -8,8 +8,8 @@ from dagster.core.test_utils import environ from .graphql_context_test_suite import ( - ExecutingGraphQLContextTestMatrix, GraphQLContextVariant, + OutOfProcessExecutingGraphQLContextTestMatrix, make_graphql_context_test_suite, ) from .utils import ( @@ -20,7 +20,7 @@ ) -class TestPartitionBackfill(ExecutingGraphQLContextTestMatrix): +class TestPartitionBackfill(OutOfProcessExecutingGraphQLContextTestMatrix): def test_launch_full_pipeline_backfill(self, graphql_context): repository_selector = infer_repository_selector(graphql_context) result = execute_dagster_graphql( diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_sets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_sets.py --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_sets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_sets.py @@ -8,8 +8,8 @@ ) from .graphql_context_test_suite import ( - ExecutingGraphQLContextTestMatrix, - ReadonlyGraphQLContextTestMatrix, + OutOfProcessExecutingGraphQLContextTestMatrix, + PersistentReadonlyGraphQLContextTestMatrix, ) GET_PARTITION_SETS_FOR_PIPELINE_QUERY = """ @@ -111,7 +111,7 @@ """ -class TestPartitionSets(ReadonlyGraphQLContextTestMatrix): +class TestPartitionSets(PersistentReadonlyGraphQLContextTestMatrix): def test_get_partition_sets_for_pipeline(self, graphql_context, snapshot): selector = infer_repository_selector(graphql_context) result = execute_dagster_graphql( @@ -178,7 +178,7 @@ } -class TestPartitionSetRuns(ExecutingGraphQLContextTestMatrix): +class TestPartitionSetRuns(OutOfProcessExecutingGraphQLContextTestMatrix): def test_get_partition_runs(self, graphql_context): repository_selector = infer_repository_selector(graphql_context) result = execute_dagster_graphql_and_finish_runs( diff --git a/python_modules/dagster-test/dagster_test/toys/schedules.py b/python_modules/dagster-test/dagster_test/toys/schedules.py --- a/python_modules/dagster-test/dagster_test/toys/schedules.py +++ b/python_modules/dagster-test/dagster_test/toys/schedules.py @@ -29,7 +29,7 @@ ) selected = None - for partition in partition_set_def.get_partitions(): + for partition in partition_set_def.get_partitions(context.instance): runs = runs_by_partition[partition.name] selected = partition @@ -55,7 +55,9 @@ if run.status == PipelineRunStatus.STARTED: return False # would be nice to return a reason here - available_partitions = set([partition.name for partition in partition_set_def.get_partitions()]) + 
available_partitions = set( + [partition.name for partition in partition_set_def.get_partitions(context.instance)] + ) satisfied_partitions = set(runs_by_partition.keys()) is_remaining_partitions = bool(available_partitions.difference(satisfied_partitions)) return is_remaining_partitions diff --git a/python_modules/dagster/dagster/api/snapshot_partition.py b/python_modules/dagster/dagster/api/snapshot_partition.py --- a/python_modules/dagster/dagster/api/snapshot_partition.py +++ b/python_modules/dagster/dagster/api/snapshot_partition.py @@ -7,12 +7,14 @@ ExternalPartitionTagsData, ) from dagster.core.host_representation.handle import RepositoryHandle +from dagster.core.instance import DagsterInstance from dagster.grpc.types import PartitionArgs, PartitionNamesArgs, PartitionSetExecutionParamArgs from .utils import execute_unary_api_cli_command -def sync_get_external_partition_names(repository_handle, partition_set_name): +def sync_get_external_partition_names(instance, repository_handle, partition_set_name): + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") repository_origin = repository_handle.get_origin() @@ -22,31 +24,41 @@ repository_origin.executable_path, "partition_names", PartitionNamesArgs( - repository_origin=repository_origin, partition_set_name=partition_set_name + repository_origin=repository_origin, + partition_set_name=partition_set_name, + instance_ref=instance.get_ref(), ), ), (ExternalPartitionNamesData, ExternalPartitionExecutionErrorData), ) -def sync_get_external_partition_names_grpc(api_client, repository_handle, partition_set_name): +def sync_get_external_partition_names_grpc( + api_client, instance, repository_handle, partition_set_name +): from dagster.grpc.client import DagsterGrpcClient check.inst_param(api_client, "api_client", DagsterGrpcClient) + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") repository_origin = repository_handle.get_origin() return check.inst( api_client.external_partition_names( partition_names_args=PartitionNamesArgs( - repository_origin=repository_origin, partition_set_name=partition_set_name, + repository_origin=repository_origin, + partition_set_name=partition_set_name, + instance_ref=instance.get_ref(), ), ), (ExternalPartitionNamesData, ExternalPartitionExecutionErrorData), ) -def sync_get_external_partition_config(repository_handle, partition_set_name, partition_name): +def sync_get_external_partition_config( + instance, repository_handle, partition_set_name, partition_name +): + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.str_param(partition_name, "partition_name") @@ -60,6 +72,7 @@ repository_origin=repository_origin, partition_set_name=partition_set_name, partition_name=partition_name, + instance_ref=instance.get_ref(), ), ), (ExternalPartitionConfigData, ExternalPartitionExecutionErrorData), @@ -67,11 +80,12 @@ def sync_get_external_partition_config_grpc( - api_client, repository_handle, partition_set_name, partition_name + api_client, instance, repository_handle, partition_set_name, partition_name, ): from dagster.grpc.client import DagsterGrpcClient check.inst_param(api_client, "api_client", 
DagsterGrpcClient) + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.str_param(partition_name, "partition_name") @@ -82,13 +96,17 @@ repository_origin=repository_origin, partition_set_name=partition_set_name, partition_name=partition_name, + instance_ref=instance.get_ref(), ), ), (ExternalPartitionConfigData, ExternalPartitionExecutionErrorData), ) -def sync_get_external_partition_tags(repository_handle, partition_set_name, partition_name): +def sync_get_external_partition_tags( + instance, repository_handle, partition_set_name, partition_name +): + check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.str_param(partition_name, "partition_name") @@ -102,6 +120,7 @@ repository_origin=repository_origin, partition_set_name=partition_set_name, partition_name=partition_name, + instance_ref=instance.get_ref(), ), ), (ExternalPartitionTagsData, ExternalPartitionExecutionErrorData), @@ -109,7 +128,7 @@ def sync_get_external_partition_tags_grpc( - api_client, repository_handle, partition_set_name, partition_name + api_client, instance, repository_handle, partition_set_name, partition_name, ): from dagster.grpc.client import DagsterGrpcClient @@ -125,6 +144,7 @@ repository_origin=repository_origin, partition_set_name=partition_set_name, partition_name=partition_name, + instance_ref=instance.get_ref(), ), ), (ExternalPartitionTagsData, ExternalPartitionExecutionErrorData), @@ -132,8 +152,9 @@ def sync_get_external_partition_set_execution_param_data( - repository_handle, partition_set_name, partition_names + instance, repository_handle, partition_set_name, partition_names, ): + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.list_param(partition_names, "partition_names", of_type=str) @@ -148,6 +169,7 @@ repository_origin=repository_origin, partition_set_name=partition_set_name, partition_names=partition_names, + instance_ref=instance.get_ref(), ), ), (ExternalPartitionSetExecutionParamData, ExternalPartitionExecutionErrorData), @@ -155,11 +177,12 @@ def sync_get_external_partition_set_execution_param_data_grpc( - api_client, repository_handle, partition_set_name, partition_names + api_client, instance, repository_handle, partition_set_name, partition_names, ): from dagster.grpc.client import DagsterGrpcClient check.inst_param(api_client, "api_client", DagsterGrpcClient) + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.list_param(partition_names, "partition_names", of_type=str) @@ -172,6 +195,7 @@ repository_origin=repository_origin, partition_set_name=partition_set_name, partition_names=partition_names, + instance_ref=instance.get_ref(), ), ), (ExternalPartitionSetExecutionParamData, ExternalPartitionExecutionErrorData), diff --git a/python_modules/dagster/dagster/cli/pipeline.py b/python_modules/dagster/dagster/cli/pipeline.py --- a/python_modules/dagster/dagster/cli/pipeline.py +++ b/python_modules/dagster/dagster/cli/pipeline.py @@ -841,7 +841,7 @@ # Resolve partitions to backfill partition_names_or_error = repo_location.get_external_partition_names( - repo_handle, 
partition_set_name, + instance, repo_handle, partition_set_name, ) if isinstance(partition_names_or_error, ExternalPartitionExecutionErrorData): @@ -872,6 +872,7 @@ backfill_id = make_new_backfill_id() backfill_tags = PipelineRun.tags_for_backfill_id(backfill_id) partition_execution_data = repo_location.get_external_partition_set_execution_param_data( + instance=instance, repository_handle=repo_handle, partition_set_name=partition_set_name, partition_names=partition_names, diff --git a/python_modules/dagster/dagster/core/definitions/decorators/schedule.py b/python_modules/dagster/dagster/core/definitions/decorators/schedule.py --- a/python_modules/dagster/dagster/core/definitions/decorators/schedule.py +++ b/python_modules/dagster/dagster/core/definitions/decorators/schedule.py @@ -156,7 +156,9 @@ fmt = "%Y-%m" delta = relativedelta(months=1) - partition_fn = date_partition_range(start_date, end=end_date, delta=delta, fmt=fmt) + partition_fn = date_partition_range( + start_date, end=end_date, delta=delta, fmt=fmt, schedule_timezone=execution_timezone + ) def inner(fn): check.callable_param(fn, "fn") @@ -265,7 +267,9 @@ execution_offset = relativedelta(days=day_difference) - partition_fn = date_partition_range(start_date, end=end_date, delta=delta, fmt=fmt) + partition_fn = date_partition_range( + start_date, end=end_date, delta=delta, fmt=fmt, schedule_timezone=execution_timezone + ) def inner(fn): check.callable_param(fn, "fn") @@ -360,7 +364,9 @@ delta = datetime.timedelta(days=1) fmt = "%Y-%m-%d" - partition_fn = date_partition_range(start_date, end=end_date, delta=delta) + partition_fn = date_partition_range( + start_date, end=end_date, delta=delta, schedule_timezone=execution_timezone + ) def inner(fn): check.callable_param(fn, "fn") @@ -462,12 +468,14 @@ cron_schedule = "{minute} * * * *".format(minute=execution_time.minute) - fmt = "%Y-%m-%d-%H:%M" + fmt = "%Y-%m-%d-%H:%M%Z" delta = datetime.timedelta(hours=1) execution_offset = datetime.timedelta(minutes=(execution_time.minute - start_date.minute) % 60) - partition_fn = date_partition_range(start_date, end=end_date, delta=delta, fmt=fmt) + partition_fn = date_partition_range( + start_date, end=end_date, delta=delta, fmt=fmt, schedule_timezone=execution_timezone + ) def inner(fn): check.callable_param(fn, "fn") @@ -493,13 +501,7 @@ cron_schedule, should_execute=should_execute, environment_vars=environment_vars, - partition_selector=create_default_partition_selector_fn( - delta + execution_offset, - fmt, - # Express hourly partitions in UTC so that they don't change - # depending on what timezone the schedule runs in - partition_in_utc=True, - ), + partition_selector=create_default_partition_selector_fn(delta + execution_offset, fmt), execution_timezone=execution_timezone, ) diff --git a/python_modules/dagster/dagster/core/definitions/partition.py b/python_modules/dagster/dagster/core/definitions/partition.py --- a/python_modules/dagster/dagster/core/definitions/partition.py +++ b/python_modules/dagster/dagster/core/definitions/partition.py @@ -1,14 +1,15 @@ from collections import namedtuple from datetime import timedelta -import pendulum from dateutil.relativedelta import relativedelta from dagster import check from dagster.core.definitions.schedule import ScheduleDefinition, ScheduleExecutionContext from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError +from dagster.core.instance import DagsterInstance from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, 
PipelineRunsFilter from dagster.core.storage.tags import check_tags +from dagster.seven import get_args from dagster.utils import merge_dicts from dagster.utils.partitions import DEFAULT_DATE_FORMAT @@ -35,9 +36,7 @@ ) -def create_default_partition_selector_fn( - delta=timedelta(days=1), fmt=DEFAULT_DATE_FORMAT, partition_in_utc=False -): +def create_default_partition_selector_fn(delta=timedelta(days=1), fmt=DEFAULT_DATE_FORMAT): check.inst_param(delta, "timedelta", (timedelta, relativedelta)) check.str_param(fmt, "fmt") @@ -51,19 +50,14 @@ # The tick at a given datetime corresponds to the time for the previous partition # e.g. midnight on 12/31 is actually the 12/30 partition - if partition_in_utc: - partition_time = ( - pendulum.instance(context.scheduled_execution_time).in_tz("UTC") - delta - ) - else: - partition_time = context.scheduled_execution_time - delta + partition_time = context.scheduled_execution_time - delta partition_name = partition_time.strftime(fmt) - if not partition_name in partition_set_def.get_partition_names(): + if not partition_name in partition_set_def.get_partition_names(context.instance): return None - return partition_set_def.get_partition(partition_name) + return partition_set_def.get_partition(partition_name, context.instance) return default_partition_selector @@ -72,7 +66,7 @@ check.inst_param(context, "context", ScheduleExecutionContext) check.inst_param(partition_set_def, "partition_set_def", PartitionSetDefinition) - partitions = partition_set_def.get_partitions() + partitions = partition_set_def.get_partitions(context.instance) if not partitions: return None return partitions[-1] @@ -83,7 +77,7 @@ partition_set_def = check.inst_param( partition_set_def, "partition_set_def", PartitionSetDefinition ) - partitions = partition_set_def.get_partitions() + partitions = partition_set_def.get_partitions(context.instance) if not partitions: return None selected = None @@ -102,7 +96,7 @@ partition_set_def, "partition_set_def", PartitionSetDefinition ) - partitions = partition_set_def.get_partitions() + partitions = partition_set_def.get_partitions(context.instance) if not partitions: return None @@ -156,13 +150,21 @@ "Expected | , received {type}".format(type=type(x)) ) + # partition fn may take in an instance or no parameters + if len(get_args(partition_fn)) > 0: + wrapper_partition_fn = lambda instance: [ + _wrap(x) for x in check.callable_param(partition_fn, "partition_fn")(instance) + ] + else: + wrapper_partition_fn = lambda instance: [ + _wrap(x) for x in check.callable_param(partition_fn, "partition_fn")() + ] + return super(PartitionSetDefinition, cls).__new__( cls, name=check_for_invalid_name_and_warn(name), pipeline_name=check.str_param(pipeline_name, "pipeline_name"), - partition_fn=lambda: [ - _wrap(x) for x in check.callable_param(partition_fn, "partition_fn")() - ], + partition_fn=wrapper_partition_fn, solid_selection=check.opt_nullable_list_param( solid_selection, "solid_selection", of_type=str ), @@ -186,18 +188,19 @@ return tags - def get_partitions(self): - return self.partition_fn() + def get_partitions(self, instance): + check.inst_param(instance, "instance", DagsterInstance) + return self.partition_fn(instance) - def get_partition(self, name): - for partition in self.get_partitions(): + def get_partition(self, name, instance): + for partition in self.get_partitions(instance): if partition.name == name: return partition check.failed("Partition name {} not found!".format(name)) - def get_partition_names(self): - return [part.name for part in 
self.get_partitions()] + def get_partition_names(self, instance): + return [part.name for part in self.get_partitions(instance)] def create_schedule_definition( self, @@ -237,7 +240,9 @@ check.inst_param(context, "context", ScheduleExecutionContext) selected_partition = partition_selector(context, self) - if not selected_partition or not selected_partition.name in self.get_partition_names(): + if not selected_partition or not selected_partition.name in self.get_partition_names( + context.instance + ): return False elif not should_execute: return True @@ -247,7 +252,9 @@ def _run_config_fn_wrapper(context): check.inst_param(context, "context", ScheduleExecutionContext) selected_partition = partition_selector(context, self) - if not selected_partition or not selected_partition.name in self.get_partition_names(): + if not selected_partition or not selected_partition.name in self.get_partition_names( + context.instance + ): raise DagsterInvariantViolationError( "The partition selection function `{selector}` did not return " "a partition from PartitionSet {partition_set}".format( diff --git a/python_modules/dagster/dagster/core/host_representation/repository_location.py b/python_modules/dagster/dagster/core/host_representation/repository_location.py --- a/python_modules/dagster/dagster/core/host_representation/repository_location.py +++ b/python_modules/dagster/dagster/core/host_representation/repository_location.py @@ -142,20 +142,24 @@ pass @abstractmethod - def get_external_partition_config(self, repository_handle, partition_set_name, partition_name): + def get_external_partition_config( + self, instance, repository_handle, partition_set_name, partition_name + ): pass @abstractmethod - def get_external_partition_tags(self, repository_handle, partition_set_name, partition_name): + def get_external_partition_tags( + self, instance, repository_handle, partition_set_name, partition_name + ): pass @abstractmethod - def get_external_partition_names(self, repository_handle, partition_set_name): + def get_external_partition_names(self, instance, repository_handle, partition_set_name): pass @abstractmethod def get_external_partition_set_execution_param_data( - self, repository_handle, partition_set_name, partition_names + self, instance, repository_handle, partition_set_name, partition_names ): pass @@ -328,7 +332,10 @@ return ExternalPipelineExecutionResult(event_list=execution_result.event_list) - def get_external_partition_config(self, repository_handle, partition_set_name, partition_name): + def get_external_partition_config( + self, instance, repository_handle, partition_set_name, partition_name + ): + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.str_param(partition_name, "partition_name") @@ -337,10 +344,14 @@ repository_origin=repository_handle.get_origin(), partition_set_name=partition_set_name, partition_name=partition_name, + instance_ref=instance.get_ref(), ) return get_partition_config(args) - def get_external_partition_tags(self, repository_handle, partition_set_name, partition_name): + def get_external_partition_tags( + self, instance, repository_handle, partition_set_name, partition_name + ): + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.str_param(partition_name, "partition_name") @@ 
-349,15 +360,19 @@ repository_origin=repository_handle.get_origin(), partition_set_name=partition_set_name, partition_name=partition_name, + instance_ref=instance.get_ref(), ) return get_partition_tags(args) - def get_external_partition_names(self, repository_handle, partition_set_name): + def get_external_partition_names(self, instance, repository_handle, partition_set_name): + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") args = PartitionNamesArgs( - repository_origin=repository_handle.get_origin(), partition_set_name=partition_set_name + repository_origin=repository_handle.get_origin(), + partition_set_name=partition_set_name, + instance_ref=instance.get_ref(), ) return get_partition_names(args) @@ -408,8 +423,9 @@ return get_external_executable_params(recon_repo, args) def get_external_partition_set_execution_param_data( - self, repository_handle, partition_set_name, partition_names + self, instance, repository_handle, partition_set_name, partition_names ): + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.list_param(partition_names, "partition_names", of_type=str) @@ -418,6 +434,7 @@ repository_origin=repository_handle.get_origin(), partition_set_name=partition_set_name, partition_names=partition_names, + instance_ref=instance.get_ref(), ) return get_partition_set_execution_param_data(args) @@ -525,30 +542,37 @@ self._handle.client, pipeline_handle.get_origin(), selector.solid_selection ) - def get_external_partition_config(self, repository_handle, partition_set_name, partition_name): + def get_external_partition_config( + self, instance, repository_handle, partition_set_name, partition_name + ): + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.str_param(partition_name, "partition_name") return sync_get_external_partition_config_grpc( - self._handle.client, repository_handle, partition_set_name, partition_name + self._handle.client, instance, repository_handle, partition_set_name, partition_name ) - def get_external_partition_tags(self, repository_handle, partition_set_name, partition_name): + def get_external_partition_tags( + self, instance, repository_handle, partition_set_name, partition_name + ): + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") check.str_param(partition_name, "partition_name") return sync_get_external_partition_tags_grpc( - self._handle.client, repository_handle, partition_set_name, partition_name + self._handle.client, instance, repository_handle, partition_set_name, partition_name ) - def get_external_partition_names(self, repository_handle, partition_set_name): + def get_external_partition_names(self, instance, repository_handle, partition_set_name): + check.inst_param(instance, "instance", DagsterInstance) check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(partition_set_name, "partition_set_name") return sync_get_external_partition_names_grpc( - self._handle.client, repository_handle, partition_set_name + self._handle.client, instance, 
repository_handle, partition_set_name
         )

     def get_external_schedule_execution_data(
         self,
@@ -587,14 +611,15 @@
         )

     def get_external_partition_set_execution_param_data(
-        self, repository_handle, partition_set_name, partition_names
+        self, instance, repository_handle, partition_set_name, partition_names
     ):
+        check.inst_param(instance, "instance", DagsterInstance)
         check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
         check.str_param(partition_set_name, "partition_set_name")
         check.list_param(partition_names, "partition_names", of_type=str)

         return sync_get_external_partition_set_execution_param_data_grpc(
-            self._handle.client, repository_handle, partition_set_name, partition_names
+            self._handle.client, instance, repository_handle, partition_set_name, partition_names,
         )

@@ -705,29 +730,36 @@
             pipeline_handle.get_origin(), selector.solid_selection
         )

-    def get_external_partition_config(self, repository_handle, partition_set_name, partition_name):
+    def get_external_partition_config(
+        self, instance, repository_handle, partition_set_name, partition_name
+    ):
+        check.inst_param(instance, "instance", DagsterInstance)
         check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
         check.str_param(partition_set_name, "partition_set_name")
         check.str_param(partition_name, "partition_name")

         return sync_get_external_partition_config(
-            repository_handle, partition_set_name, partition_name
+            instance, repository_handle, partition_set_name, partition_name
         )

-    def get_external_partition_tags(self, repository_handle, partition_set_name, partition_name):
+    def get_external_partition_tags(
+        self, instance, repository_handle, partition_set_name, partition_name
+    ):
+        check.inst_param(instance, "instance", DagsterInstance)
         check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
         check.str_param(partition_set_name, "partition_set_name")
         check.str_param(partition_name, "partition_name")

         return sync_get_external_partition_tags(
-            repository_handle, partition_set_name, partition_name
+            instance, repository_handle, partition_set_name, partition_name
         )

-    def get_external_partition_names(self, repository_handle, partition_set_name):
+    def get_external_partition_names(self, instance, repository_handle, partition_set_name):
+        check.inst_param(instance, "instance", DagsterInstance)
         check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
         check.str_param(partition_set_name, "partition_set_name")

-        return sync_get_external_partition_names(repository_handle, partition_set_name)
+        return sync_get_external_partition_names(instance, repository_handle, partition_set_name)

     def get_external_schedule_execution_data(
         self,
@@ -762,12 +794,13 @@
         return sync_get_external_executable_params(instance, repository_handle, name)

     def get_external_partition_set_execution_param_data(
-        self, repository_handle, partition_set_name, partition_names
+        self, instance, repository_handle, partition_set_name, partition_names
     ):
+        check.inst_param(instance, "instance", DagsterInstance)
         check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
         check.str_param(partition_set_name, "partition_set_name")
         check.list_param(partition_names, "partition_names", of_type=str)

         return sync_get_external_partition_set_execution_param_data(
-            repository_handle, partition_set_name, partition_names
+            instance, repository_handle, partition_set_name, partition_names,
         )
diff --git a/python_modules/dagster/dagster/core/instance/__init__.py b/python_modules/dagster/dagster/core/instance/__init__.py
--- a/python_modules/dagster/dagster/core/instance/__init__.py +++ b/python_modules/dagster/dagster/core/instance/__init__.py @@ -1076,6 +1076,10 @@ return self._scheduler.running_schedule_count(self, schedule_origin_id) return 0 + @property + def default_schedule_timezone(self): + return self._scheduler.get_default_timezone() if self._scheduler else None + def scheduler_debug_info(self): from dagster.core.scheduler import SchedulerDebugInfo, ScheduleStatus diff --git a/python_modules/dagster/dagster/core/scheduler/scheduler.py b/python_modules/dagster/dagster/core/scheduler/scheduler.py --- a/python_modules/dagster/dagster/core/scheduler/scheduler.py +++ b/python_modules/dagster/dagster/core/scheduler/scheduler.py @@ -429,6 +429,9 @@ schedule_origin_id (string): The id of the schedule target to retrieve the log path for """ + def get_default_timezone(self): + return None + class DagsterCommandLineScheduler(Scheduler, ConfigurableClass): """Scheduler implementation that launches runs from the `dagster scheduler run` @@ -437,7 +440,7 @@ def __init__(self, inst_data=None, default_timezone_str="UTC"): self._inst_data = inst_data - self.default_timezone_str = default_timezone_str + self._default_timezone_str = default_timezone_str @property def inst_data(self): @@ -492,6 +495,9 @@ logs_directory = self._get_or_create_logs_directory(instance, schedule_origin_id) return os.path.join(logs_directory, "scheduler.log") + def get_default_timezone(self): + return self._default_timezone_str + class ScheduleTickStatsSnapshot( namedtuple( diff --git a/python_modules/dagster/dagster/grpc/impl.py b/python_modules/dagster/dagster/grpc/impl.py --- a/python_modules/dagster/dagster/grpc/impl.py +++ b/python_modules/dagster/dagster/grpc/impl.py @@ -320,7 +320,9 @@ recon_repo = recon_repository_from_origin(args.repository_origin) definition = recon_repo.get_definition() partition_set_def = definition.get_partition_set_def(args.partition_set_name) - partition = partition_set_def.get_partition(args.partition_name) + + with DagsterInstance.from_ref(args.instance_ref) as instance: + partition = partition_set_def.get_partition(args.partition_name, instance) try: with user_code_error_boundary( PartitionExecutionError, @@ -343,14 +345,17 @@ definition = recon_repo.get_definition() partition_set_def = definition.get_partition_set_def(args.partition_set_name) try: - with user_code_error_boundary( - PartitionExecutionError, - lambda: "Error occurred during the execution of the partition generation function for " - "partition set {partition_set_name}".format(partition_set_name=partition_set_def.name), - ): - return ExternalPartitionNamesData( - partition_names=partition_set_def.get_partition_names() - ) + with DagsterInstance.from_ref(args.instance_ref) as instance: + with user_code_error_boundary( + PartitionExecutionError, + lambda: "Error occurred during the execution of the partition generation function for " + "partition set {partition_set_name}".format( + partition_set_name=partition_set_def.name + ), + ): + return ExternalPartitionNamesData( + partition_names=partition_set_def.get_partition_names(instance) + ) except PartitionExecutionError: return ExternalPartitionExecutionErrorData( serializable_error_info_from_exc_info(sys.exc_info()) @@ -362,7 +367,9 @@ recon_repo = recon_repository_from_origin(args.repository_origin) definition = recon_repo.get_definition() partition_set_def = definition.get_partition_set_def(args.partition_set_name) - partition = partition_set_def.get_partition(args.partition_name) + + with 
DagsterInstance.from_ref(args.instance_ref) as instance: + partition = partition_set_def.get_partition(args.partition_name, instance) try: with user_code_error_boundary( PartitionExecutionError, @@ -409,15 +416,16 @@ repo_definition = recon_repo.get_definition() partition_set_def = repo_definition.get_partition_set_def(args.partition_set_name) try: - with user_code_error_boundary( - PartitionExecutionError, - lambda: "Error occurred during the partition generation for partition set " - "{partition_set_name}".format(partition_set_name=partition_set_def.name), - ): - all_partitions = partition_set_def.get_partitions() - partitions = [ - partition for partition in all_partitions if partition.name in args.partition_names - ] + with DagsterInstance.from_ref(args.instance_ref) as instance: + with user_code_error_boundary( + PartitionExecutionError, + lambda: "Error occurred during the partition generation for partition set " + "{partition_set_name}".format(partition_set_name=partition_set_def.name), + ): + all_partitions = partition_set_def.get_partitions(instance) + partitions = [ + partition for partition in all_partitions if partition.name in args.partition_names + ] partition_data = [] for partition in partitions: diff --git a/python_modules/dagster/dagster/grpc/server.py b/python_modules/dagster/dagster/grpc/server.py --- a/python_modules/dagster/dagster/grpc/server.py +++ b/python_modules/dagster/dagster/grpc/server.py @@ -341,20 +341,21 @@ partition_names_args.partition_set_name ) try: - with user_code_error_boundary( - PartitionExecutionError, - lambda: "Error occurred during the execution of the partition generation function for " - "partition set {partition_set_name}".format( - partition_set_name=partition_set_def.name - ), - ): - return api_pb2.ExternalPartitionNamesReply( - serialized_external_partition_names_or_external_partition_execution_error=serialize_dagster_namedtuple( - ExternalPartitionNamesData( - partition_names=partition_set_def.get_partition_names() + with DagsterInstance.from_ref(partition_names_args.instance_ref) as instance: + with user_code_error_boundary( + PartitionExecutionError, + lambda: "Error occurred during the execution of the partition generation function for " + "partition set {partition_set_name}".format( + partition_set_name=partition_set_def.name + ), + ): + return api_pb2.ExternalPartitionNamesReply( + serialized_external_partition_names_or_external_partition_execution_error=serialize_dagster_namedtuple( + ExternalPartitionNamesData( + partition_names=partition_set_def.get_partition_names(instance) + ) ) ) - ) except PartitionExecutionError: return api_pb2.ExternalPartitionNamesReply( serialized_external_partition_names_or_external_partition_execution_error=serialize_dagster_namedtuple( @@ -389,7 +390,10 @@ lambda: "Error occurred during the partition generation for partition set " "{partition_set_name}".format(partition_set_name=partition_set_def.name), ): - all_partitions = partition_set_def.get_partitions() + with DagsterInstance.from_ref( + partition_set_execution_param_args.instance_ref + ) as instance: + all_partitions = partition_set_def.get_partitions(instance) partitions = [ partition for partition in all_partitions @@ -441,7 +445,8 @@ recon_repo = self._recon_repository_from_origin(partition_args.repository_origin) definition = recon_repo.get_definition() partition_set_def = definition.get_partition_set_def(partition_args.partition_set_name) - partition = partition_set_def.get_partition(partition_args.partition_name) + with 
DagsterInstance.from_ref(partition_args.instance_ref) as instance: + partition = partition_set_def.get_partition(partition_args.partition_name, instance) try: with user_code_error_boundary( PartitionExecutionError, @@ -473,7 +478,8 @@ recon_repo = self._recon_repository_from_origin(partition_args.repository_origin) definition = recon_repo.get_definition() partition_set_def = definition.get_partition_set_def(partition_args.partition_set_name) - partition = partition_set_def.get_partition(partition_args.partition_name) + with DagsterInstance.from_ref(partition_args.instance_ref) as instance: + partition = partition_set_def.get_partition(partition_args.partition_name, instance) try: with user_code_error_boundary( PartitionExecutionError, diff --git a/python_modules/dagster/dagster/grpc/types.py b/python_modules/dagster/dagster/grpc/types.py --- a/python_modules/dagster/dagster/grpc/types.py +++ b/python_modules/dagster/dagster/grpc/types.py @@ -139,9 +139,9 @@ @whitelist_for_serdes class PartitionArgs( - namedtuple("_PartitionArgs", "repository_origin partition_set_name partition_name") + namedtuple("_PartitionArgs", "repository_origin partition_set_name partition_name instance_ref") ): - def __new__(cls, repository_origin, partition_set_name, partition_name): + def __new__(cls, repository_origin, partition_set_name, partition_name, instance_ref): return super(PartitionArgs, cls).__new__( cls, repository_origin=check.inst_param( @@ -149,28 +149,33 @@ ), partition_set_name=check.str_param(partition_set_name, "partition_set_name"), partition_name=check.str_param(partition_name, "partition_name"), + instance_ref=check.inst_param(instance_ref, "instance_ref", InstanceRef), ) @whitelist_for_serdes -class PartitionNamesArgs(namedtuple("_PartitionNamesArgs", "repository_origin partition_set_name")): - def __new__(cls, repository_origin, partition_set_name): +class PartitionNamesArgs( + namedtuple("_PartitionNamesArgs", "repository_origin partition_set_name instance_ref") +): + def __new__(cls, repository_origin, partition_set_name, instance_ref): return super(PartitionNamesArgs, cls).__new__( cls, repository_origin=check.inst_param( repository_origin, "repository_origin", RepositoryOrigin ), partition_set_name=check.str_param(partition_set_name, "partition_set_name"), + instance_ref=check.inst_param(instance_ref, "instance_ref", InstanceRef), ) @whitelist_for_serdes class PartitionSetExecutionParamArgs( namedtuple( - "_PartitionSetExecutionParamArgs", "repository_origin partition_set_name partition_names", + "_PartitionSetExecutionParamArgs", + "repository_origin partition_set_name partition_names instance_ref", ) ): - def __new__(cls, repository_origin, partition_set_name, partition_names): + def __new__(cls, repository_origin, partition_set_name, partition_names, instance_ref): return super(PartitionSetExecutionParamArgs, cls).__new__( cls, repository_origin=check.inst_param( @@ -178,6 +183,7 @@ ), partition_set_name=check.str_param(partition_set_name, "partition_set_name"), partition_names=check.list_param(partition_names, "partition_names", of_type=str), + instance_ref=check.inst_param(instance_ref, "instance_ref", InstanceRef), ) diff --git a/python_modules/dagster/dagster/scheduler/scheduler.py b/python_modules/dagster/dagster/scheduler/scheduler.py --- a/python_modules/dagster/dagster/scheduler/scheduler.py +++ b/python_modules/dagster/dagster/scheduler/scheduler.py @@ -177,8 +177,6 @@ check.inst_param(end_datetime_utc, "end_datetime_utc", datetime.datetime) check.inst_param(repo_location, 
"repo_location", RepositoryLocation) - scheduler = instance.scheduler - latest_tick = instance.get_latest_tick(schedule_state.schedule_origin_id) if not latest_tick: @@ -202,7 +200,7 @@ timezone_str = ( external_schedule.execution_timezone if external_schedule.execution_timezone - else scheduler.default_timezone_str + else instance.default_schedule_timezone ) end_datetime = end_datetime_utc.in_tz(timezone_str) diff --git a/python_modules/dagster/dagster/utils/partitions.py b/python_modules/dagster/dagster/utils/partitions.py --- a/python_modules/dagster/dagster/utils/partitions.py +++ b/python_modules/dagster/dagster/utils/partitions.py @@ -1,15 +1,22 @@ import datetime +import pendulum from dateutil.relativedelta import relativedelta from dagster import check from dagster.core.errors import DagsterInvariantViolationError +from dagster.seven import get_current_datetime_in_utc DEFAULT_DATE_FORMAT = "%Y-%m-%d" def date_partition_range( - start, end=None, delta=datetime.timedelta(days=1), fmt=None, inclusive=False + start, + end=None, + delta=datetime.timedelta(days=1), + fmt=None, + inclusive=False, + schedule_timezone=None, ): """ Utility function that returns a partition generating function to be used in creating a `PartitionSet` definition. @@ -45,15 +52,36 @@ ) ) - def get_date_range_partitions(): - current = start + def get_date_range_partitions(instance): + date_names = [] + if instance.default_schedule_timezone: + timezone = ( + schedule_timezone if schedule_timezone else instance.default_schedule_timezone + ) - _end = end or datetime.datetime.now() + current_utc = pendulum.instance(start, tz=timezone).in_tz("UTC") + _end_utc = ( + pendulum.instance(end, tz=timezone).in_tz("UTC") + if end + else get_current_datetime_in_utc() + ) - date_names = [] - while current <= _end: - date_names.append(Partition(value=current, name=current.strftime(fmt))) - current = current + delta + while current_utc <= _end_utc: + current_in_timezone = current_utc.in_tz(timezone) + + date_names.append( + Partition(value=current_in_timezone, name=current_in_timezone.strftime(fmt)) + ) + current_utc = current_utc + delta + + else: + current = start + + _end = end or datetime.datetime.now() + + while current <= _end: + date_names.append(Partition(value=current, name=current.strftime(fmt))) + current = current + delta # We don't include the last element here by default since we only want # fully completed intervals, and the _end time is in the middle of the interval diff --git a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_partition.py b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_partition.py --- a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_partition.py +++ b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_partition.py @@ -17,47 +17,59 @@ ExternalPartitionSetExecutionParamData, ExternalPartitionTagsData, ) +from dagster.core.test_utils import instance_for_test from .utils import get_bar_grpc_repo_handle, get_bar_repo_handle def test_external_partition_names(): repository_handle = get_bar_repo_handle() - data = sync_get_external_partition_names(repository_handle, "baz_partitions") + with instance_for_test() as instance: + data = sync_get_external_partition_names(instance, repository_handle, "baz_partitions") assert isinstance(data, ExternalPartitionNamesData) assert data.partition_names == list(string.ascii_lowercase) def test_external_partition_names_error(): repository_handle = get_bar_repo_handle() - error = 
sync_get_external_partition_names(repository_handle, "error_partitions") + with instance_for_test() as instance: + error = sync_get_external_partition_names(instance, repository_handle, "error_partitions") assert isinstance(error, ExternalPartitionExecutionErrorData) assert "womp womp" in error.error.to_string() def test_external_partition_names_grpc(): with get_bar_grpc_repo_handle() as repository_handle: - data = sync_get_external_partition_names_grpc( - repository_handle.repository_location_handle.client, repository_handle, "baz_partitions" - ) + with instance_for_test() as instance: + data = sync_get_external_partition_names_grpc( + repository_handle.repository_location_handle.client, + instance, + repository_handle, + "baz_partitions", + ) assert isinstance(data, ExternalPartitionNamesData) assert data.partition_names == list(string.ascii_lowercase) def test_external_partition_names_error_grpc(): with get_bar_grpc_repo_handle() as repository_handle: - error = sync_get_external_partition_names_grpc( - repository_handle.repository_location_handle.client, - repository_handle, - "error_partitions", - ) + with instance_for_test() as instance: + error = sync_get_external_partition_names_grpc( + repository_handle.repository_location_handle.client, + instance, + repository_handle, + "error_partitions", + ) assert isinstance(error, ExternalPartitionExecutionErrorData) assert "womp womp" in error.error.to_string() def test_external_partitions_config(): repository_handle = get_bar_repo_handle() - data = sync_get_external_partition_config(repository_handle, "baz_partitions", "c") + with instance_for_test() as instance: + data = sync_get_external_partition_config( + instance, repository_handle, "baz_partitions", "c" + ) assert isinstance(data, ExternalPartitionConfigData) assert data.run_config assert data.run_config["solids"]["do_input"]["inputs"]["x"]["value"] == "c" @@ -65,18 +77,23 @@ def test_external_partitions_config_error(): repository_handle = get_bar_repo_handle() - error = sync_get_external_partition_config(repository_handle, "error_partition_config", "c") + with instance_for_test() as instance: + error = sync_get_external_partition_config( + instance, repository_handle, "error_partition_config", "c" + ) assert isinstance(error, ExternalPartitionExecutionErrorData) def test_external_partitions_config_grpc(): with get_bar_grpc_repo_handle() as repository_handle: - data = sync_get_external_partition_config_grpc( - repository_handle.repository_location_handle.client, - repository_handle, - "baz_partitions", - "c", - ) + with instance_for_test() as instance: + data = sync_get_external_partition_config_grpc( + repository_handle.repository_location_handle.client, + instance, + repository_handle, + "baz_partitions", + "c", + ) assert isinstance(data, ExternalPartitionConfigData) assert data.run_config assert data.run_config["solids"]["do_input"]["inputs"]["x"]["value"] == "c" @@ -84,18 +101,21 @@ def test_external_partitions_config_error_grpc(): with get_bar_grpc_repo_handle() as repository_handle: - error = sync_get_external_partition_config_grpc( - repository_handle.repository_location_handle.client, - repository_handle, - "error_partition_config", - "c", - ) + with instance_for_test() as instance: + error = sync_get_external_partition_config_grpc( + repository_handle.repository_location_handle.client, + instance, + repository_handle, + "error_partition_config", + "c", + ) assert isinstance(error, ExternalPartitionExecutionErrorData) def test_external_partitions_tags(): repository_handle = 
get_bar_repo_handle() - data = sync_get_external_partition_tags(repository_handle, "baz_partitions", "c") + with instance_for_test() as instance: + data = sync_get_external_partition_tags(instance, repository_handle, "baz_partitions", "c") assert isinstance(data, ExternalPartitionTagsData) assert data.tags assert data.tags["foo"] == "bar" @@ -103,18 +123,23 @@ def test_external_partitions_tags_error(): repository_handle = get_bar_repo_handle() - error = sync_get_external_partition_tags(repository_handle, "error_partition_tags", "c") + with instance_for_test() as instance: + error = sync_get_external_partition_tags( + instance, repository_handle, "error_partition_tags", "c" + ) assert isinstance(error, ExternalPartitionExecutionErrorData) def test_external_partitions_tags_grpc(): with get_bar_grpc_repo_handle() as repository_handle: - data = sync_get_external_partition_tags_grpc( - repository_handle.repository_location_handle.client, - repository_handle, - "baz_partitions", - "c", - ) + with instance_for_test() as instance: + data = sync_get_external_partition_tags_grpc( + repository_handle.repository_location_handle.client, + instance, + repository_handle, + "baz_partitions", + "c", + ) assert isinstance(data, ExternalPartitionTagsData) assert data.tags assert data.tags["foo"] == "bar" @@ -122,40 +147,46 @@ def test_external_partitions_tags_error_grpc(): with get_bar_grpc_repo_handle() as repository_handle: - error = sync_get_external_partition_tags_grpc( - repository_handle.repository_location_handle.client, - repository_handle, - "error_partition_tags", - "c", - ) + with instance_for_test() as instance: + error = sync_get_external_partition_tags_grpc( + repository_handle.repository_location_handle.client, + instance, + repository_handle, + "error_partition_tags", + "c", + ) assert isinstance(error, ExternalPartitionExecutionErrorData) def test_external_partition_set_execution_params(): repository_handle = get_bar_repo_handle() - data = sync_get_external_partition_set_execution_param_data( - repository_handle, "baz_partitions", ["a", "b", "c"] - ) + with instance_for_test() as instance: + data = sync_get_external_partition_set_execution_param_data( + instance, repository_handle, "baz_partitions", ["a", "b", "c"] + ) assert isinstance(data, ExternalPartitionSetExecutionParamData) assert len(data.partition_data) == 3 def test_external_partition_set_execution_params_grpc(): with get_bar_grpc_repo_handle() as repository_handle: - data = sync_get_external_partition_set_execution_param_data_grpc( - repository_handle.repository_location_handle.client, - repository_handle, - "baz_partitions", - ["a", "b", "c"], - ) + with instance_for_test() as instance: + data = sync_get_external_partition_set_execution_param_data_grpc( + repository_handle.repository_location_handle.client, + instance, + repository_handle, + "baz_partitions", + ["a", "b", "c"], + ) assert isinstance(data, ExternalPartitionSetExecutionParamData) assert len(data.partition_data) == 3 def test_external_partition_set_execution_params_error(): repository_handle = get_bar_repo_handle() - error = sync_get_external_partition_set_execution_param_data( - repository_handle, "error_partitions", ["a", "b", "c"] - ) + with instance_for_test() as instance: + error = sync_get_external_partition_set_execution_param_data( + instance, repository_handle, "error_partitions", ["a", "b", "c"] + ) assert isinstance(error, ExternalPartitionExecutionErrorData) assert "womp womp" in error.error.to_string() diff --git 
a/python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_decorators.py b/python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_decorators.py --- a/python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_decorators.py +++ b/python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_decorators.py @@ -457,8 +457,10 @@ return {} -def _check_partitions(partition_schedule_def, expected_num_partitions, expected_relative_delta): - partitions = partition_schedule_def.get_partition_set().partition_fn() +def _check_partitions( + instance, partition_schedule_def, expected_num_partitions, expected_relative_delta +): + partitions = partition_schedule_def.get_partition_set().partition_fn(instance) assert len(partitions) == expected_num_partitions for index, partition in enumerate(partitions): @@ -479,7 +481,7 @@ def hourly_foo_schedule(hourly_time): return {"hourly_time": hourly_time.isoformat()} - _check_partitions(hourly_foo_schedule, 24 * (31 + 26), relativedelta(hours=1)) + _check_partitions(instance, hourly_foo_schedule, 24 * (31 + 26), relativedelta(hours=1)) assert hourly_foo_schedule.get_run_config(context_without_time) == { "hourly_time": datetime(year=2019, month=2, day=26, hour=23, minute=1).isoformat() @@ -527,7 +529,7 @@ def daily_foo_schedule(daily_time): return {"daily_time": daily_time.isoformat()} - _check_partitions(daily_foo_schedule, (31 + 26), relativedelta(days=1)) + _check_partitions(instance, daily_foo_schedule, (31 + 26), relativedelta(days=1)) valid_daily_time = datetime(year=2019, month=1, day=27, hour=9, minute=30) context_with_valid_time = ScheduleExecutionContext(instance, valid_daily_time) @@ -571,7 +573,7 @@ } assert weekly_foo_schedule.should_execute(context_without_time) - _check_partitions(weekly_foo_schedule, 8, relativedelta(weeks=1)) + _check_partitions(instance, weekly_foo_schedule, 8, relativedelta(weeks=1)) @freeze_time("2019-02-27 00:01:01") @@ -601,7 +603,7 @@ } assert monthly_foo_schedule.should_execute(context_without_time) - _check_partitions(monthly_foo_schedule, 1, relativedelta(months=1)) + _check_partitions(instance, monthly_foo_schedule, 1, relativedelta(months=1)) def test_schedule_decorators_bad(): diff --git a/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_utils.py b/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_utils.py --- a/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_utils.py +++ b/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_utils.py @@ -4,6 +4,7 @@ from dateutil.relativedelta import relativedelta from dagster import DagsterInvariantViolationError, Partition +from dagster.core.test_utils import instance_for_test from dagster.utils.partitions import date_partition_range @@ -76,15 +77,17 @@ ], ) def test_date_partition_range_daily(start, end, delta, inclusive, expected_partitions): - partition_generator = date_partition_range(start, end, delta, inclusive=inclusive) - generated_partitions = partition_generator() - assert all( - isinstance(generated_partition, Partition) for generated_partition in generated_partitions - ) - assert len(generated_partitions) == len(expected_partitions) - assert all( - expected_partition_name == generated_partition_name - for expected_partition_name, generated_partition_name in zip( - expected_partitions, [partition.name for partition in generated_partitions] + with instance_for_test() as instance: + partition_generator = date_partition_range(start, end, delta, 
inclusive=inclusive)
+        generated_partitions = partition_generator(instance)
+        assert all(
+            isinstance(generated_partition, Partition)
+            for generated_partition in generated_partitions
+        )
+        assert len(generated_partitions) == len(expected_partitions)
+        assert all(
+            expected_partition_name == generated_partition_name
+            for expected_partition_name, generated_partition_name in zip(
+                expected_partitions, [partition.name for partition in generated_partitions]
+            )
         )
-    )
diff --git a/python_modules/dagster/dagster_tests/scheduler_tests/test_scheduler_timezones.py b/python_modules/dagster/dagster_tests/scheduler_tests/test_scheduler_timezones.py
--- a/python_modules/dagster/dagster_tests/scheduler_tests/test_scheduler_timezones.py
+++ b/python_modules/dagster/dagster_tests/scheduler_tests/test_scheduler_timezones.py
@@ -215,7 +215,7 @@
 # Verify that a schedule that runs in US/Central late enough in the day that it executes on
 # a different day in UTC still runs and creates its partition names based on the US/Central time
 @pytest.mark.parametrize("external_repo_context", [default_repo, grpc_repo])
-def test_different_days_in_different_timezones(external_repo_context):
+def test_different_days_in_different_timezones_late(external_repo_context):
     with instance_with_schedules(external_repo_context, timezone="US/Central") as (
         instance,
         external_repo,
@@ -275,6 +275,69 @@
         assert ticks[0].status == ScheduleTickStatus.SUCCESS

+
+# Verify that a schedule that runs in Europe/Paris early enough that it isn't yet the next
+# day in UTC still runs and has a valid partition
+@pytest.mark.parametrize("external_repo_context", [default_repo, grpc_repo])
+def test_different_days_in_different_timezones_early(external_repo_context):
+    with instance_with_schedules(external_repo_context, timezone="Europe/Paris") as (
+        instance,
+        external_repo,
+    ):
+        freeze_datetime = pendulum.create(2019, 2, 27, 23, 59, 59, tz="Europe/Paris").in_tz(
+            "US/Pacific"
+        )
+        with pendulum.test(freeze_datetime):
+            # Runs every day at midnight (Europe/Paris)
+            external_schedule = external_repo.get_external_schedule("simple_schedule")
+            schedule_origin = external_schedule.get_origin()
+            instance.start_schedule_and_update_storage_state(external_schedule)
+
+            assert instance.get_runs_count() == 0
+            ticks = instance.get_schedule_ticks(schedule_origin.get_id())
+            assert len(ticks) == 0
+
+            launch_scheduled_runs(
+                instance, get_default_scheduler_logger(), pendulum.now("UTC"),
+            )
+            assert instance.get_runs_count() == 0
+            ticks = instance.get_schedule_ticks(schedule_origin.get_id())
+            assert len(ticks) == 0
+
+        freeze_datetime = freeze_datetime.add(seconds=2)
+        with pendulum.test(freeze_datetime):
+            launch_scheduled_runs(
+                instance, get_default_scheduler_logger(), pendulum.now("UTC"),
+            )
+
+            assert instance.get_runs_count() == 1
+            ticks = instance.get_schedule_ticks(schedule_origin.get_id())
+            assert len(ticks) == 1
+
+            expected_datetime = pendulum.create(
+                year=2019, month=2, day=28, tz="Europe/Paris"
+            ).in_tz("UTC")
+
+            validate_tick(
+                ticks[0],
+                external_schedule,
+                expected_datetime,
+                ScheduleTickStatus.SUCCESS,
+                instance.get_runs()[0].run_id,
+            )
+
+            wait_for_all_runs_to_start(instance)
+            validate_run_started(instance.get_runs()[0], expected_datetime, "2019-02-27")
+
+            # Verify idempotence
+            launch_scheduled_runs(
+                instance, get_default_scheduler_logger(), pendulum.now("UTC"),
+            )
+            assert instance.get_runs_count() == 1
+            ticks = instance.get_schedule_ticks(schedule_origin.get_id())
+            assert len(ticks) == 1
+            assert ticks[0].status ==
ScheduleTickStatus.SUCCESS + + @pytest.mark.parametrize( "external_repo_context", [default_repo, grpc_repo], ) @@ -329,7 +392,10 @@ validate_run_started( instance.get_runs()[i], expected_datetimes_utc[i], - expected_datetimes_utc[i].subtract(hours=1).strftime("%Y-%m-%d-%H:%M"), + expected_datetimes_utc[i] + .subtract(hours=1) + .in_tz("US/Central") + .strftime("%Y-%m-%d-%H:%M%Z"), ) # Verify idempotence @@ -410,7 +476,10 @@ validate_run_started( instance.get_runs()[i], expected_datetimes_utc[i], - expected_datetimes_utc[i].subtract(hours=1).strftime("%Y-%m-%d-%H:%M"), + expected_datetimes_utc[i] + .subtract(hours=1) + .in_tz("US/Central") + .strftime("%Y-%m-%d-%H:%M%Z"), ) # Verify idempotence
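
For orientation (not part of the patch): after these changes, partition generators and `PartitionSetDefinition.get_partitions` take a `DagsterInstance`, so call sites look roughly like the sketch below. The dates and timezone are arbitrary placeholder values, and this assumes a dagster build that includes the diff above.

```python
from datetime import datetime, timedelta

from dagster import Partition
from dagster.core.test_utils import instance_for_test
from dagster.utils.partitions import date_partition_range

# Build the partition-generating function. `schedule_timezone` is the new optional
# argument; it only takes effect when the instance's scheduler reports a default
# timezone (otherwise the generator falls back to the old naive-datetime behavior).
partition_generator = date_partition_range(
    start=datetime(2020, 1, 1),
    end=datetime(2020, 1, 10),
    delta=timedelta(days=1),
    schedule_timezone="US/Central",
)

with instance_for_test() as instance:
    # After this change the generator is called with a DagsterInstance rather than
    # with no arguments, mirroring PartitionSetDefinition.get_partitions(instance).
    partitions = partition_generator(instance)
    assert all(isinstance(partition, Partition) for partition in partitions)
    print([partition.name for partition in partitions])  # 2020-01-01 ... 2020-01-09
```

Threading the instance through is what lets `date_partition_range` consult `instance.default_schedule_timezone` when no explicit `schedule_timezone` is supplied.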