diff --git a/python_modules/dagit/dagit_tests/pipeline.py b/python_modules/dagit/dagit_tests/pipeline.py --- a/python_modules/dagit/dagit_tests/pipeline.py +++ b/python_modules/dagit/dagit_tests/pipeline.py @@ -1,5 +1,3 @@ -import datetime - from dagster import ( InputDefinition, Int, @@ -9,6 +7,7 @@ pipeline, repository, ) +from dagster.core.test_utils import today_at_midnight @lambda_solid(input_defs=[InputDefinition("num", Int)], output_def=OutputDefinition(Int)) @@ -26,7 +25,9 @@ return mult_two(num=add_one()) -@daily_schedule(pipeline_name="math", start_date=datetime.datetime.now()) +@daily_schedule( + pipeline_name="math", start_date=today_at_midnight(), +) def my_schedule(_): return {"solids": {"mult_two": {"inputs": {"num": {"value": 2}}}}} diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/setup.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/setup.py --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/setup.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/setup.py @@ -59,6 +59,7 @@ from dagster.core.host_representation import InProcessRepositoryLocation, RepositoryLocationHandle from dagster.core.log_manager import coerce_valid_log_level from dagster.core.storage.tags import RESUME_RETRY_TAG +from dagster.core.test_utils import today_at_midnight from dagster.utils import file_relative_path, segfault @@ -870,7 +871,7 @@ @daily_schedule( pipeline_name="no_config_pipeline", - start_date=datetime.datetime.now() - datetime.timedelta(days=1), + start_date=today_at_midnight() - datetime.timedelta(days=1), execution_time=(datetime.datetime.now() + datetime.timedelta(hours=2)).time(), ) def partition_based_decorator(_date): @@ -878,7 +879,7 @@ @daily_schedule( pipeline_name="multi_mode_with_loggers", - start_date=datetime.datetime.now() - datetime.timedelta(days=1), + start_date=today_at_midnight() - datetime.timedelta(days=1), execution_time=(datetime.datetime.now() + datetime.timedelta(hours=2)).time(), mode="foo_mode", ) @@ -887,7 +888,7 @@ @hourly_schedule( pipeline_name="no_config_chain_pipeline", - start_date=datetime.datetime.now() - datetime.timedelta(days=1), + start_date=today_at_midnight() - datetime.timedelta(days=1), execution_time=(datetime.datetime.now() + datetime.timedelta(hours=2)).time(), solid_selection=["return_foo"], ) @@ -896,7 +897,7 @@ @daily_schedule( pipeline_name="no_config_chain_pipeline", - start_date=datetime.datetime.now() - datetime.timedelta(days=2), + start_date=today_at_midnight() - datetime.timedelta(days=2), execution_time=(datetime.datetime.now() + datetime.timedelta(hours=3)).time(), solid_selection=["return_foo"], ) @@ -905,7 +906,7 @@ @monthly_schedule( pipeline_name="no_config_chain_pipeline", - start_date=datetime.datetime.now() - datetime.timedelta(days=100), + start_date=(today_at_midnight() - datetime.timedelta(days=100)).replace(day=1), execution_time=(datetime.datetime.now() + datetime.timedelta(hours=4)).time(), solid_selection=["return_foo"], ) @@ -914,7 +915,7 @@ @weekly_schedule( pipeline_name="no_config_chain_pipeline", - start_date=datetime.datetime.now() - datetime.timedelta(days=50), + start_date=today_at_midnight() - datetime.timedelta(days=50), execution_time=(datetime.datetime.now() + datetime.timedelta(hours=5)).time(), solid_selection=["return_foo"], ) @@ -924,7 +925,7 @@ # Schedules for testing the user error boundary @daily_schedule( pipeline_name="no_config_pipeline", - start_date=datetime.datetime.now() - datetime.timedelta(days=1), + start_date=today_at_midnight() - datetime.timedelta(days=1), should_execute=lambda _: asdf, # pylint: disable=undefined-variable ) def should_execute_error_schedule(_date): @@ -932,7 +933,7 @@ @daily_schedule( pipeline_name="no_config_pipeline", - start_date=datetime.datetime.now() - datetime.timedelta(days=1), + start_date=today_at_midnight() - datetime.timedelta(days=1), tags_fn_for_date=lambda _: asdf, # pylint: disable=undefined-variable ) def tags_error_schedule(_date): @@ -940,7 +941,7 @@ @daily_schedule( pipeline_name="no_config_pipeline", - start_date=datetime.datetime.now() - datetime.timedelta(days=1), + start_date=today_at_midnight() - datetime.timedelta(days=1), ) def run_config_error_schedule(_date): return asdf # pylint: disable=undefined-variable diff --git a/python_modules/dagster/dagster/core/definitions/decorators/schedule.py b/python_modules/dagster/dagster/core/definitions/decorators/schedule.py --- a/python_modules/dagster/dagster/core/definitions/decorators/schedule.py +++ b/python_modules/dagster/dagster/core/definitions/decorators/schedule.py @@ -149,6 +149,18 @@ check.inst_param(execution_time, "execution_time", datetime.time) check.opt_str_param(execution_timezone, "execution_timezone") + if ( + start_date.day != 1 + or start_date.hour != 0 + or start_date.minute != 0 + or start_date.second != 0 + ): + raise DagsterInvalidDefinitionError( + "`start_date` must be at the beginning of a month for a monthly schedule. " + "Use `execution_day_of_month` and `execution_time` to execute the schedule later " + "in the month." + ) + if execution_day_of_month <= 0 or execution_day_of_month > 31: raise DagsterInvalidDefinitionError( "`execution_day_of_month={}` is not valid for monthly schedule. Execution day must be " @@ -257,6 +269,12 @@ check.inst_param(execution_time, "execution_time", datetime.time) check.opt_str_param(execution_timezone, "execution_timezone") + if start_date.hour != 0 or start_date.minute != 0 or start_date.second != 0: + raise DagsterInvalidDefinitionError( + "`start_date` must be at the beginning of a day for a weekly schedule. " + "Use `execution_time` to execute the schedule later in the day." + ) + if execution_day_of_week < 0 or execution_day_of_week >= 7: raise DagsterInvalidDefinitionError( "`execution_day_of_week={}` is not valid for weekly schedule. Execution day must be " @@ -364,6 +382,12 @@ check.opt_dict_param(environment_vars, "environment_vars", key_type=str, value_type=str) check.opt_str_param(execution_timezone, "execution_timezone") + if start_date.hour != 0 or start_date.minute != 0 or start_date.second != 0: + raise DagsterInvalidDefinitionError( + "`start_date` must be at the beginning of a day for a daily schedule. " + "Use `execution_time` to execute the schedule later in the day." + ) + cron_schedule = "{minute} {hour} * * *".format( minute=execution_time.minute, hour=execution_time.hour ) @@ -464,11 +488,17 @@ check.inst_param(execution_time, "execution_time", datetime.time) check.opt_str_param(execution_timezone, "execution_timezone") + if start_date.minute != 0 or start_date.second != 0: + raise DagsterInvalidDefinitionError( + "`start_date` must be at the beginning of an hour for an hourly schedule. " + "Use `execution_time` to execute the schedule later in the hour." + ) + if execution_time.hour != 0: warnings.warn( "Hourly schedule {schedule_name} created with:\n" "\tschedule_time=datetime.time(hour={hour}, minute={minute}, ...)." - "Since this is a hourly schedule, the hour parameter will be ignored and the schedule " + "Since this is an hourly schedule, the hour parameter will be ignored and the schedule " "will run on the {minute} mark for the previous hour interval. Replace " "datetime.time(hour={hour}, minute={minute}, ...) with " "datetime.time(minute={minute}, ...) to fix this warning." diff --git a/python_modules/dagster/dagster/core/test_utils.py b/python_modules/dagster/dagster/core/test_utils.py --- a/python_modules/dagster/dagster/core/test_utils.py +++ b/python_modules/dagster/dagster/core/test_utils.py @@ -1,3 +1,4 @@ +import datetime import os import time from contextlib import contextmanager @@ -273,6 +274,10 @@ os.chdir(old) +def today_at_midnight(): + return datetime.datetime.combine(datetime.date.today(), datetime.time()) + + class ExplodingRunLauncher(RunLauncher, ConfigurableClass): def __init__(self, inst_data=None): self._inst_data = inst_data diff --git a/python_modules/dagster/dagster_tests/api_tests/test_launch_scheduled_execution.py b/python_modules/dagster/dagster_tests/api_tests/test_launch_scheduled_execution.py --- a/python_modules/dagster/dagster_tests/api_tests/test_launch_scheduled_execution.py +++ b/python_modules/dagster/dagster_tests/api_tests/test_launch_scheduled_execution.py @@ -18,12 +18,12 @@ ScheduledExecutionSuccess, ) from dagster.core.telemetry import get_dir_from_dagster_home -from dagster.core.test_utils import instance_for_test +from dagster.core.test_utils import instance_for_test, today_at_midnight from dagster.core.types.loadable_target_origin import LoadableTargetOrigin from dagster.grpc.server import GrpcServerProcess from dagster.utils import find_free_port -_COUPLE_DAYS_AGO = datetime.datetime.now() - datetime.timedelta(days=2) +_COUPLE_DAYS_AGO = today_at_midnight() - datetime.timedelta(days=2) def _throw(_context): diff --git a/python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_decorators.py b/python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_decorators.py --- a/python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_decorators.py +++ b/python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_decorators.py @@ -444,7 +444,7 @@ context_without_time = ScheduleExecutionContext(instance, None) - start_date = datetime(year=2019, month=1, day=1, minute=1) + start_date = datetime(year=2019, month=1, day=1) @hourly_schedule( pipeline_name="foo_pipeline", @@ -464,7 +464,7 @@ assert hourly_foo_schedule.get_run_config(context_without_time) == { "hourly_time": pendulum.create( - year=2019, month=2, day=26, hour=23, minute=1, tz="US/Central" + year=2019, month=2, day=26, hour=23, tz="US/Central" ).isoformat() } assert hourly_foo_schedule.should_execute(context_without_time) @@ -491,7 +491,7 @@ assert hourly_foo_schedule.get_run_config(context_with_valid_time) == { "hourly_time": pendulum.create( - year=2019, month=1, day=27, hour=0, minute=1, tz="US/Central" + year=2019, month=1, day=27, hour=0, tz="US/Central" ).isoformat() } @@ -501,7 +501,7 @@ def test_partitions_for_hourly_schedule_decorators_with_timezone(): with instance_for_test() as instance: with pendulum.test(pendulum.create(2019, 2, 27, 0, 1, 1, tz="US/Central")): - start_date = datetime(year=2019, month=1, day=1, minute=1) + start_date = datetime(year=2019, month=1, day=1) # You can specify a start date with no timezone and it will be assumed to be # in the execution timezone @@ -532,7 +532,7 @@ assert hourly_central_schedule.get_run_config(context_with_valid_time) == { "hourly_time": pendulum.create( - year=2019, month=1, day=27, hour=0, minute=1, tz="US/Central" + year=2019, month=1, day=27, hour=0, tz="US/Central" ).isoformat() } @@ -540,7 +540,7 @@ # You can specify a start date in a different timezone and it will be transformed into the # execution timezone - start_date_with_different_timezone = pendulum.create(2019, 1, 1, 0, 1, tz="US/Pacific") + start_date_with_different_timezone = pendulum.create(2019, 1, 1, 0, tz="US/Pacific") @hourly_schedule( pipeline_name="foo_pipeline", @@ -816,6 +816,19 @@ def monthly_foo_schedule_over(): return {} + with pytest.raises( + DagsterInvalidDefinitionError, + match=re.escape("`start_date` must be at the beginning of a month for a monthly schedule."), + ): + + @monthly_schedule( + pipeline_name="foo_pipeline", + execution_day_of_month=7, + start_date=datetime(year=2019, month=1, day=5), + ) + def monthly_foo_schedule_later_in_month(): + return {} + with pytest.raises(DagsterInvalidDefinitionError): @monthly_schedule( @@ -836,6 +849,42 @@ def weekly_foo_schedule_over(): return {} + with pytest.raises( + DagsterInvalidDefinitionError, + match=re.escape("`start_date` must be at the beginning of a day for a weekly schedule."), + ): + + @weekly_schedule( + pipeline_name="foo_pipeline", + execution_day_of_week=7, + start_date=datetime(year=2019, month=1, day=1, hour=2), + ) + def weekly_foo_schedule_start_later_in_day(): + return {} + + with pytest.raises( + DagsterInvalidDefinitionError, + match=re.escape("`start_date` must be at the beginning of a day for a daily schedule."), + ): + + @daily_schedule( + pipeline_name="foo_pipeline", start_date=datetime(year=2019, month=1, day=1, hour=2), + ) + def daily_foo_schedule_start_later_in_day(): + return {} + + with pytest.raises( + DagsterInvalidDefinitionError, + match=re.escape("`start_date` must be at the beginning of an hour for an hourly schedule."), + ): + + @hourly_schedule( + pipeline_name="foo_pipeline", + start_date=datetime(year=2019, month=1, day=1, hour=2, minute=30), + ) + def hourly_foo_schedule_start_later_in_hour(): + return {} + def test_solid_docstring(): @solid