diff --git a/.buildkite/hooks/pre-exit b/.buildkite/hooks/pre-exit --- a/.buildkite/hooks/pre-exit +++ b/.buildkite/hooks/pre-exit @@ -25,6 +25,16 @@ docker-compose rm -f popd +pushd python_modules/libraries/dagster-celery-docker/dagster_celery_docker_tests/ +docker-compose stop +docker-compose rm -f +popd + +pushd python_modules/libraries/dagster-airflow/dagster_airflow_tests/ +docker-compose stop +docker-compose rm -f +popd + pushd python_modules/libraries/dagster-postgres/dagster_postgres_tests/ docker-compose stop docker-compose rm -f diff --git a/.buildkite/pipeline.py b/.buildkite/pipeline.py --- a/.buildkite/pipeline.py +++ b/.buildkite/pipeline.py @@ -117,6 +117,13 @@ r"aws s3 cp s3://\${BUILDKITE_SECRETS_BUCKET}/gcp-key-elementl-dev.json " + GCP_CREDS_LOCAL_FILE, "export GOOGLE_APPLICATION_CREDENTIALS=" + GCP_CREDS_LOCAL_FILE, + "pushd python_modules/libraries/dagster-airflow/dagster_airflow_tests/", + "docker-compose up -d --remove-orphans", + network_buildkite_container("postgres"), + connect_sibling_docker_container( + "postgres", "test-postgres-db-airflow", "POSTGRES_TEST_DB_HOST", + ), + "popd", ] @@ -152,6 +159,18 @@ ] +def celery_docker_extra_cmds_fn(version): + return celery_extra_cmds_fn(version) + [ + "pushd python_modules/libraries/dagster-celery-docker/dagster_celery_docker_tests/", + "docker-compose up -d --remove-orphans", + network_buildkite_container("postgres"), + connect_sibling_docker_container( + "postgres", "test-postgres-db-celery-docker", "POSTGRES_TEST_DB_HOST", + ), + "popd", + ] + + def integration_suite_extra_cmds_fn(version): return [ 'export AIRFLOW_HOME="/airflow"', @@ -380,7 +399,7 @@ ModuleBuildSpec( "python_modules/libraries/dagster-celery-docker", env_vars=["AWS_ACCOUNT_ID", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"], - extra_cmds_fn=celery_extra_cmds_fn, + extra_cmds_fn=celery_docker_extra_cmds_fn, depends_on_fn=test_image_depends_fn, ), ModuleBuildSpec( diff --git a/examples/legacy_examples/dagster_examples_tests/event_pipeline_demo_tests/test_airflowized.py b/examples/legacy_examples/dagster_examples_tests/event_pipeline_demo_tests/test_airflowized.py --- a/examples/legacy_examples/dagster_examples_tests/event_pipeline_demo_tests/test_airflowized.py +++ b/examples/legacy_examples/dagster_examples_tests/event_pipeline_demo_tests/test_airflowized.py @@ -1,7 +1,7 @@ # pylint: disable=unused-import import pytest -from dagster_airflow.test_fixtures import dagster_airflow_python_operator_pipeline +from dagster_airflow_tests.test_fixtures import dagster_airflow_python_operator_pipeline from dagster_examples.event_pipeline_demo.pipelines import event_ingest_pipeline from dagster.core.definitions.reconstructable import ReconstructableRepository diff --git a/integration_tests/test_suites/airflow-integration-test-suite/test_integration.py b/integration_tests/test_suites/airflow-integration-test-suite/test_integration.py --- a/integration_tests/test_suites/airflow-integration-test-suite/test_integration.py +++ b/integration_tests/test_suites/airflow-integration-test-suite/test_integration.py @@ -6,12 +6,12 @@ from airflow.exceptions import AirflowException from airflow.utils import timezone from dagster_airflow.factory import make_airflow_dag_kubernetized_for_recon_repo -from dagster_airflow.test_fixtures import ( # pylint: disable=unused-import +from dagster_airflow_tests.marks import nettest +from dagster_airflow_tests.test_factory.utils import validate_pipeline_execution +from dagster_airflow_tests.test_fixtures import ( # pylint: 
disable=unused-import dagster_airflow_k8s_operator_pipeline, execute_tasks_in_dag, ) -from dagster_airflow_tests.marks import nettest -from dagster_airflow_tests.test_factory.utils import validate_pipeline_execution from dagster_test.test_project import test_project_environments_path from dagster.core.definitions.reconstructable import ReconstructableRepository diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py b/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py --- a/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py @@ -459,6 +459,7 @@ dag_description=None, dag_kwargs=None, op_kwargs=None, + instance=None, ): check.inst_param(recon_repo, "recon_repo", ReconstructableRepository) check.str_param(pipeline_name, "pipeline_name") @@ -481,6 +482,7 @@ dag_kwargs=dag_kwargs, op_kwargs=op_kwargs, operator=DagsterDockerOperator, + instance=instance, ) diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py b/python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py --- a/python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py @@ -1,30 +1,18 @@ import ast -import sys +import json import warnings from contextlib import contextmanager from airflow.exceptions import AirflowException from dagster_airflow.vendor.docker_operator import DockerOperator -from dagster_graphql.client.mutations import ( - DagsterGraphQLClientError, - handle_execute_plan_result_raw, - handle_execution_errors, -) -from dagster_graphql.client.query import RAW_EXECUTE_PLAN_MUTATION -from dagster_graphql.client.util import construct_execute_plan_variables, parse_raw_log_lines from docker import APIClient, from_env from dagster import check, seven -from dagster.core.events import EngineEventData from dagster.core.instance import AIRFLOW_EXECUTION_DATE_STR, DagsterInstance -from dagster.utils.error import serializable_error_info_from_exc_info +from dagster.grpc.types import ExecuteStepArgs +from dagster.serdes import deserialize_json_to_dagster_namedtuple, serialize_dagster_namedtuple -from .util import ( - airflow_tags_for_ts, - check_events_for_failures, - check_events_for_skips, - get_aws_environment, -) +from .util import check_events_for_failures, check_events_for_skips, get_aws_environment DOCKER_TEMPDIR = "/tmp" @@ -100,13 +88,10 @@ self.step_keys = dagster_operator_parameters.step_keys self.recon_repo = dagster_operator_parameters.recon_repo self._run_id = None - # self.instance might be None in, for instance, a unit test setting where the operator - # was being directly instantiated without passing through make_airflow_dag - self.instance = ( - DagsterInstance.from_ref(dagster_operator_parameters.instance_ref) - if dagster_operator_parameters.instance_ref - else None - ) + + self.instance_ref = dagster_operator_parameters.instance_ref + check.invariant(self.instance_ref) + self.instance = DagsterInstance.from_ref(self.instance_ref) # These shenanigans are so we can override DockerOperator.get_hook in order to configure # a docker client using docker.from_env, rather than messing with the logic of @@ -181,7 +166,9 @@ res = [] line = "" - for new_line in self.cli.logs(container=self.container["Id"], stream=True): + for new_line in self.cli.logs( + container=self.container["Id"], stream=True, 
stdout=True, stderr=False + ) line = new_line.strip() if hasattr(line, "decode"): line = line.decode("utf-8") @@ -217,27 +204,23 @@ def query(self, airflow_ts): check.opt_str_param(airflow_ts, "airflow_ts") - variables = construct_execute_plan_variables( - self.recon_repo, - self.mode, - self.run_config, - self.pipeline_name, - self.run_id, - self.step_keys, - ) - - tags = airflow_tags_for_ts(airflow_ts) - variables["executionParams"]["executionMetadata"]["tags"] = tags - - self.log.info( - "Executing GraphQL query: {query}\n".format(query=RAW_EXECUTE_PLAN_MUTATION) - + "with variables:\n" - + seven.json.dumps(variables, indent=2) + recon_pipeline = self.recon_repo.get_reconstructable_pipeline(self.pipeline_name) + + input_json = serialize_dagster_namedtuple( + ExecuteStepArgs( + pipeline_origin=recon_pipeline.get_origin(), + pipeline_run_id=self.run_id, + instance_ref=self.instance_ref, + mode=self.mode, + step_keys_to_execute=self.step_keys, + run_config=self.run_config, + retries_dict={}, + ) ) - return "dagster-graphql -v '{variables}' -t '{query}'".format( - variables=seven.json.dumps(variables), query=RAW_EXECUTE_PLAN_MUTATION - ) + command = "dagster api execute_step_with_structured_logs {}".format(json.dumps(input_json)) + self.log.info("Executing: {command}\n".format(command=command)) + return command def get_docker_command(self, airflow_ts): """Deliberately renamed from get_command to avoid shadowing the method of the base class""" @@ -268,50 +251,36 @@ self._run_id = context["dag_run"].run_id try: - if self.instance: - tags = {AIRFLOW_EXECUTION_DATE_STR: context.get("ts")} if "ts" in context else {} - - run = self.instance.register_managed_run( - pipeline_name=self.pipeline_name, - run_id=self.run_id, - run_config=self.run_config, - mode=self.mode, - solids_to_execute=None, - step_keys_to_execute=None, - tags=tags, - root_run_id=None, - parent_run_id=None, - pipeline_snapshot=self.pipeline_snapshot, - execution_plan_snapshot=self.execution_plan_snapshot, - parent_pipeline_snapshot=self.parent_pipeline_snapshot, - ) + tags = {AIRFLOW_EXECUTION_DATE_STR: context.get("ts")} if "ts" in context else {} + + self.instance.register_managed_run( + pipeline_name=self.pipeline_name, + run_id=self.run_id, + run_config=self.run_config, + mode=self.mode, + solids_to_execute=None, + step_keys_to_execute=None, + tags=tags, + root_run_id=None, + parent_run_id=None, + pipeline_snapshot=self.pipeline_snapshot, + execution_plan_snapshot=self.execution_plan_snapshot, + parent_pipeline_snapshot=self.parent_pipeline_snapshot, + ) - raw_res = self.execute_raw(context) + res = self.execute_raw(context) self.log.info("Finished executing container.") - res = parse_raw_log_lines(raw_res) + if not res: + raise AirflowException("Missing query response") try: - handle_execution_errors(res, "executePlan") - except DagsterGraphQLClientError as err: - if self.instance: - self.instance.report_engine_event( - str(err), - run, - EngineEventData.engine_error( - serializable_error_info_from_exc_info(sys.exc_info()) - ), - self.__class__, - ) - raise - - events = handle_execute_plan_result_raw(res) - - if self.instance: - for event in events: - self.instance.handle_new_event(event) + events = [deserialize_json_to_dagster_namedtuple(line) for line in res if line] + except Exception: # pylint: disable=broad-except + raise AirflowException( + "Could not parse response {response}".format(response=repr(res)) + ) - events = [e.dagster_event for e in events] check_events_for_failures(events) check_events_for_skips(events)
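Note on the new control flow above: DagsterDockerOperator no longer round-trips through dagster-graphql. query() serializes an ExecuteStepArgs tuple with serialize_dagster_namedtuple and has the container run the `dagster api execute_step_with_structured_logs` CLI; execute() then deserializes each stdout line back into a Dagster event. A minimal sketch of that parsing step, mirroring execute() above (`stdout_lines` and `parse_events_from_stdout` are stand-ins, not names from this patch; `stdout_lines` plays the role of the list returned by execute_raw()):

    from airflow.exceptions import AirflowException

    from dagster.serdes import deserialize_json_to_dagster_namedtuple


    def parse_events_from_stdout(stdout_lines):
        # Every non-empty stdout line emitted by the step container is expected
        # to be a serdes-encoded Dagster event; anything unparseable surfaces
        # as an AirflowException, exactly as in execute() above.
        if not stdout_lines:
            raise AirflowException("Missing query response")
        try:
            return [
                deserialize_json_to_dagster_namedtuple(line) for line in stdout_lines if line
            ]
        except Exception:  # pylint: disable=broad-except
            raise AirflowException("Could not parse response {}".format(repr(stdout_lines)))

This is also why the logs() call now passes stdout=True, stderr=False: anything the container writes to stderr would otherwise be fed into the deserializer.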
diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/docker-compose.yml b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/docker-compose.yml new file mode 100644 --- /dev/null +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/docker-compose.yml @@ -0,0 +1,20 @@ +version: "3.7" + +services: + test-postgres-db-airflow: + image: postgres:11 + container_name: test-postgres-db-airflow + ports: + - "5432:5432" + environment: + POSTGRES_PASSWORD: "test" + POSTGRES_USER: "test" + POSTGRES_DB: "test" + networks: + - postgres + + +networks: + postgres: + driver: bridge + name: postgres \ No newline at end of file diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_docker_operator.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_docker_operator.py --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_docker_operator.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_docker_operator.py @@ -5,11 +5,11 @@ from dagster_airflow.factory import DagsterOperatorParameters from dagster_airflow.operators.docker_operator import DagsterDockerOperator from dagster_airflow_tests.marks import requires_airflow_db -from dagster_graphql.client.mutations import DagsterGraphQLClientError from dagster import pipeline, solid from dagster.core.execution.api import create_execution_plan from dagster.core.snap import snapshot_from_execution_plan +from dagster.core.test_utils import instance_for_test @solid @@ -30,88 +30,67 @@ def test_init_modified_docker_operator(dagster_docker_image): - dagster_operator_parameters = DagsterOperatorParameters( - task_id="nonce", - run_config={"storage": {"filesystem": {}}}, - pipeline_name="", - mode="default", - op_kwargs={"image": dagster_docker_image, "api_version": "auto",}, - pipeline_snapshot=nonce_pipeline_snapshot, - execution_plan_snapshot=nonce_execution_plan_snapshot, - ) - DagsterDockerOperator(dagster_operator_parameters) + with instance_for_test() as instance: + dagster_operator_parameters = DagsterOperatorParameters( + task_id="nonce", + run_config={"storage": {"filesystem": {}}}, + pipeline_name="", + mode="default", + op_kwargs={"image": dagster_docker_image, "api_version": "auto",}, + pipeline_snapshot=nonce_pipeline_snapshot, + execution_plan_snapshot=nonce_execution_plan_snapshot, + instance_ref=instance.get_ref(), + ) + DagsterDockerOperator(dagster_operator_parameters) @requires_airflow_db def test_modified_docker_operator_bad_docker_conn(dagster_docker_image): - dagster_operator_parameters = DagsterOperatorParameters( - task_id="nonce", - run_config={"storage": {"filesystem": {}}}, - pipeline_name="", - mode="default", - op_kwargs={ - "image": dagster_docker_image, - "api_version": "auto", - "docker_conn_id": "foo_conn", - "command": "dagster-graphql --help", - }, - pipeline_snapshot=nonce_pipeline_snapshot, - execution_plan_snapshot=nonce_execution_plan_snapshot, - ) - operator = DagsterDockerOperator(dagster_operator_parameters) - - with pytest.raises(AirflowException, match="The conn_id `foo_conn` isn't defined"): - operator.execute({}) + with instance_for_test() as instance: + dagster_operator_parameters = DagsterOperatorParameters( + task_id="nonce", + run_config={"storage": {"filesystem": {}}}, + pipeline_name="", + mode="default", + op_kwargs={ + "image": dagster_docker_image, + "api_version": "auto", + "docker_conn_id": "foo_conn", + "command": "dagster-graphql --help", + }, + 
pipeline_snapshot=nonce_pipeline_snapshot, + execution_plan_snapshot=nonce_execution_plan_snapshot, + instance_ref=instance.get_ref(), + ) + operator = DagsterDockerOperator(dagster_operator_parameters) + + with pytest.raises(AirflowException, match="The conn_id `foo_conn` isn't defined"): + operator.execute({}) def test_modified_docker_operator_env(dagster_docker_image): - dagster_operator_parameters = DagsterOperatorParameters( - task_id="nonce", - run_config={"storage": {"filesystem": {}}}, - pipeline_name="", - mode="default", - op_kwargs={ - "image": dagster_docker_image, - "api_version": "auto", - "command": "dagster-graphql --help", - }, - pipeline_snapshot=nonce_pipeline_snapshot, - execution_plan_snapshot=nonce_execution_plan_snapshot, - ) - operator = DagsterDockerOperator(dagster_operator_parameters) - with pytest.raises(DagsterGraphQLClientError, match="Unhandled error type"): - operator.execute({}) + with instance_for_test() as instance: + dagster_operator_parameters = DagsterOperatorParameters( + task_id="nonce", + run_config={"storage": {"filesystem": {}}}, + pipeline_name="", + mode="default", + op_kwargs={ + "image": dagster_docker_image, + "api_version": "auto", + "command": "dagster-graphql --help", + }, + pipeline_snapshot=nonce_pipeline_snapshot, + execution_plan_snapshot=nonce_execution_plan_snapshot, + instance_ref=instance.get_ref(), + ) + operator = DagsterDockerOperator(dagster_operator_parameters) + with pytest.raises(AirflowException, match="Could not parse response"): + operator.execute({}) def test_modified_docker_operator_bad_command(dagster_docker_image): - dagster_operator_parameters = DagsterOperatorParameters( - task_id="nonce", - run_config={"storage": {"filesystem": {}}}, - pipeline_name="", - mode="default", - op_kwargs={ - "image": dagster_docker_image, - "api_version": "auto", - "command": "dagster-graphql gargle bargle", - }, - pipeline_snapshot=nonce_pipeline_snapshot, - execution_plan_snapshot=nonce_execution_plan_snapshot, - ) - operator = DagsterDockerOperator(dagster_operator_parameters) - with pytest.raises(AirflowException, match="'StatusCode': 2"): - operator.execute({}) - - -def test_modified_docker_operator_url(dagster_docker_image): - try: - docker_host = os.getenv("DOCKER_HOST") - docker_tls_verify = os.getenv("DOCKER_TLS_VERIFY") - docker_cert_path = os.getenv("DOCKER_CERT_PATH") - - os.environ["DOCKER_HOST"] = "gargle" - os.environ["DOCKER_TLS_VERIFY"] = "bargle" - os.environ["DOCKER_CERT_PATH"] = "farfle" - + with instance_for_test() as instance: dagster_operator_parameters = DagsterOperatorParameters( task_id="nonce", run_config={"storage": {"filesystem": {}}}, @@ -120,19 +99,50 @@ op_kwargs={ "image": dagster_docker_image, "api_version": "auto", - "docker_url": docker_host or "unix:///var/run/docker.sock", - "tls_hostname": docker_host if docker_tls_verify else False, - "tls_ca_cert": docker_cert_path, - "command": "dagster-graphql --help", + "command": "dagster-graphql gargle bargle", }, pipeline_snapshot=nonce_pipeline_snapshot, execution_plan_snapshot=nonce_execution_plan_snapshot, + instance_ref=instance.get_ref(), ) operator = DagsterDockerOperator(dagster_operator_parameters) - - with pytest.raises(DagsterGraphQLClientError, match="Unhandled error type"): + with pytest.raises(AirflowException, match="'StatusCode': 2"): operator.execute({}) + +def test_modified_docker_operator_url(dagster_docker_image): + try: + docker_host = os.getenv("DOCKER_HOST") + docker_tls_verify = os.getenv("DOCKER_TLS_VERIFY") + docker_cert_path = 
os.getenv("DOCKER_CERT_PATH") + + os.environ["DOCKER_HOST"] = "gargle" + os.environ["DOCKER_TLS_VERIFY"] = "bargle" + os.environ["DOCKER_CERT_PATH"] = "farfle" + + with instance_for_test() as instance: + dagster_operator_parameters = DagsterOperatorParameters( + task_id="nonce", + run_config={"storage": {"filesystem": {}}}, + pipeline_name="", + mode="default", + op_kwargs={ + "image": dagster_docker_image, + "api_version": "auto", + "docker_url": docker_host or "unix:///var/run/docker.sock", + "tls_hostname": docker_host if docker_tls_verify else False, + "tls_ca_cert": docker_cert_path, + "command": "dagster-graphql --help", + }, + pipeline_snapshot=nonce_pipeline_snapshot, + execution_plan_snapshot=nonce_execution_plan_snapshot, + instance_ref=instance.get_ref(), + ) + operator = DagsterDockerOperator(dagster_operator_parameters) + + with pytest.raises(AirflowException, match="Could not parse response"): + operator.execute({}) + finally: if docker_host is not None: os.environ["DOCKER_HOST"] = docker_host diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_custom_operator.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_custom_operator.py --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_custom_operator.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_custom_operator.py @@ -1,10 +1,11 @@ import logging import os -# pylint: disable=unused-import -from dagster_airflow.test_fixtures import dagster_airflow_custom_operator_pipeline from dagster_airflow_tests.marks import requires_airflow_db from dagster_airflow_tests.test_factory.utils import validate_pipeline_execution +from dagster_airflow_tests.test_fixtures import ( # pylint: disable=unused-import + dagster_airflow_custom_operator_pipeline, +) from dagster_test.dagster_airflow.custom_operator import CustomOperator from dagster_test.test_project import test_project_environments_path diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_docker.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_docker.py --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_docker.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_docker.py @@ -7,12 +7,13 @@ from airflow.exceptions import AirflowException from airflow.utils import timezone from dagster_airflow.factory import make_airflow_dag_containerized_for_recon_repo -from dagster_airflow.test_fixtures import ( +from dagster_airflow_tests.conftest import dagster_docker_image +from dagster_airflow_tests.marks import nettest, requires_airflow_db +from dagster_airflow_tests.test_fixtures import ( dagster_airflow_docker_operator_pipeline, execute_tasks_in_dag, + postgres_instance, ) -from dagster_airflow_tests.conftest import dagster_docker_image -from dagster_airflow_tests.marks import nettest, requires_airflow_db from dagster_test.test_project import test_project_environments_path from dagster.core.definitions.reconstructable import ReconstructableRepository @@ -137,14 +138,21 @@ run_id = make_new_run_id() execution_date = timezone.utcnow() - dag, tasks = make_airflow_dag_containerized_for_recon_repo( - recon_repo, pipeline_name, dagster_docker_image, run_config - ) + with postgres_instance() as instance: + + dag, tasks = make_airflow_dag_containerized_for_recon_repo( + recon_repo, + pipeline_name, + 
dagster_docker_image, + run_config, + instance=instance, + op_kwargs={"network_mode": "container:test-postgres-db-airflow"}, + ) - with pytest.raises(AirflowException) as exc_info: - execute_tasks_in_dag(dag, tasks, run_id, execution_date) + with pytest.raises(AirflowException) as exc_info: + execute_tasks_in_dag(dag, tasks, run_id, execution_date) - assert "Exception: Unusual error" in str(exc_info.value) + assert "Exception: Unusual error" in str(exc_info.value) @requires_airflow_db @@ -163,20 +171,26 @@ execution_date = timezone.utcnow() - dag, tasks = make_airflow_dag_containerized_for_recon_repo( - recon_repo, pipeline_name, dagster_docker_image, run_config - ) - - results = execute_tasks_in_dag( - dag, tasks, run_id=make_new_run_id(), execution_date=execution_date - ) - - materialized_airflow_execution_date = None - for result in results.values(): - for event in result: - if event.event_type_value == "STEP_MATERIALIZATION": - materialization = event.event_specific_data.materialization - materialization_entry = materialization.metadata_entries[0] - materialized_airflow_execution_date = materialization_entry.entry_data.text - - assert execution_date.isoformat() == materialized_airflow_execution_date + with postgres_instance() as instance: + dag, tasks = make_airflow_dag_containerized_for_recon_repo( + recon_repo, + pipeline_name, + dagster_docker_image, + run_config, + instance=instance, + op_kwargs={"network_mode": "container:test-postgres-db-airflow"}, + ) + + results = execute_tasks_in_dag( + dag, tasks, run_id=make_new_run_id(), execution_date=execution_date + ) + + materialized_airflow_execution_date = None + for result in results.values(): + for event in result: + if event.event_type_value == "STEP_MATERIALIZATION": + materialization = event.event_specific_data.materialization + materialization_entry = materialization.metadata_entries[0] + materialized_airflow_execution_date = materialization_entry.entry_data.text + + assert execution_date.isoformat() == materialized_airflow_execution_date diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_python.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_python.py --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_python.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_python.py @@ -10,11 +10,11 @@ _rename_for_airflow, make_airflow_dag_for_recon_repo, ) -from dagster_airflow.test_fixtures import ( # pylint: disable=unused-import +from dagster_airflow_tests.marks import nettest, requires_airflow_db +from dagster_airflow_tests.test_fixtures import ( # pylint: disable=unused-import dagster_airflow_python_operator_pipeline, execute_tasks_in_dag, ) -from dagster_airflow_tests.marks import nettest, requires_airflow_db from dagster_test.test_project import test_project_environments_path from dagster.core.definitions.reconstructable import ReconstructableRepository diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow/test_fixtures.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_fixtures.py rename from python_modules/libraries/dagster-airflow/dagster_airflow/test_fixtures.py rename to python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_fixtures.py --- a/python_modules/libraries/dagster-airflow/dagster_airflow/test_fixtures.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_fixtures.py @@ -1,5 +1,6 @@ import 
logging import sys +from contextlib import contextmanager import pytest from airflow import DAG @@ -8,8 +9,46 @@ from airflow.settings import LOG_FORMAT from airflow.utils import timezone +from dagster import file_relative_path, seven +from dagster.core.test_utils import instance_for_test_tempdir from dagster.core.utils import make_new_run_id -from dagster.utils import load_yaml_from_glob_list +from dagster.utils import load_yaml_from_glob_list, merge_dicts +from dagster.utils.test.postgres_instance import TestPostgresInstance + + +@contextmanager +def postgres_instance(overrides=None): + with seven.TemporaryDirectory() as temp_dir: + with TestPostgresInstance.docker_service_up_or_skip( + file_relative_path(__file__, "docker-compose.yml"), "test-postgres-db-airflow", + ) as pg_conn_string: + TestPostgresInstance.clean_run_storage(pg_conn_string) + TestPostgresInstance.clean_event_log_storage(pg_conn_string) + TestPostgresInstance.clean_schedule_storage(pg_conn_string) + with instance_for_test_tempdir( + temp_dir, + overrides=merge_dicts( + { + "run_storage": { + "module": "dagster_postgres.run_storage.run_storage", + "class": "PostgresRunStorage", + "config": {"postgres_url": pg_conn_string}, + }, + "event_log_storage": { + "module": "dagster_postgres.event_log.event_log", + "class": "PostgresEventLogStorage", + "config": {"postgres_url": pg_conn_string}, + }, + "schedule_storage": { + "module": "dagster_postgres.schedule_storage.schedule_storage", + "class": "PostgresScheduleStorage", + "config": {"postgres_url": pg_conn_string}, + }, + }, + overrides if overrides else {}, + ), + ) as instance: + yield instance def execute_tasks_in_dag(dag, tasks, run_id, execution_date): @@ -43,7 +82,7 @@ """This is a test fixture for running Dagster pipelines as Airflow DAGs. Usage: - from dagster_airflow.test_fixtures import dagster_airflow_python_operator_pipeline + from dagster_airflow_tests.test_fixtures import dagster_airflow_python_operator_pipeline def test_airflow(dagster_airflow_python_operator_pipeline): results = dagster_airflow_python_operator_pipeline( @@ -53,8 +92,8 @@ ) assert len(results) == 3 """ - from .factory import make_airflow_dag_for_recon_repo - from .vendor.python_operator import PythonOperator + from dagster_airflow.factory import make_airflow_dag_for_recon_repo + from dagster_airflow.vendor.python_operator import PythonOperator def _pipeline_fn( recon_repo, @@ -88,7 +127,7 @@ """This is a test fixture for running Dagster pipelines with custom operators as Airflow DAGs. Usage: - from dagster_airflow.test_fixtures import dagster_airflow_custom_operator_pipeline + from dagster_airflow_tests.test_fixtures import dagster_airflow_custom_operator_pipeline def test_airflow(dagster_airflow_python_operator_pipeline): results = dagster_airflow_custom_operator_pipeline( @@ -99,8 +138,8 @@ ) assert len(results) == 3 """ - from .factory import make_airflow_dag_for_operator - from .vendor.python_operator import PythonOperator + from dagster_airflow.factory import make_airflow_dag_for_operator + from dagster_airflow.vendor.python_operator import PythonOperator def _pipeline_fn( recon_repo, @@ -135,7 +174,7 @@ """This is a test fixture for running Dagster pipelines as containerized Airflow DAGs. 
Usage: - from dagster_airflow.test_fixtures import dagster_airflow_docker_operator_pipeline + from dagster_airflow_tests.test_fixtures import dagster_airflow_docker_operator_pipeline def test_airflow(dagster_airflow_docker_operator_pipeline): results = dagster_airflow_docker_operator_pipeline( @@ -146,8 +185,8 @@ ) assert len(results) == 3 """ - from .factory import make_airflow_dag_containerized_for_recon_repo - from .operators.docker_operator import DagsterDockerOperator + from dagster_airflow.factory import make_airflow_dag_containerized_for_recon_repo + from dagster_airflow.operators.docker_operator import DagsterDockerOperator def _pipeline_fn( recon_repo, @@ -162,22 +201,28 @@ if run_config is None and environment_yaml is not None: run_config = load_yaml_from_glob_list(environment_yaml) - dag, tasks = make_airflow_dag_containerized_for_recon_repo( - recon_repo=recon_repo, - pipeline_name=pipeline_name, - image=image, - mode=mode, - run_config=run_config, - op_kwargs=op_kwargs, - ) - assert isinstance(dag, DAG) + op_kwargs = op_kwargs or {} + op_kwargs["network_mode"] = "container:test-postgres-db-airflow" + + with postgres_instance() as instance: + + dag, tasks = make_airflow_dag_containerized_for_recon_repo( + recon_repo=recon_repo, + pipeline_name=pipeline_name, + image=image, + mode=mode, + run_config=run_config, + op_kwargs=op_kwargs, + instance=instance, + ) + assert isinstance(dag, DAG) - for task in tasks: - assert isinstance(task, DagsterDockerOperator) + for task in tasks: + assert isinstance(task, DagsterDockerOperator) - return execute_tasks_in_dag( - dag, tasks, run_id=make_new_run_id(), execution_date=execution_date - ) + return execute_tasks_in_dag( + dag, tasks, run_id=make_new_run_id(), execution_date=execution_date + ) return _pipeline_fn @@ -187,7 +232,7 @@ """This is a test fixture for running Dagster pipelines on Airflow + K8s. Usage: - from dagster_airflow.test_fixtures import dagster_airflow_k8s_operator_pipeline + from dagster_airflow_tests.test_fixtures import dagster_airflow_k8s_operator_pipeline def test_airflow(dagster_airflow_k8s_operator_pipeline): results = dagster_airflow_k8s_operator_pipeline( @@ -198,8 +243,8 @@ ) assert len(results) == 3 """ - from .factory import make_airflow_dag_kubernetized_for_recon_repo - from .operators.kubernetes_operator import DagsterKubernetesPodOperator + from dagster_airflow.factory import make_airflow_dag_kubernetized_for_recon_repo + from dagster_airflow.operators.kubernetes_operator import DagsterKubernetesPodOperator def _pipeline_fn( recon_repo, diff --git a/python_modules/libraries/dagster-airflow/tox.ini b/python_modules/libraries/dagster-airflow/tox.ini --- a/python_modules/libraries/dagster-airflow/tox.ini +++ b/python_modules/libraries/dagster-airflow/tox.ini @@ -2,7 +2,7 @@ envlist = py{38,37,36,27}-{unix,windows}-{default,requiresairflowdb},pylint [testenv] -passenv = HOME AIRFLOW_HOME AWS_* BUILDKITE CI_* COVERALLS_REPO_TOKEN DAGSTER_* DOCKER_* GOOGLE_* KUBECONFIG +passenv = HOME AIRFLOW_HOME AWS_* BUILDKITE CI_* COVERALLS_REPO_TOKEN DAGSTER_* DOCKER_* GOOGLE_* KUBECONFIG POSTGRES_TEST_DB_HOST setenv = SLUGIFY_USES_TEXT_UNIDECODE = yes VIRTUALENV_NO_DOWNLOAD = yes @@ -27,8 +27,8 @@ !windows: /bin/bash -c '! 
pip list --exclude-editable | grep -e dagster -e dagit' requiresairflowdb: airflow initdb echo -e "--- \033[0;32m:pytest: Running tox tests\033[0m" - !requiresairflowdb: pytest -m "not requires_airflow_db" -s -vv --junitxml=test_results.xml --cov=dagster_airflow --cov-append --cov-report= - requiresairflowdb: pytest -m requires_airflow_db -s -vv --junitxml=test_results.xml --cov=dagster_airflow --cov-append --cov-report= + !requiresairflowdb: pytest -m "not requires_airflow_db" -vv --junitxml=test_results.xml --cov=dagster_airflow --cov-append --cov-report= {posargs} + requiresairflowdb: pytest -m requires_airflow_db -vv --junitxml=test_results.xml --cov=dagster_airflow --cov-append --cov-report= {posargs} coverage report --omit='.tox/*,**/test_*.py' --skip-covered coverage html --omit='.tox/*,**/test_*.py' coverage xml --omit='.tox/*,**/test_*.py' diff --git a/python_modules/libraries/dagster-celery-docker/dagster_celery_docker_tests/test_execute_docker.py b/python_modules/libraries/dagster-celery-docker/dagster_celery_docker_tests/test_execute_docker.py --- a/python_modules/libraries/dagster-celery-docker/dagster_celery_docker_tests/test_execute_docker.py +++ b/python_modules/libraries/dagster-celery-docker/dagster_celery_docker_tests/test_execute_docker.py @@ -97,30 +97,30 @@ except docker.errors.ImageNotFound: build_and_tag_test_image(docker_image) - run_config = merge_dicts( - merge_yamls( - [ - os.path.join(test_project_environments_path(), "env.yaml"), - os.path.join(test_project_environments_path(), "env_s3.yaml"), - ] - ), - { - "execution": { - "celery-docker": { - "config": { - "docker": docker_config, - "config_source": {"task_always_eager": True}, - } + run_config = merge_dicts( + merge_yamls( + [ + os.path.join(test_project_environments_path(), "env.yaml"), + os.path.join(test_project_environments_path(), "env_s3.yaml"), + ] + ), + { + "execution": { + "celery-docker": { + "config": { + "docker": docker_config, + "config_source": {"task_always_eager": True}, } - }, + } }, - ) + }, + ) - with postgres_instance() as instance: + with postgres_instance() as instance: - result = execute_pipeline( - get_test_project_recon_pipeline("docker_celery_pipeline"), - run_config=run_config, - instance=instance, - ) - assert result.success + result = execute_pipeline( + get_test_project_recon_pipeline("docker_celery_pipeline"), + run_config=run_config, + instance=instance, + ) + assert result.success diff --git a/python_modules/libraries/dagster-celery-docker/tox.ini b/python_modules/libraries/dagster-celery-docker/tox.ini --- a/python_modules/libraries/dagster-celery-docker/tox.ini +++ b/python_modules/libraries/dagster-celery-docker/tox.ini @@ -2,7 +2,7 @@ envlist = py{38,37,36,27}-{unix,windows},pylint [testenv] -passenv = CI_* COVERALLS_REPO_TOKEN BUILDKITE AWS_SECRET_ACCESS_KEY AWS_ACCESS_KEY_ID DAGSTER_DOCKER_* +passenv = CI_* COVERALLS_REPO_TOKEN BUILDKITE AWS_SECRET_ACCESS_KEY AWS_ACCESS_KEY_ID DAGSTER_DOCKER_* POSTGRES_TEST_DB_HOST deps = -e ../../dagster -e ../../dagster-graphql
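Usage note on the new test plumbing: both suites now stand up a dedicated Postgres container (test-postgres-db-airflow / test-postgres-db-celery-docker) via docker-compose, and the step containers join that container's network namespace so the Postgres-backed DagsterInstance is reachable from inside them. A hedged sketch of how a containerized test might consume the new postgres_instance fixture (the test name, pipeline name, and repository module below are illustrative, not from this patch):

    from airflow.utils import timezone
    from dagster_airflow.factory import make_airflow_dag_containerized_for_recon_repo
    from dagster_airflow_tests.test_fixtures import execute_tasks_in_dag, postgres_instance

    from dagster.core.definitions.reconstructable import ReconstructableRepository
    from dagster.core.utils import make_new_run_id


    def test_my_pipeline(dagster_docker_image):  # illustrative test
        # "my_test_project.repo" / "define_repo" / "my_pipeline" are hypothetical names
        recon_repo = ReconstructableRepository.for_module("my_test_project.repo", "define_repo")
        with postgres_instance() as instance:
            dag, tasks = make_airflow_dag_containerized_for_recon_repo(
                recon_repo,
                "my_pipeline",
                dagster_docker_image,
                run_config={},
                instance=instance,
                # share the Postgres container's network namespace so the
                # in-container run can write to the instance's run/event storage
                op_kwargs={"network_mode": "container:test-postgres-db-airflow"},
            )
            execute_tasks_in_dag(
                dag, tasks, run_id=make_new_run_id(), execution_date=timezone.utcnow()
            )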