diff --git a/docs/next/src/pages/deploying/k8s_part1.mdx b/docs/next/src/pages/deploying/k8s_part1.mdx --- a/docs/next/src/pages/deploying/k8s_part1.mdx +++ b/docs/next/src/pages/deploying/k8s_part1.mdx @@ -63,10 +63,13 @@ ## System overview -The Helm chart installs several different components, including Dagit. In this system, the coupled with the corresponding -`celery_k8s_job_executor` handles physical execution of Dagster pipelines via Celery-on-K8s. This -deployment aims to provide: +The Helm chart installs several different components, including Dagit. + +### Celery K8s +By default, we configure Dagster to deploy with Celery. +In this system, the coupled +with the corresponding +handles physical execution of Dagster pipelines via Celery-on-K8s. This deployment aims to provide: - **User Code Separation:** In this system, the Celery workers can be deployed via fixed images without user code, but rest of the system still uses the user code image. In the next part, we @@ -93,8 +96,21 @@ step execution Job is launched to execute that step. These step execution Jobs are also launched using the `pipeline_run:` image, and each will be named `dagster-job-`. -_**Note:** In the default Helm chart, we configure a . -It is also possible to deploy with a or a custom Run Launcher. +### Celery-less K8s +You can also configure Dagster to not use Celery. In this case, the will be used, which runs pipelines as a +single K8s Jobs. We can enable this feature with the following: + +```shell +helm install dagster dagster/dagster -f /path/to/values.yaml --set celery.enabled=false +``` + +Then, in the run config, an or can be configured to run the execution +plan of the pipeline within the K8s Job. + + +_**Note:** It is also possible to deploy with a custom Run Launcher. If you have questions, please reach out to us on [Slack](https://dagster-slackin.herokuapp.com/) and we're happy to help!_ ## Helm chart diff --git a/helm/dagster/templates/_helpers.tpl b/helm/dagster/templates/_helpers.tpl --- a/helm/dagster/templates/_helpers.tpl +++ b/helm/dagster/templates/_helpers.tpl @@ -159,9 +159,10 @@ DAGSTER_K8S_CELERY_BROKER: "{{ template "dagster.celery.broker_url" . }}" DAGSTER_K8S_CELERY_BACKEND: "{{ template "dagster.celery.backend_url" . }}" DAGSTER_K8S_PG_PASSWORD_SECRET: "{{ template "dagster.fullname" .}}-postgresql-secret" +DAGSTER_K8S_INSTANCE_ENV_CONFIG_MAP: "{{ template "dagster.fullname" .}}-instance-env" DAGSTER_K8S_INSTANCE_CONFIG_MAP: "{{ template "dagster.fullname" .}}-instance" DAGSTER_K8S_PIPELINE_RUN_NAMESPACE: "{{ .Release.Namespace }}" DAGSTER_K8S_PIPELINE_RUN_ENV_CONFIGMAP: "{{ template "dagster.fullname" . }}-pipeline-env" DAGSTER_K8S_PIPELINE_RUN_IMAGE: "{{- .Values.pipeline_run.image.repository -}}:{{- .Values.pipeline_run.image.tag -}}" DAGSTER_K8S_PIPELINE_RUN_IMAGE_PULL_POLICY: "{{ .Values.pipeline_run.image.pullPolicy }}" {{- end -}} \ No newline at end of file diff --git a/helm/dagster/templates/configmap-celery.yaml b/helm/dagster/templates/configmap-celery.yaml --- a/helm/dagster/templates/configmap-celery.yaml +++ b/helm/dagster/templates/configmap-celery.yaml @@ -1,3 +1,4 @@ +{{- if .Values.celery.enabled }} apiVersion: v1 kind: ConfigMap metadata: @@ -17,3 +18,4 @@ {{- if .Values.celery.configSource }} {{ toYaml .Values.celery.configSource | indent 10 }} {{- end -}} +{{- end }} \ No newline at end of file diff --git a/helm/dagster/templates/configmap-env-celery.yaml b/helm/dagster/templates/configmap-env-celery.yaml --- a/helm/dagster/templates/configmap-env-celery.yaml +++ b/helm/dagster/templates/configmap-env-celery.yaml @@ -1,3 +1,4 @@ +{{- if .Values.celery.enabled }} apiVersion: v1 kind: ConfigMap metadata: @@ -16,3 +17,4 @@ {{- end }} {{- end }} {{- end }} +{{- end }} \ No newline at end of file diff --git a/helm/dagster/templates/configmap-env-celery.yaml b/helm/dagster/templates/configmap-env-instance.yaml copy from helm/dagster/templates/configmap-env-celery.yaml copy to helm/dagster/templates/configmap-env-instance.yaml --- a/helm/dagster/templates/configmap-env-celery.yaml +++ b/helm/dagster/templates/configmap-env-instance.yaml @@ -1,7 +1,7 @@ apiVersion: v1 kind: ConfigMap metadata: - name: {{ template "dagster.fullname" . }}-celery-worker-env + name: {{ template "dagster.fullname" . }}-instance-env labels: app: {{ template "dagster.name" . }} chart: {{ template "dagster.chart" . }} @@ -9,10 +9,3 @@ heritage: {{ .Release.Service }} data: {{ include "dagster.shared_env" . | nindent 2 }} - {{- if .Values.celery.env -}} - {{- range $name, $value := .Values.celery.env }} - {{- if not (empty $value) }} - {{ $name }}: {{ $value | quote }} - {{- end }} - {{- end }} - {{- end }} diff --git a/helm/dagster/templates/configmap-instance.yaml b/helm/dagster/templates/configmap-instance.yaml --- a/helm/dagster/templates/configmap-instance.yaml +++ b/helm/dagster/templates/configmap-instance.yaml @@ -58,6 +58,7 @@ port: {{ .Values.postgresql.service.port }} run_launcher: + {{- if .Values.celery.enabled }} module: dagster_celery_k8s class: CeleryK8sRunLauncher config: @@ -73,6 +74,20 @@ {{- if .Values.celery.configSource }} {{- toYaml .Values.celery.configSource | nindent 10 }} {{- end }} + {{- else }} + module: dagster_k8s + class: K8sRunLauncher + config: + service_account_name: {{ include "dagster.serviceAccountName" . }} + dagster_home: + env: DAGSTER_HOME + instance_config_map: + env: DAGSTER_K8S_INSTANCE_CONFIG_MAP + postgres_password_secret: + env: DAGSTER_K8S_PG_PASSWORD_SECRET + env_config_maps: + - env: DAGSTER_K8S_INSTANCE_ENV_CONFIG_MAP + {{- end }} run_storage: module: dagster_postgres.run_storage diff --git a/helm/dagster/templates/deployment-celery-extras.yaml b/helm/dagster/templates/deployment-celery-extras.yaml --- a/helm/dagster/templates/deployment-celery-extras.yaml +++ b/helm/dagster/templates/deployment-celery-extras.yaml @@ -1,4 +1,4 @@ -{{ if .Values.celery.extraWorkerQueues }} +{{- if and .Values.celery.enabled .Values.celery.extraWorkerQueues }} {{ range $queue := .Values.celery.extraWorkerQueues }} apiVersion: apps/v1 kind: Deployment diff --git a/helm/dagster/templates/deployment-celery.yaml b/helm/dagster/templates/deployment-celery.yaml --- a/helm/dagster/templates/deployment-celery.yaml +++ b/helm/dagster/templates/deployment-celery.yaml @@ -1,3 +1,4 @@ +{{- if .Values.celery.enabled }} apiVersion: apps/v1 kind: Deployment metadata: @@ -127,3 +128,4 @@ tolerations: {{- toYaml . | nindent 8 }} {{- end }} +{{- end }} \ No newline at end of file diff --git a/helm/dagster/values.yaml b/helm/dagster/values.yaml --- a/helm/dagster/values.yaml +++ b/helm/dagster/values.yaml @@ -207,6 +207,7 @@ tag: "latest" pullPolicy: Always + # If disabled, the k8s run launcher will be used, which launches pipeline runs as single K8s Jobs. enabled: true # support overriding the name of Celery workers workers: diff --git a/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/cluster.py b/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/cluster.py --- a/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/cluster.py +++ b/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/cluster.py @@ -154,10 +154,12 @@ @pytest.fixture def dagster_instance_with_k8s_scheduler( - helm_namespace, run_launcher, k8s_scheduler, schedule_tempdir + helm_namespace_for_k8s_run_launcher, run_launcher, k8s_scheduler, schedule_tempdir ): - with local_port_forward_postgres(namespace=helm_namespace) as local_forward_port: - postgres_url = "postgresql://test:test@localhost:{local_forward_port}/test".format( + with local_port_forward_postgres( + namespace=helm_namespace_for_k8s_run_launcher + ) as local_forward_port: + postgres_url = 'postgresql://test:test@localhost:{local_forward_port}/test'.format( local_forward_port=local_forward_port ) print("Local Postgres forwarding URL: ", postgres_url) @@ -202,7 +204,30 @@ yield instance -@pytest.fixture(scope="session") +@pytest.fixture(scope='session') +def dagster_instance_for_k8s_run_launcher(helm_namespace_for_k8s_run_launcher, run_launcher): + tempdir = DagsterInstance.temp_storage() + + with local_port_forward_postgres( + namespace=helm_namespace_for_k8s_run_launcher + ) as local_forward_port: + postgres_url = 'postgresql://test:test@localhost:{local_forward_port}/test'.format( + local_forward_port=local_forward_port + ) + print('Local Postgres forwarding URL: ', postgres_url) + + instance = DagsterInstance( + instance_type=InstanceType.EPHEMERAL, + local_artifact_storage=LocalArtifactStorage(tempdir), + run_storage=PostgresRunStorage(postgres_url), + event_storage=PostgresEventLogStorage(postgres_url), + compute_log_manager=NoOpComputeLogManager(), + run_launcher=run_launcher, + ) + yield instance + + +@pytest.fixture(scope='session') def dagster_instance(helm_namespace, run_launcher): # pylint: disable=redefined-outer-name tempdir = DagsterInstance.temp_storage() diff --git a/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py b/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py --- a/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py +++ b/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py @@ -64,6 +64,13 @@ yield _helm_namespace_helper(helm_chart, request) +@pytest.fixture(scope="session") +def helm_namespace_for_k8s_run_launcher( + cluster_provider, request +): # pylint: disable=unused-argument, redefined-outer-name + yield _helm_namespace_helper(helm_chart_for_k8s_run_launcher, request) + + @contextmanager def test_namespace(should_cleanup=True): # Will be something like dagster-test-3fcd70 to avoid ns collisions in shared test environment @@ -176,12 +183,19 @@ time.sleep(1) # Wait for Celery worker queues to become ready - print("Waiting for celery workers") pods = kubernetes.client.CoreV1Api().list_namespaced_pod(namespace=namespace) pod_names = [p.metadata.name for p in pods.items if "celery-workers" in p.metadata.name] - for pod_name in pod_names: - print("Waiting for Celery worker pod %s" % pod_name) - wait_for_pod(pod_name, namespace=namespace) + if helm_config.get("celery", {}).get("enabled"): + print("Waiting for celery workers") + for pod_name in pod_names: + print("Waiting for Celery worker pod %s" % pod_name) + wait_for_pod(pod_name, namespace=namespace) + else: + assert ( + len(pod_names) == 0 + ), "celery-worker pods {pod_names} exists when celery is not enabled.".format( + pod_names=pod_names + ) if helm_config.get("userDeployments") and helm_config.get("userDeployments", {}).get( "enabled" @@ -247,6 +261,7 @@ }, }, "celery": { + "enabled": True, "image": {"repository": repository, "tag": tag, "pullPolicy": pull_policy}, # https://github.com/dagster-io/dagster/issues/2671 # 'extraWorkerQueues': [{'name': 'extra-queue-1', 'replicaCount': 1},], @@ -269,6 +284,43 @@ yield +@contextmanager +def helm_chart_for_k8s_run_launcher(namespace, docker_image, should_cleanup=True): + check.str_param(namespace, "namespace") + check.str_param(docker_image, "docker_image") + check.bool_param(should_cleanup, "should_cleanup") + + repository, tag = docker_image.split(":") + pull_policy = image_pull_policy() + helm_config = { + "dagit": { + "image": {"repository": repository, "tag": tag, "pullPolicy": pull_policy}, + "env": {"TEST_SET_ENV_VAR": "test_dagit_env_var"}, + "env_config_maps": [TEST_CONFIGMAP_NAME], + "env_secrets": [TEST_SECRET_NAME], + "livenessProbe": { + "tcpSocket": {"port": "http"}, + "periodSeconds": 20, + "failureThreshold": 3, + }, + "startupProbe": { + "tcpSocket": {"port": "http"}, + "failureThreshold": 6, + "periodSeconds": 10, + }, + }, + "celery": {"enabled": False}, + "scheduler": {"k8sEnabled": "true", "schedulerNamespace": namespace}, + "serviceAccount": {"name": "dagit-admin"}, + "postgresqlPassword": "test", + "postgresqlDatabase": "test", + "postgresqlUser": "test", + } + + with _helm_chart_helper(namespace, should_cleanup, helm_config): + yield + + @contextmanager def helm_chart_for_user_deployments(namespace, docker_image, should_cleanup=True): check.str_param(namespace, "namespace") @@ -323,6 +375,7 @@ }, }, "celery": { + "enabled": True, "image": {"repository": repository, "tag": tag, "pullPolicy": pull_policy}, # https://github.com/dagster-io/dagster/issues/2671 # 'extraWorkerQueues': [{'name': 'extra-queue-1', 'replicaCount': 1},], diff --git a/integration_tests/test_suites/k8s-integration-test-suite/conftest.py b/integration_tests/test_suites/k8s-integration-test-suite/conftest.py --- a/integration_tests/test_suites/k8s-integration-test-suite/conftest.py +++ b/integration_tests/test_suites/k8s-integration-test-suite/conftest.py @@ -7,11 +7,11 @@ from dagster_k8s.launcher import K8sRunLauncher from dagster_k8s.scheduler import K8sScheduler from dagster_k8s_test_infra.cluster import ( - dagster_instance, + dagster_instance_for_k8s_run_launcher, dagster_instance_with_k8s_scheduler, define_cluster_provider_fixture, ) -from dagster_k8s_test_infra.helm import helm_namespace +from dagster_k8s_test_infra.helm import helm_namespace_for_k8s_run_launcher from dagster_k8s_test_infra.integration_utils import image_pull_policy from dagster_test.test_project import build_and_tag_test_image, test_project_docker_image @@ -43,15 +43,15 @@ @pytest.fixture def k8s_scheduler( - cluster_provider, helm_namespace + cluster_provider, helm_namespace_for_k8s_run_launcher ): # pylint: disable=redefined-outer-name,unused-argument return K8sScheduler( - scheduler_namespace=helm_namespace, - image_pull_secrets=[{"name": "element-dev-key"}], - service_account_name="dagit-admin", - instance_config_map="dagster-instance", - postgres_password_secret="dagster-postgresql-secret", - dagster_home="/opt/dagster/dagster_home", + scheduler_namespace=helm_namespace_for_k8s_run_launcher, + image_pull_secrets=[{'name': 'element-dev-key'}], + service_account_name='dagit-admin', + instance_config_map='dagster-instance', + postgres_password_secret='dagster-postgresql-secret', + dagster_home='/opt/dagster/dagster_home', job_image=test_project_docker_image(), load_incluster_config=False, kubeconfig_file=cluster_provider.kubeconfig_file, @@ -61,18 +61,20 @@ ) -@pytest.fixture(scope="function") -def restore_k8s_cron_tab(helm_namespace): # pylint: disable=redefined-outer-name +@pytest.fixture(scope='function') +def restore_k8s_cron_tab( + helm_namespace_for_k8s_run_launcher, +): # pylint: disable=redefined-outer-name kube_api = kubernetes.client.BatchV1beta1Api() # Doubly make sure CronJobs are deleted pre-test and post-test - kube_api.delete_collection_namespaced_cron_job(namespace=helm_namespace) + kube_api.delete_collection_namespaced_cron_job(namespace=helm_namespace_for_k8s_run_launcher) yield - kube_api.delete_collection_namespaced_cron_job(namespace=helm_namespace) + kube_api.delete_collection_namespaced_cron_job(namespace=helm_namespace_for_k8s_run_launcher) @pytest.fixture(scope="session") def run_launcher( - cluster_provider, helm_namespace + cluster_provider, helm_namespace_for_k8s_run_launcher ): # pylint: disable=redefined-outer-name,unused-argument return K8sRunLauncher( @@ -85,9 +87,9 @@ load_incluster_config=False, kubeconfig_file=cluster_provider.kubeconfig_file, image_pull_policy=image_pull_policy(), - job_namespace=helm_namespace, - env_config_maps=["dagster-pipeline-env", "test-env-configmap"], - env_secrets=["test-env-secret"], + job_namespace=helm_namespace_for_k8s_run_launcher, + env_config_maps=['dagster-instance-env', 'test-env-configmap'], + env_secrets=['test-env-secret'], ) diff --git a/integration_tests/test_suites/k8s-integration-test-suite/test_integration.py b/integration_tests/test_suites/k8s-integration-test-suite/test_integration.py --- a/integration_tests/test_suites/k8s-integration-test-suite/test_integration.py +++ b/integration_tests/test_suites/k8s-integration-test-suite/test_integration.py @@ -17,47 +17,53 @@ @pytest.mark.integration -def test_k8s_run_launcher_default(dagster_instance, helm_namespace): - run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), "env.yaml")) - pipeline_name = "demo_pipeline" - tags = {"key": "value"} +def test_k8s_run_launcher_default( + dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher +): + run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), 'env.yaml')) + pipeline_name = 'demo_pipeline' + tags = {'key': 'value'} run = create_run_for_test( - dagster_instance, + dagster_instance_for_k8s_run_launcher, pipeline_name=pipeline_name, run_config=run_config, tags=tags, mode="default", ) - dagster_instance.launch_run( + dagster_instance_for_k8s_run_launcher.launch_run( run.run_id, ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), ) result = wait_for_job_and_get_raw_logs( - job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace + job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher ) assert "PIPELINE_SUCCESS" in result, "no match, result: {}".format(result) @pytest.mark.integration -def test_failing_k8s_run_launcher(dagster_instance, helm_namespace): - run_config = {"blah blah this is wrong": {}} - pipeline_name = "demo_pipeline" - run = create_run_for_test(dagster_instance, pipeline_name=pipeline_name, run_config=run_config) +def test_failing_k8s_run_launcher( + dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher +): + run_config = {'blah blah this is wrong': {}} + pipeline_name = 'demo_pipeline' + run = create_run_for_test( + dagster_instance_for_k8s_run_launcher, pipeline_name=pipeline_name, run_config=run_config + ) - dagster_instance.launch_run( + dagster_instance_for_k8s_run_launcher.launch_run( run.run_id, ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), ) result = wait_for_job_and_get_raw_logs( - job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace + job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher ) assert "PIPELINE_SUCCESS" not in result, "no match, result: {}".format(result) - event_records = dagster_instance.all_logs(run.run_id) + event_records = dagster_instance_for_k8s_run_launcher.all_logs(run.run_id) assert any( ['Undefined field "blah blah this is wrong"' in str(event) for event in event_records] @@ -66,39 +72,47 @@ @pytest.mark.integration -def test_k8s_run_launcher_terminate(dagster_instance, helm_namespace): - pipeline_name = "slow_pipeline" +def test_k8s_run_launcher_terminate( + dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher +): + pipeline_name = 'slow_pipeline' tags = {"key": "value"} run = create_run_for_test( - dagster_instance, pipeline_name=pipeline_name, run_config=None, tags=tags, mode="default", + dagster_instance_for_k8s_run_launcher, + pipeline_name=pipeline_name, + run_config=None, + tags=tags, + mode='default', ) - dagster_instance.launch_run( + dagster_instance_for_k8s_run_launcher.launch_run( run.run_id, ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), ) - wait_for_job(job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace) + wait_for_job( + job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher + ) timeout = datetime.timedelta(0, 30) start_time = datetime.datetime.now() while datetime.datetime.now() < start_time + timeout: - if dagster_instance.run_launcher.can_terminate(run_id=run.run_id): + if dagster_instance_for_k8s_run_launcher.run_launcher.can_terminate(run_id=run.run_id): break time.sleep(5) - assert dagster_instance.run_launcher.can_terminate(run_id=run.run_id) - assert dagster_instance.run_launcher.terminate(run_id=run.run_id) + assert dagster_instance_for_k8s_run_launcher.run_launcher.can_terminate(run_id=run.run_id) + assert dagster_instance_for_k8s_run_launcher.run_launcher.terminate(run_id=run.run_id) start_time = datetime.datetime.now() pipeline_run = None while datetime.datetime.now() < start_time + timeout: - pipeline_run = dagster_instance.get_run_by_id(run.run_id) + pipeline_run = dagster_instance_for_k8s_run_launcher.get_run_by_id(run.run_id) if pipeline_run.status == PipelineRunStatus.FAILURE: break time.sleep(5) assert pipeline_run.status == PipelineRunStatus.FAILURE - assert not dagster_instance.run_launcher.terminate(run_id=run.run_id) + assert not dagster_instance_for_k8s_run_launcher.run_launcher.terminate(run_id=run.run_id) diff --git a/integration_tests/test_suites/k8s-integration-test-suite/test_job_spec.py b/integration_tests/test_suites/k8s-integration-test-suite/test_job_spec.py --- a/integration_tests/test_suites/k8s-integration-test-suite/test_job_spec.py +++ b/integration_tests/test_suites/k8s-integration-test-suite/test_job_spec.py @@ -61,7 +61,7 @@ name: dagster-postgresql-secret env_from: - config_map_ref: - name: dagster-pipeline-env + name: dagster-instance-env - config_map_ref: name: test-env-configmap - secret_ref: @@ -125,7 +125,7 @@ name: dagster-postgresql-secret env_from: - config_map_ref: - name: dagster-pipeline-env + name: dagster-instance-env - config_map_ref: name: test-env-configmap - secret_ref: @@ -315,39 +315,48 @@ ) -def test_k8s_run_launcher(dagster_instance, helm_namespace): - run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), "env.yaml")) - pipeline_name = "demo_pipeline" +def test_k8s_run_launcher( + dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher +): + run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), 'env.yaml')) + pipeline_name = 'demo_pipeline' run = create_run_for_test( - dagster_instance, pipeline_name=pipeline_name, run_config=run_config, mode="default", + dagster_instance_for_k8s_run_launcher, + pipeline_name=pipeline_name, + run_config=run_config, + mode='default', ) - dagster_instance.launch_run( + dagster_instance_for_k8s_run_launcher.launch_run( run.run_id, ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), ) result = wait_for_job_and_get_raw_logs( - job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace + job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher ) assert "PIPELINE_SUCCESS" in result, "no match, result: {}".format(result) -def test_failing_k8s_run_launcher(dagster_instance, helm_namespace): - run_config = {"blah blah this is wrong": {}} - pipeline_name = "demo_pipeline" - run = create_run_for_test(dagster_instance, pipeline_name=pipeline_name, run_config=run_config) - dagster_instance.launch_run( +def test_failing_k8s_run_launcher( + dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher +): + run_config = {'blah blah this is wrong': {}} + pipeline_name = 'demo_pipeline' + run = create_run_for_test( + dagster_instance_for_k8s_run_launcher, pipeline_name=pipeline_name, run_config=run_config + ) + dagster_instance_for_k8s_run_launcher.launch_run( run.run_id, ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), ) result = wait_for_job_and_get_raw_logs( - job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace + job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher ) assert "PIPELINE_SUCCESS" not in result, "no match, result: {}".format(result) - event_records = dagster_instance.all_logs(run.run_id) + event_records = dagster_instance_for_k8s_run_launcher.all_logs(run.run_id) assert any( ['Undefined field "blah blah this is wrong"' in str(event) for event in event_records] diff --git a/integration_tests/test_suites/k8s-integration-test-suite/test_scheduler.py b/integration_tests/test_suites/k8s-integration-test-suite/test_scheduler.py --- a/integration_tests/test_suites/k8s-integration-test-suite/test_scheduler.py +++ b/integration_tests/test_suites/k8s-integration-test-suite/test_scheduler.py @@ -95,7 +95,10 @@ @mark_scheduler def test_init( - dagster_instance_with_k8s_scheduler, schedule_tempdir, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, + schedule_tempdir, + helm_namespace_for_k8s_run_launcher, + restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler @@ -116,7 +119,10 @@ @mark_scheduler def test_re_init( - dagster_instance_with_k8s_scheduler, schedule_tempdir, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, + schedule_tempdir, + helm_namespace_for_k8s_run_launcher, + restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -144,7 +150,10 @@ @mark_scheduler def test_start_and_stop_schedule( - dagster_instance_with_k8s_scheduler, schedule_tempdir, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, + schedule_tempdir, + helm_namespace_for_k8s_run_launcher, + restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -169,7 +178,7 @@ @mark_scheduler def test_start_non_existent_schedule( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler with pytest.raises(DagsterScheduleDoesNotExist): @@ -179,7 +188,7 @@ @mark_scheduler def test_start_schedule_cron_job( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -226,7 +235,7 @@ @mark_scheduler def test_remove_schedule_def( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -243,7 +252,7 @@ @mark_scheduler def test_add_schedule_def( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_smaller_external_repo() @@ -283,7 +292,7 @@ @mark_scheduler def test_start_and_stop_schedule_cron_tab( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -373,7 +382,10 @@ @mark_scheduler def test_script_execution( - dagster_instance_with_k8s_scheduler, unset_dagster_home, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, + unset_dagster_home, + helm_namespace_for_k8s_run_launcher, + restore_k8s_cron_tab, ): # pylint:disable=unused-argument,redefined-outer-name with seven.TemporaryDirectory() as tempdir: with environ({"DAGSTER_HOME": tempdir}): @@ -394,7 +406,9 @@ ).get_origin_id() batch_v1beta1_api = kubernetes.client.BatchV1beta1Api() - cron_job = batch_v1beta1_api.read_namespaced_cron_job(cron_job_name, helm_namespace) + cron_job = batch_v1beta1_api.read_namespaced_cron_job( + cron_job_name, helm_namespace_for_k8s_run_launcher + ) container = cron_job.spec.job_template.spec.template.spec.containers[0] command = container.command args = container.args @@ -428,7 +442,7 @@ @mark_scheduler def test_start_schedule_fails( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -454,7 +468,7 @@ @mark_scheduler def test_start_schedule_unsuccessful( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -482,7 +496,7 @@ @mark_scheduler def test_start_schedule_manual_delete_debug( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -515,7 +529,7 @@ @mark_scheduler def test_start_schedule_manual_add_debug( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -540,7 +554,10 @@ @mark_scheduler def test_stop_schedule_fails( - dagster_instance_with_k8s_scheduler, schedule_tempdir, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, + schedule_tempdir, + helm_namespace_for_k8s_run_launcher, + restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -571,7 +588,7 @@ @mark_scheduler def test_stop_schedule_unsuccessful( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -603,7 +620,7 @@ @mark_scheduler def test_wipe( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -625,7 +642,7 @@ @mark_scheduler def test_reconcile_failure( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() @@ -657,7 +674,7 @@ @mark_scheduler def test_reconcile_failure_when_deleting_schedule_def( - dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, + dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, ): # pylint:disable=unused-argument instance = dagster_instance_with_k8s_scheduler external_repo = get_test_external_repo() diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py b/python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py --- a/python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py +++ b/python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py @@ -157,7 +157,7 @@ job_cfg = DagsterK8sJobConfig.config_type() run_launcher_extra_cfg = { - "job_namespace": StringSource, + "job_namespace": Field(StringSource, is_required=False, default_value="default"), "load_incluster_config": Field(bool, is_required=False, default_value=True), "kubeconfig_file": Field(Noneable(str), is_required=False, default_value=None), }