diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/client.py b/python_modules/libraries/dagster-k8s/dagster_k8s/client.py --- a/python_modules/libraries/dagster-k8s/dagster_k8s/client.py +++ b/python_modules/libraries/dagster-k8s/dagster_k8s/client.py @@ -12,7 +12,7 @@ from dagster.core.storage.pipeline_run import PipelineRunStatus DEFAULT_WAIT_TIMEOUT = 86400.0 # 1 day -DEFAULT_WAIT_BETWEEN_ATTEMPTS = 10.0 # 10 seconds +DEFAULT_WAIT_BETWEEN_ATTEMPTS = 30.0 # 30 seconds DEFAULT_JOB_POD_COUNT = 1 # expect job:pod to be 1:1 by default @@ -80,11 +80,13 @@ def k8s_api_retry( fn, + max_retries, + timeout, msg_fn=lambda: "Unexpected error encountered in Kubernetes API Client.", - max_retries=1, - timeout=10, ): check.callable_param(fn, "fn") + check.int_param(max_retries, "max_retries") + check.numeric_param(timeout, "timeout") retries = max_retries while retries > 0: @@ -262,7 +264,9 @@ jobs = self.batch_api.list_namespaced_job(namespace=namespace) return next((j for j in jobs.items if j.metadata.name == job_name), None) - job = k8s_api_retry(_get_jobs_for_namespace, max_retries=3) + job = k8s_api_retry( + _get_jobs_for_namespace, max_retries=4, timeout=wait_time_between_attempts + ) if not job: self.logger('Job "{job_name}" not yet launched, waiting'.format(job_name=job_name)) @@ -285,7 +289,9 @@ job = self.batch_api.read_namespaced_job_status(job_name, namespace=namespace) return job.status - status = k8s_api_retry(_get_job_status, max_retries=3) + status = k8s_api_retry( + _get_job_status, max_retries=4, timeout=wait_time_between_attempts + ) # status.succeeded represents the number of pods which reached phase Succeeded. if status.succeeded == num_pods_to_wait_for: diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_client.py b/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_client.py --- a/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_client.py +++ b/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_client.py @@ -140,15 +140,9 @@ completed_job, ] - with pytest.raises(DagsterK8sAPIRetryLimitExceeded) as exc_info: + with pytest.raises(DagsterK8sAPIRetryLimitExceeded): mock_client.wait_for_job_success("a_job", "a_namespace") - assert "Retry limit of {limit} exceeded: " "Unexpected error encountered in Kubernetes API Client.".format( - limit=3 - ) in str( - exc_info.value - ) - # logger should not have been called assert not mock_client.logger.mock_calls # sleeper should not have been called @@ -271,15 +265,9 @@ completed_job = V1Job(metadata=a_job_metadata, status=V1JobStatus(failed=0, succeeded=1)) mock_client.batch_api.read_namespaced_job_status.side_effect = [completed_job] - with pytest.raises(DagsterK8sAPIRetryLimitExceeded) as exc_info: + with pytest.raises(DagsterK8sAPIRetryLimitExceeded): mock_client.wait_for_job_success("a_job", "a_namespace") - assert "Retry limit of {limit} exceeded: " "Unexpected error encountered in Kubernetes API Client.".format( - limit=3 - ) in str( - exc_info.value - ) - # 2 retries due to errors + 1 not launched + 1 launched assert len(mock_client.batch_api.list_namespaced_job.mock_calls) == 3