diff --git a/docs/next/src/pages/deploying/k8s_part1.mdx b/docs/next/src/pages/deploying/k8s_part1.mdx
--- a/docs/next/src/pages/deploying/k8s_part1.mdx
+++ b/docs/next/src/pages/deploying/k8s_part1.mdx
@@ -63,10 +63,13 @@
## System overview
-The Helm chart installs several different components, including Dagit. In this system, the coupled with the corresponding
-`celery_k8s_job_executor` handles physical execution of Dagster pipelines via Celery-on-K8s. This
-deployment aims to provide:
+The Helm chart installs several different components, including Dagit.
+
+### Celery K8s
+By default, we configure Dagster to deploy with Celery.
+In this system, the coupled
+with the corresponding
+handles physical execution of Dagster pipelines via Celery-on-K8s. This deployment aims to provide:
- **User Code Separation:** In this system, the Celery workers can be deployed via fixed images
without user code, but rest of the system still uses the user code image. In the next part, we
@@ -93,8 +96,21 @@
step execution Job is launched to execute that step. These step execution Jobs are also launched
using the `pipeline_run:` image, and each will be named `dagster-job-`.
-_**Note:** In the default Helm chart, we configure a .
-It is also possible to deploy with a or a custom Run Launcher.
+### Celery-less K8s
+You can also configure Dagster to not use Celery. In this case, the will be used, which runs pipelines as a
+single K8s Jobs. We can enable this feature with the following:
+
+```shell
+helm install dagster dagster/dagster -f /path/to/values.yaml --set celery.enabled=false
+```
+
+Then, in the run config, an or can be configured to run the execution
+plan of the pipeline within the K8s Job.
+
+
+_**Note:** It is also possible to deploy with a custom Run Launcher.
If you have questions, please reach out to us on [Slack](https://dagster-slackin.herokuapp.com/) and we're happy to help!_
## Helm chart
diff --git a/helm/dagster/templates/_helpers.tpl b/helm/dagster/templates/_helpers.tpl
--- a/helm/dagster/templates/_helpers.tpl
+++ b/helm/dagster/templates/_helpers.tpl
@@ -159,9 +159,10 @@
DAGSTER_K8S_CELERY_BROKER: "{{ template "dagster.celery.broker_url" . }}"
DAGSTER_K8S_CELERY_BACKEND: "{{ template "dagster.celery.backend_url" . }}"
DAGSTER_K8S_PG_PASSWORD_SECRET: "{{ template "dagster.fullname" .}}-postgresql-secret"
+DAGSTER_K8S_INSTANCE_ENV_CONFIG_MAP: "{{ template "dagster.fullname" .}}-instance-env"
DAGSTER_K8S_INSTANCE_CONFIG_MAP: "{{ template "dagster.fullname" .}}-instance"
DAGSTER_K8S_PIPELINE_RUN_NAMESPACE: "{{ .Release.Namespace }}"
DAGSTER_K8S_PIPELINE_RUN_ENV_CONFIGMAP: "{{ template "dagster.fullname" . }}-pipeline-env"
DAGSTER_K8S_PIPELINE_RUN_IMAGE: "{{- .Values.pipeline_run.image.repository -}}:{{- .Values.pipeline_run.image.tag -}}"
DAGSTER_K8S_PIPELINE_RUN_IMAGE_PULL_POLICY: "{{ .Values.pipeline_run.image.pullPolicy }}"
{{- end -}}
\ No newline at end of file
diff --git a/helm/dagster/templates/configmap-celery.yaml b/helm/dagster/templates/configmap-celery.yaml
--- a/helm/dagster/templates/configmap-celery.yaml
+++ b/helm/dagster/templates/configmap-celery.yaml
@@ -1,3 +1,4 @@
+{{- if .Values.celery.enabled }}
apiVersion: v1
kind: ConfigMap
metadata:
@@ -17,3 +18,4 @@
{{- if .Values.celery.configSource }}
{{ toYaml .Values.celery.configSource | indent 10 }}
{{- end -}}
+{{- end }}
\ No newline at end of file
diff --git a/helm/dagster/templates/configmap-env-celery.yaml b/helm/dagster/templates/configmap-env-celery.yaml
--- a/helm/dagster/templates/configmap-env-celery.yaml
+++ b/helm/dagster/templates/configmap-env-celery.yaml
@@ -1,3 +1,4 @@
+{{- if .Values.celery.enabled }}
apiVersion: v1
kind: ConfigMap
metadata:
@@ -16,3 +17,4 @@
{{- end }}
{{- end }}
{{- end }}
+{{- end }}
\ No newline at end of file
diff --git a/helm/dagster/templates/configmap-env-celery.yaml b/helm/dagster/templates/configmap-env-instance.yaml
copy from helm/dagster/templates/configmap-env-celery.yaml
copy to helm/dagster/templates/configmap-env-instance.yaml
--- a/helm/dagster/templates/configmap-env-celery.yaml
+++ b/helm/dagster/templates/configmap-env-instance.yaml
@@ -1,7 +1,7 @@
apiVersion: v1
kind: ConfigMap
metadata:
- name: {{ template "dagster.fullname" . }}-celery-worker-env
+ name: {{ template "dagster.fullname" . }}-instance-env
labels:
app: {{ template "dagster.name" . }}
chart: {{ template "dagster.chart" . }}
@@ -9,10 +9,3 @@
heritage: {{ .Release.Service }}
data:
{{ include "dagster.shared_env" . | nindent 2 }}
- {{- if .Values.celery.env -}}
- {{- range $name, $value := .Values.celery.env }}
- {{- if not (empty $value) }}
- {{ $name }}: {{ $value | quote }}
- {{- end }}
- {{- end }}
- {{- end }}
diff --git a/helm/dagster/templates/configmap-instance.yaml b/helm/dagster/templates/configmap-instance.yaml
--- a/helm/dagster/templates/configmap-instance.yaml
+++ b/helm/dagster/templates/configmap-instance.yaml
@@ -58,6 +58,7 @@
port: {{ .Values.postgresql.service.port }}
run_launcher:
+ {{- if .Values.celery.enabled }}
module: dagster_celery_k8s
class: CeleryK8sRunLauncher
config:
@@ -73,6 +74,20 @@
{{- if .Values.celery.configSource }}
{{- toYaml .Values.celery.configSource | nindent 10 }}
{{- end }}
+ {{- else }}
+ module: dagster_k8s
+ class: K8sRunLauncher
+ config:
+ service_account_name: {{ include "dagster.serviceAccountName" . }}
+ dagster_home:
+ env: DAGSTER_HOME
+ instance_config_map:
+ env: DAGSTER_K8S_INSTANCE_CONFIG_MAP
+ postgres_password_secret:
+ env: DAGSTER_K8S_PG_PASSWORD_SECRET
+ env_config_maps:
+ - env: DAGSTER_K8S_INSTANCE_ENV_CONFIG_MAP
+ {{- end }}
run_storage:
module: dagster_postgres.run_storage
diff --git a/helm/dagster/templates/deployment-celery-extras.yaml b/helm/dagster/templates/deployment-celery-extras.yaml
--- a/helm/dagster/templates/deployment-celery-extras.yaml
+++ b/helm/dagster/templates/deployment-celery-extras.yaml
@@ -1,4 +1,4 @@
-{{ if .Values.celery.extraWorkerQueues }}
+{{- if and .Values.celery.enabled .Values.celery.extraWorkerQueues }}
{{ range $queue := .Values.celery.extraWorkerQueues }}
apiVersion: apps/v1
kind: Deployment
diff --git a/helm/dagster/templates/deployment-celery.yaml b/helm/dagster/templates/deployment-celery.yaml
--- a/helm/dagster/templates/deployment-celery.yaml
+++ b/helm/dagster/templates/deployment-celery.yaml
@@ -1,3 +1,4 @@
+{{- if .Values.celery.enabled }}
apiVersion: apps/v1
kind: Deployment
metadata:
@@ -127,3 +128,4 @@
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
+{{- end }}
\ No newline at end of file
diff --git a/helm/dagster/values.yaml b/helm/dagster/values.yaml
--- a/helm/dagster/values.yaml
+++ b/helm/dagster/values.yaml
@@ -207,6 +207,7 @@
tag: "latest"
pullPolicy: Always
+ # If disabled, the k8s run launcher will be used, which launches pipeline runs as single K8s Jobs.
enabled: true
# support overriding the name of Celery workers
workers:
diff --git a/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/cluster.py b/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/cluster.py
--- a/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/cluster.py
+++ b/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/cluster.py
@@ -154,10 +154,12 @@
@pytest.fixture
def dagster_instance_with_k8s_scheduler(
- helm_namespace, run_launcher, k8s_scheduler, schedule_tempdir
+ helm_namespace_for_k8s_run_launcher, run_launcher, k8s_scheduler, schedule_tempdir
):
- with local_port_forward_postgres(namespace=helm_namespace) as local_forward_port:
- postgres_url = "postgresql://test:test@localhost:{local_forward_port}/test".format(
+ with local_port_forward_postgres(
+ namespace=helm_namespace_for_k8s_run_launcher
+ ) as local_forward_port:
+ postgres_url = 'postgresql://test:test@localhost:{local_forward_port}/test'.format(
local_forward_port=local_forward_port
)
print("Local Postgres forwarding URL: ", postgres_url)
@@ -202,7 +204,30 @@
yield instance
-@pytest.fixture(scope="session")
+@pytest.fixture(scope='session')
+def dagster_instance_for_k8s_run_launcher(helm_namespace_for_k8s_run_launcher, run_launcher):
+ tempdir = DagsterInstance.temp_storage()
+
+ with local_port_forward_postgres(
+ namespace=helm_namespace_for_k8s_run_launcher
+ ) as local_forward_port:
+ postgres_url = 'postgresql://test:test@localhost:{local_forward_port}/test'.format(
+ local_forward_port=local_forward_port
+ )
+ print('Local Postgres forwarding URL: ', postgres_url)
+
+ instance = DagsterInstance(
+ instance_type=InstanceType.EPHEMERAL,
+ local_artifact_storage=LocalArtifactStorage(tempdir),
+ run_storage=PostgresRunStorage(postgres_url),
+ event_storage=PostgresEventLogStorage(postgres_url),
+ compute_log_manager=NoOpComputeLogManager(),
+ run_launcher=run_launcher,
+ )
+ yield instance
+
+
+@pytest.fixture(scope='session')
def dagster_instance(helm_namespace, run_launcher): # pylint: disable=redefined-outer-name
tempdir = DagsterInstance.temp_storage()
diff --git a/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py b/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py
--- a/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py
+++ b/integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py
@@ -64,6 +64,13 @@
yield _helm_namespace_helper(helm_chart, request)
+@pytest.fixture(scope="session")
+def helm_namespace_for_k8s_run_launcher(
+ cluster_provider, request
+): # pylint: disable=unused-argument, redefined-outer-name
+ yield _helm_namespace_helper(helm_chart_for_k8s_run_launcher, request)
+
+
@contextmanager
def test_namespace(should_cleanup=True):
# Will be something like dagster-test-3fcd70 to avoid ns collisions in shared test environment
@@ -176,12 +183,19 @@
time.sleep(1)
# Wait for Celery worker queues to become ready
- print("Waiting for celery workers")
pods = kubernetes.client.CoreV1Api().list_namespaced_pod(namespace=namespace)
pod_names = [p.metadata.name for p in pods.items if "celery-workers" in p.metadata.name]
- for pod_name in pod_names:
- print("Waiting for Celery worker pod %s" % pod_name)
- wait_for_pod(pod_name, namespace=namespace)
+ if helm_config.get("celery", {}).get("enabled"):
+ print("Waiting for celery workers")
+ for pod_name in pod_names:
+ print("Waiting for Celery worker pod %s" % pod_name)
+ wait_for_pod(pod_name, namespace=namespace)
+ else:
+ assert (
+ len(pod_names) == 0
+ ), "celery-worker pods {pod_names} exists when celery is not enabled.".format(
+ pod_names=pod_names
+ )
if helm_config.get("userDeployments") and helm_config.get("userDeployments", {}).get(
"enabled"
@@ -247,6 +261,7 @@
},
},
"celery": {
+ "enabled": True,
"image": {"repository": repository, "tag": tag, "pullPolicy": pull_policy},
# https://github.com/dagster-io/dagster/issues/2671
# 'extraWorkerQueues': [{'name': 'extra-queue-1', 'replicaCount': 1},],
@@ -269,6 +284,43 @@
yield
+@contextmanager
+def helm_chart_for_k8s_run_launcher(namespace, docker_image, should_cleanup=True):
+ check.str_param(namespace, "namespace")
+ check.str_param(docker_image, "docker_image")
+ check.bool_param(should_cleanup, "should_cleanup")
+
+ repository, tag = docker_image.split(":")
+ pull_policy = image_pull_policy()
+ helm_config = {
+ "dagit": {
+ "image": {"repository": repository, "tag": tag, "pullPolicy": pull_policy},
+ "env": {"TEST_SET_ENV_VAR": "test_dagit_env_var"},
+ "env_config_maps": [TEST_CONFIGMAP_NAME],
+ "env_secrets": [TEST_SECRET_NAME],
+ "livenessProbe": {
+ "tcpSocket": {"port": "http"},
+ "periodSeconds": 20,
+ "failureThreshold": 3,
+ },
+ "startupProbe": {
+ "tcpSocket": {"port": "http"},
+ "failureThreshold": 6,
+ "periodSeconds": 10,
+ },
+ },
+ "celery": {"enabled": False},
+ "scheduler": {"k8sEnabled": "true", "schedulerNamespace": namespace},
+ "serviceAccount": {"name": "dagit-admin"},
+ "postgresqlPassword": "test",
+ "postgresqlDatabase": "test",
+ "postgresqlUser": "test",
+ }
+
+ with _helm_chart_helper(namespace, should_cleanup, helm_config):
+ yield
+
+
@contextmanager
def helm_chart_for_user_deployments(namespace, docker_image, should_cleanup=True):
check.str_param(namespace, "namespace")
@@ -323,6 +375,7 @@
},
},
"celery": {
+ "enabled": True,
"image": {"repository": repository, "tag": tag, "pullPolicy": pull_policy},
# https://github.com/dagster-io/dagster/issues/2671
# 'extraWorkerQueues': [{'name': 'extra-queue-1', 'replicaCount': 1},],
diff --git a/integration_tests/test_suites/k8s-integration-test-suite/conftest.py b/integration_tests/test_suites/k8s-integration-test-suite/conftest.py
--- a/integration_tests/test_suites/k8s-integration-test-suite/conftest.py
+++ b/integration_tests/test_suites/k8s-integration-test-suite/conftest.py
@@ -7,11 +7,11 @@
from dagster_k8s.launcher import K8sRunLauncher
from dagster_k8s.scheduler import K8sScheduler
from dagster_k8s_test_infra.cluster import (
- dagster_instance,
+ dagster_instance_for_k8s_run_launcher,
dagster_instance_with_k8s_scheduler,
define_cluster_provider_fixture,
)
-from dagster_k8s_test_infra.helm import helm_namespace
+from dagster_k8s_test_infra.helm import helm_namespace_for_k8s_run_launcher
from dagster_k8s_test_infra.integration_utils import image_pull_policy
from dagster_test.test_project import build_and_tag_test_image, test_project_docker_image
@@ -43,15 +43,15 @@
@pytest.fixture
def k8s_scheduler(
- cluster_provider, helm_namespace
+ cluster_provider, helm_namespace_for_k8s_run_launcher
): # pylint: disable=redefined-outer-name,unused-argument
return K8sScheduler(
- scheduler_namespace=helm_namespace,
- image_pull_secrets=[{"name": "element-dev-key"}],
- service_account_name="dagit-admin",
- instance_config_map="dagster-instance",
- postgres_password_secret="dagster-postgresql-secret",
- dagster_home="/opt/dagster/dagster_home",
+ scheduler_namespace=helm_namespace_for_k8s_run_launcher,
+ image_pull_secrets=[{'name': 'element-dev-key'}],
+ service_account_name='dagit-admin',
+ instance_config_map='dagster-instance',
+ postgres_password_secret='dagster-postgresql-secret',
+ dagster_home='/opt/dagster/dagster_home',
job_image=test_project_docker_image(),
load_incluster_config=False,
kubeconfig_file=cluster_provider.kubeconfig_file,
@@ -61,18 +61,20 @@
)
-@pytest.fixture(scope="function")
-def restore_k8s_cron_tab(helm_namespace): # pylint: disable=redefined-outer-name
+@pytest.fixture(scope='function')
+def restore_k8s_cron_tab(
+ helm_namespace_for_k8s_run_launcher,
+): # pylint: disable=redefined-outer-name
kube_api = kubernetes.client.BatchV1beta1Api()
# Doubly make sure CronJobs are deleted pre-test and post-test
- kube_api.delete_collection_namespaced_cron_job(namespace=helm_namespace)
+ kube_api.delete_collection_namespaced_cron_job(namespace=helm_namespace_for_k8s_run_launcher)
yield
- kube_api.delete_collection_namespaced_cron_job(namespace=helm_namespace)
+ kube_api.delete_collection_namespaced_cron_job(namespace=helm_namespace_for_k8s_run_launcher)
@pytest.fixture(scope="session")
def run_launcher(
- cluster_provider, helm_namespace
+ cluster_provider, helm_namespace_for_k8s_run_launcher
): # pylint: disable=redefined-outer-name,unused-argument
return K8sRunLauncher(
@@ -85,9 +87,9 @@
load_incluster_config=False,
kubeconfig_file=cluster_provider.kubeconfig_file,
image_pull_policy=image_pull_policy(),
- job_namespace=helm_namespace,
- env_config_maps=["dagster-pipeline-env", "test-env-configmap"],
- env_secrets=["test-env-secret"],
+ job_namespace=helm_namespace_for_k8s_run_launcher,
+ env_config_maps=['dagster-instance-env', 'test-env-configmap'],
+ env_secrets=['test-env-secret'],
)
diff --git a/integration_tests/test_suites/k8s-integration-test-suite/test_integration.py b/integration_tests/test_suites/k8s-integration-test-suite/test_integration.py
--- a/integration_tests/test_suites/k8s-integration-test-suite/test_integration.py
+++ b/integration_tests/test_suites/k8s-integration-test-suite/test_integration.py
@@ -17,47 +17,53 @@
@pytest.mark.integration
-def test_k8s_run_launcher_default(dagster_instance, helm_namespace):
- run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), "env.yaml"))
- pipeline_name = "demo_pipeline"
- tags = {"key": "value"}
+def test_k8s_run_launcher_default(
+ dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher
+):
+ run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), 'env.yaml'))
+ pipeline_name = 'demo_pipeline'
+ tags = {'key': 'value'}
run = create_run_for_test(
- dagster_instance,
+ dagster_instance_for_k8s_run_launcher,
pipeline_name=pipeline_name,
run_config=run_config,
tags=tags,
mode="default",
)
- dagster_instance.launch_run(
+ dagster_instance_for_k8s_run_launcher.launch_run(
run.run_id,
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)),
)
result = wait_for_job_and_get_raw_logs(
- job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace
+ job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher
)
assert "PIPELINE_SUCCESS" in result, "no match, result: {}".format(result)
@pytest.mark.integration
-def test_failing_k8s_run_launcher(dagster_instance, helm_namespace):
- run_config = {"blah blah this is wrong": {}}
- pipeline_name = "demo_pipeline"
- run = create_run_for_test(dagster_instance, pipeline_name=pipeline_name, run_config=run_config)
+def test_failing_k8s_run_launcher(
+ dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher
+):
+ run_config = {'blah blah this is wrong': {}}
+ pipeline_name = 'demo_pipeline'
+ run = create_run_for_test(
+ dagster_instance_for_k8s_run_launcher, pipeline_name=pipeline_name, run_config=run_config
+ )
- dagster_instance.launch_run(
+ dagster_instance_for_k8s_run_launcher.launch_run(
run.run_id,
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)),
)
result = wait_for_job_and_get_raw_logs(
- job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace
+ job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher
)
assert "PIPELINE_SUCCESS" not in result, "no match, result: {}".format(result)
- event_records = dagster_instance.all_logs(run.run_id)
+ event_records = dagster_instance_for_k8s_run_launcher.all_logs(run.run_id)
assert any(
['Undefined field "blah blah this is wrong"' in str(event) for event in event_records]
@@ -66,39 +72,47 @@
@pytest.mark.integration
-def test_k8s_run_launcher_terminate(dagster_instance, helm_namespace):
- pipeline_name = "slow_pipeline"
+def test_k8s_run_launcher_terminate(
+ dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher
+):
+ pipeline_name = 'slow_pipeline'
tags = {"key": "value"}
run = create_run_for_test(
- dagster_instance, pipeline_name=pipeline_name, run_config=None, tags=tags, mode="default",
+ dagster_instance_for_k8s_run_launcher,
+ pipeline_name=pipeline_name,
+ run_config=None,
+ tags=tags,
+ mode='default',
)
- dagster_instance.launch_run(
+ dagster_instance_for_k8s_run_launcher.launch_run(
run.run_id,
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)),
)
- wait_for_job(job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace)
+ wait_for_job(
+ job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher
+ )
timeout = datetime.timedelta(0, 30)
start_time = datetime.datetime.now()
while datetime.datetime.now() < start_time + timeout:
- if dagster_instance.run_launcher.can_terminate(run_id=run.run_id):
+ if dagster_instance_for_k8s_run_launcher.run_launcher.can_terminate(run_id=run.run_id):
break
time.sleep(5)
- assert dagster_instance.run_launcher.can_terminate(run_id=run.run_id)
- assert dagster_instance.run_launcher.terminate(run_id=run.run_id)
+ assert dagster_instance_for_k8s_run_launcher.run_launcher.can_terminate(run_id=run.run_id)
+ assert dagster_instance_for_k8s_run_launcher.run_launcher.terminate(run_id=run.run_id)
start_time = datetime.datetime.now()
pipeline_run = None
while datetime.datetime.now() < start_time + timeout:
- pipeline_run = dagster_instance.get_run_by_id(run.run_id)
+ pipeline_run = dagster_instance_for_k8s_run_launcher.get_run_by_id(run.run_id)
if pipeline_run.status == PipelineRunStatus.FAILURE:
break
time.sleep(5)
assert pipeline_run.status == PipelineRunStatus.FAILURE
- assert not dagster_instance.run_launcher.terminate(run_id=run.run_id)
+ assert not dagster_instance_for_k8s_run_launcher.run_launcher.terminate(run_id=run.run_id)
diff --git a/integration_tests/test_suites/k8s-integration-test-suite/test_job_spec.py b/integration_tests/test_suites/k8s-integration-test-suite/test_job_spec.py
--- a/integration_tests/test_suites/k8s-integration-test-suite/test_job_spec.py
+++ b/integration_tests/test_suites/k8s-integration-test-suite/test_job_spec.py
@@ -61,7 +61,7 @@
name: dagster-postgresql-secret
env_from:
- config_map_ref:
- name: dagster-pipeline-env
+ name: dagster-instance-env
- config_map_ref:
name: test-env-configmap
- secret_ref:
@@ -125,7 +125,7 @@
name: dagster-postgresql-secret
env_from:
- config_map_ref:
- name: dagster-pipeline-env
+ name: dagster-instance-env
- config_map_ref:
name: test-env-configmap
- secret_ref:
@@ -315,39 +315,48 @@
)
-def test_k8s_run_launcher(dagster_instance, helm_namespace):
- run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), "env.yaml"))
- pipeline_name = "demo_pipeline"
+def test_k8s_run_launcher(
+ dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher
+):
+ run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), 'env.yaml'))
+ pipeline_name = 'demo_pipeline'
run = create_run_for_test(
- dagster_instance, pipeline_name=pipeline_name, run_config=run_config, mode="default",
+ dagster_instance_for_k8s_run_launcher,
+ pipeline_name=pipeline_name,
+ run_config=run_config,
+ mode='default',
)
- dagster_instance.launch_run(
+ dagster_instance_for_k8s_run_launcher.launch_run(
run.run_id,
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)),
)
result = wait_for_job_and_get_raw_logs(
- job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace
+ job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher
)
assert "PIPELINE_SUCCESS" in result, "no match, result: {}".format(result)
-def test_failing_k8s_run_launcher(dagster_instance, helm_namespace):
- run_config = {"blah blah this is wrong": {}}
- pipeline_name = "demo_pipeline"
- run = create_run_for_test(dagster_instance, pipeline_name=pipeline_name, run_config=run_config)
- dagster_instance.launch_run(
+def test_failing_k8s_run_launcher(
+ dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher
+):
+ run_config = {'blah blah this is wrong': {}}
+ pipeline_name = 'demo_pipeline'
+ run = create_run_for_test(
+ dagster_instance_for_k8s_run_launcher, pipeline_name=pipeline_name, run_config=run_config
+ )
+ dagster_instance_for_k8s_run_launcher.launch_run(
run.run_id,
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)),
)
result = wait_for_job_and_get_raw_logs(
- job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace
+ job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher
)
assert "PIPELINE_SUCCESS" not in result, "no match, result: {}".format(result)
- event_records = dagster_instance.all_logs(run.run_id)
+ event_records = dagster_instance_for_k8s_run_launcher.all_logs(run.run_id)
assert any(
['Undefined field "blah blah this is wrong"' in str(event) for event in event_records]
diff --git a/integration_tests/test_suites/k8s-integration-test-suite/test_scheduler.py b/integration_tests/test_suites/k8s-integration-test-suite/test_scheduler.py
--- a/integration_tests/test_suites/k8s-integration-test-suite/test_scheduler.py
+++ b/integration_tests/test_suites/k8s-integration-test-suite/test_scheduler.py
@@ -95,7 +95,10 @@
@mark_scheduler
def test_init(
- dagster_instance_with_k8s_scheduler, schedule_tempdir, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler,
+ schedule_tempdir,
+ helm_namespace_for_k8s_run_launcher,
+ restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
@@ -116,7 +119,10 @@
@mark_scheduler
def test_re_init(
- dagster_instance_with_k8s_scheduler, schedule_tempdir, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler,
+ schedule_tempdir,
+ helm_namespace_for_k8s_run_launcher,
+ restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -144,7 +150,10 @@
@mark_scheduler
def test_start_and_stop_schedule(
- dagster_instance_with_k8s_scheduler, schedule_tempdir, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler,
+ schedule_tempdir,
+ helm_namespace_for_k8s_run_launcher,
+ restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -169,7 +178,7 @@
@mark_scheduler
def test_start_non_existent_schedule(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
with pytest.raises(DagsterScheduleDoesNotExist):
@@ -179,7 +188,7 @@
@mark_scheduler
def test_start_schedule_cron_job(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -226,7 +235,7 @@
@mark_scheduler
def test_remove_schedule_def(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -243,7 +252,7 @@
@mark_scheduler
def test_add_schedule_def(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_smaller_external_repo()
@@ -283,7 +292,7 @@
@mark_scheduler
def test_start_and_stop_schedule_cron_tab(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -373,7 +382,10 @@
@mark_scheduler
def test_script_execution(
- dagster_instance_with_k8s_scheduler, unset_dagster_home, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler,
+ unset_dagster_home,
+ helm_namespace_for_k8s_run_launcher,
+ restore_k8s_cron_tab,
): # pylint:disable=unused-argument,redefined-outer-name
with seven.TemporaryDirectory() as tempdir:
with environ({"DAGSTER_HOME": tempdir}):
@@ -394,7 +406,9 @@
).get_origin_id()
batch_v1beta1_api = kubernetes.client.BatchV1beta1Api()
- cron_job = batch_v1beta1_api.read_namespaced_cron_job(cron_job_name, helm_namespace)
+ cron_job = batch_v1beta1_api.read_namespaced_cron_job(
+ cron_job_name, helm_namespace_for_k8s_run_launcher
+ )
container = cron_job.spec.job_template.spec.template.spec.containers[0]
command = container.command
args = container.args
@@ -428,7 +442,7 @@
@mark_scheduler
def test_start_schedule_fails(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -454,7 +468,7 @@
@mark_scheduler
def test_start_schedule_unsuccessful(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -482,7 +496,7 @@
@mark_scheduler
def test_start_schedule_manual_delete_debug(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -515,7 +529,7 @@
@mark_scheduler
def test_start_schedule_manual_add_debug(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -540,7 +554,10 @@
@mark_scheduler
def test_stop_schedule_fails(
- dagster_instance_with_k8s_scheduler, schedule_tempdir, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler,
+ schedule_tempdir,
+ helm_namespace_for_k8s_run_launcher,
+ restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -571,7 +588,7 @@
@mark_scheduler
def test_stop_schedule_unsuccessful(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -603,7 +620,7 @@
@mark_scheduler
def test_wipe(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -625,7 +642,7 @@
@mark_scheduler
def test_reconcile_failure(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
@@ -657,7 +674,7 @@
@mark_scheduler
def test_reconcile_failure_when_deleting_schedule_def(
- dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab,
+ dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab,
): # pylint:disable=unused-argument
instance = dagster_instance_with_k8s_scheduler
external_repo = get_test_external_repo()
diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py b/python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py
--- a/python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py
+++ b/python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py
@@ -157,7 +157,7 @@
job_cfg = DagsterK8sJobConfig.config_type()
run_launcher_extra_cfg = {
- "job_namespace": StringSource,
+ "job_namespace": Field(StringSource, is_required=False, default_value="default"),
"load_incluster_config": Field(bool, is_required=False, default_value=True),
"kubeconfig_file": Field(Noneable(str), is_required=False, default_value=None),
}