diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow/docker-compose.yml b/python_modules/libraries/dagster-airflow/dagster_airflow/docker-compose.yml
new file mode 100644
--- /dev/null
+++ b/python_modules/libraries/dagster-airflow/dagster_airflow/docker-compose.yml
@@ -0,0 +1,20 @@
+version: "3.7"
+
+services:
+  test-postgres-db-airflow:
+    image: postgres:11
+    container_name: test-postgres-db-airflow
+    ports:
+      - "5432:5432"
+    environment:
+      POSTGRES_PASSWORD: "test"
+      POSTGRES_USER: "test"
+      POSTGRES_DB: "test"
+    networks:
+      - postgres
+
+
+networks:
+  postgres:
+    driver: bridge
+    name: postgres
\ No newline at end of file
diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py b/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py
--- a/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py
+++ b/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py
@@ -459,6 +459,7 @@
     dag_description=None,
    dag_kwargs=None,
     op_kwargs=None,
+    instance=None,
 ):
     check.inst_param(recon_repo, "recon_repo", ReconstructableRepository)
     check.str_param(pipeline_name, "pipeline_name")
@@ -481,6 +482,7 @@
         dag_kwargs=dag_kwargs,
         op_kwargs=op_kwargs,
         operator=DagsterDockerOperator,
+        instance=instance,
     )
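Note: a minimal usage sketch of the new instance argument on make_airflow_dag_containerized_for_recon_repo, which is forwarded to DagsterDockerOperator. The repository module, pipeline, and image names below are placeholders, not taken from this change.

from dagster.core.definitions.reconstructable import ReconstructableRepository

from dagster_airflow.factory import make_airflow_dag_containerized_for_recon_repo


def build_containerized_dag(instance):
    # Placeholder repository module / repository / pipeline / image names -- substitute real ones.
    recon_repo = ReconstructableRepository.for_module("my_repo_module", "my_repository")
    dag, tasks = make_airflow_dag_containerized_for_recon_repo(
        recon_repo=recon_repo,
        pipeline_name="my_pipeline",
        image="my-org/my-dagster-image:latest",
        mode="default",
        run_config={},
        instance=instance,  # new in this change: threaded through to DagsterDockerOperator
    )
    return dag, tasks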
diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py b/python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py
--- a/python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py
+++ b/python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py
@@ -1,30 +1,18 @@
 import ast
-import sys
+import json
 import warnings
 from contextlib import contextmanager
 
 from airflow.exceptions import AirflowException
 from dagster_airflow.vendor.docker_operator import DockerOperator
-from dagster_graphql.client.mutations import (
-    DagsterGraphQLClientError,
-    handle_execute_plan_result_raw,
-    handle_execution_errors,
-)
-from dagster_graphql.client.query import RAW_EXECUTE_PLAN_MUTATION
-from dagster_graphql.client.util import construct_execute_plan_variables, parse_raw_log_lines
 from docker import APIClient, from_env
 
 from dagster import check, seven
-from dagster.core.events import EngineEventData
 from dagster.core.instance import AIRFLOW_EXECUTION_DATE_STR, DagsterInstance
-from dagster.utils.error import serializable_error_info_from_exc_info
+from dagster.grpc.types import ExecuteStepArgs
+from dagster.serdes import deserialize_json_to_dagster_namedtuple, serialize_dagster_namedtuple
 
-from .util import (
-    airflow_tags_for_ts,
-    check_events_for_failures,
-    check_events_for_skips,
-    get_aws_environment,
-)
+from .util import check_events_for_failures, check_events_for_skips, get_aws_environment
 
 DOCKER_TEMPDIR = "/tmp"
 
@@ -100,13 +88,10 @@
         self.step_keys = dagster_operator_parameters.step_keys
         self.recon_repo = dagster_operator_parameters.recon_repo
         self._run_id = None
-        # self.instance might be None in, for instance, a unit test setting where the operator
-        # was being directly instantiated without passing through make_airflow_dag
-        self.instance = (
-            DagsterInstance.from_ref(dagster_operator_parameters.instance_ref)
-            if dagster_operator_parameters.instance_ref
-            else None
-        )
+
+        self.instance_ref = dagster_operator_parameters.instance_ref
+        check.invariant(self.instance_ref)
+        self.instance = DagsterInstance.from_ref(self.instance_ref)
 
         # These shenanigans are so we can override DockerOperator.get_hook in order to configure
         # a docker client using docker.from_env, rather than messing with the logic of
@@ -181,7 +166,9 @@
 
             res = []
             line = ""
-            for new_line in self.cli.logs(container=self.container["Id"], stream=True):
+            for new_line in self.cli.logs(
+                container=self.container["Id"], stream=True, stdout=True, stderr=False
+            ):
                 line = new_line.strip()
                 if hasattr(line, "decode"):
                     line = line.decode("utf-8")
@@ -217,27 +204,23 @@
     def query(self, airflow_ts):
         check.opt_str_param(airflow_ts, "airflow_ts")
 
-        variables = construct_execute_plan_variables(
-            self.recon_repo,
-            self.mode,
-            self.run_config,
-            self.pipeline_name,
-            self.run_id,
-            self.step_keys,
-        )
-
-        tags = airflow_tags_for_ts(airflow_ts)
-        variables["executionParams"]["executionMetadata"]["tags"] = tags
-
-        self.log.info(
-            "Executing GraphQL query: {query}\n".format(query=RAW_EXECUTE_PLAN_MUTATION)
-            + "with variables:\n"
-            + seven.json.dumps(variables, indent=2)
+        recon_pipeline = self.recon_repo.get_reconstructable_pipeline(self.pipeline_name)
+
+        input_json = serialize_dagster_namedtuple(
+            ExecuteStepArgs(
+                pipeline_origin=recon_pipeline.get_origin(),
+                pipeline_run_id=self.run_id,
+                instance_ref=self.instance_ref,
+                mode=self.mode,
+                step_keys_to_execute=self.step_keys,
+                run_config=self.run_config,
+                retries_dict={},
+            )
         )
 
-        return "dagster-graphql -v '{variables}' -t '{query}'".format(
-            variables=seven.json.dumps(variables), query=RAW_EXECUTE_PLAN_MUTATION
-        )
+        command = "dagster api execute_step_with_structured_logs {}".format(json.dumps(input_json))
+        self.log.info("Executing: {command}\n".format(command=command))
+        return command
 
     def get_docker_command(self, airflow_ts):
         """Deliberately renamed from get_command to avoid shadoowing the method of the base class"""
@@ -268,50 +251,28 @@
         self._run_id = context["dag_run"].run_id
 
         try:
-            if self.instance:
-                tags = {AIRFLOW_EXECUTION_DATE_STR: context.get("ts")} if "ts" in context else {}
-
-                run = self.instance.register_managed_run(
-                    pipeline_name=self.pipeline_name,
-                    run_id=self.run_id,
-                    run_config=self.run_config,
-                    mode=self.mode,
-                    solids_to_execute=None,
-                    step_keys_to_execute=None,
-                    tags=tags,
-                    root_run_id=None,
-                    parent_run_id=None,
-                    pipeline_snapshot=self.pipeline_snapshot,
-                    execution_plan_snapshot=self.execution_plan_snapshot,
-                    parent_pipeline_snapshot=self.parent_pipeline_snapshot,
-                )
+            tags = {AIRFLOW_EXECUTION_DATE_STR: context.get("ts")} if "ts" in context else {}
+
+            self.instance.register_managed_run(
+                pipeline_name=self.pipeline_name,
+                run_id=self.run_id,
+                run_config=self.run_config,
+                mode=self.mode,
+                solids_to_execute=None,
+                step_keys_to_execute=None,
+                tags=tags,
+                root_run_id=None,
+                parent_run_id=None,
+                pipeline_snapshot=self.pipeline_snapshot,
+                execution_plan_snapshot=self.execution_plan_snapshot,
+                parent_pipeline_snapshot=self.parent_pipeline_snapshot,
+            )
 
-            raw_res = self.execute_raw(context)
+            res = self.execute_raw(context)
             self.log.info("Finished executing container.")
 
-            res = parse_raw_log_lines(raw_res)
-
-            try:
-                handle_execution_errors(res, "executePlan")
-            except DagsterGraphQLClientError as err:
-                if self.instance:
-                    self.instance.report_engine_event(
-                        str(err),
-                        run,
-                        EngineEventData.engine_error(
-                            serializable_error_info_from_exc_info(sys.exc_info())
-                        ),
-                        self.__class__,
-                    )
-                raise
-
-            events = handle_execute_plan_result_raw(res)
-
-            if self.instance:
-                for event in events:
-                    self.instance.handle_new_event(event)
+            events = [deserialize_json_to_dagster_namedtuple(line) for line in res if line]
 
-            events = [e.dagster_event for e in events]
             check_events_for_failures(events)
             check_events_for_skips(events)
 
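Note: a minimal sketch of the command construction that DagsterDockerOperator.query now performs in place of the old dagster-graphql invocation. It mirrors the arguments shown in the diff above; the function name is illustrative, and recon_pipeline / instance_ref are assumed to be a ReconstructablePipeline and an InstanceRef. Each non-empty stdout line the container then emits is deserialized with deserialize_json_to_dagster_namedtuple, as in execute() above.

import json

from dagster.grpc.types import ExecuteStepArgs
from dagster.serdes import serialize_dagster_namedtuple


def build_execute_step_command(recon_pipeline, run_id, instance_ref, mode, step_keys, run_config):
    # Serialize the step-execution arguments and embed them in the CLI invocation
    # that runs inside the step container.
    input_json = serialize_dagster_namedtuple(
        ExecuteStepArgs(
            pipeline_origin=recon_pipeline.get_origin(),
            pipeline_run_id=run_id,
            instance_ref=instance_ref,
            mode=mode,
            step_keys_to_execute=step_keys,
            run_config=run_config,
            retries_dict={},
        )
    )
    return "dagster api execute_step_with_structured_logs {}".format(json.dumps(input_json))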
diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow/test_fixtures.py b/python_modules/libraries/dagster-airflow/dagster_airflow/test_fixtures.py
--- a/python_modules/libraries/dagster-airflow/dagster_airflow/test_fixtures.py
+++ b/python_modules/libraries/dagster-airflow/dagster_airflow/test_fixtures.py
@@ -1,5 +1,6 @@
 import logging
 import sys
+from contextlib import contextmanager
 
 import pytest
 from airflow import DAG
@@ -8,8 +9,46 @@
 from airflow.settings import LOG_FORMAT
 from airflow.utils import timezone
 
+from dagster import file_relative_path, seven
+from dagster.core.test_utils import instance_for_test_tempdir
 from dagster.core.utils import make_new_run_id
-from dagster.utils import load_yaml_from_glob_list
+from dagster.utils import load_yaml_from_glob_list, merge_dicts
+from dagster.utils.test.postgres_instance import TestPostgresInstance
+
+
+@contextmanager
+def postgres_instance(overrides=None):
+    with seven.TemporaryDirectory() as temp_dir:
+        with TestPostgresInstance.docker_service_up_or_skip(
+            file_relative_path(__file__, "docker-compose.yml"), "test-postgres-db-airflow",
+        ) as pg_conn_string:
+            TestPostgresInstance.clean_run_storage(pg_conn_string)
+            TestPostgresInstance.clean_event_log_storage(pg_conn_string)
+            TestPostgresInstance.clean_schedule_storage(pg_conn_string)
+            with instance_for_test_tempdir(
+                temp_dir,
+                overrides=merge_dicts(
+                    {
+                        "run_storage": {
+                            "module": "dagster_postgres.run_storage.run_storage",
+                            "class": "PostgresRunStorage",
+                            "config": {"postgres_url": pg_conn_string},
+                        },
+                        "event_log_storage": {
+                            "module": "dagster_postgres.event_log.event_log",
+                            "class": "PostgresEventLogStorage",
+                            "config": {"postgres_url": pg_conn_string},
+                        },
+                        "schedule_storage": {
+                            "module": "dagster_postgres.schedule_storage.schedule_storage",
+                            "class": "PostgresScheduleStorage",
+                            "config": {"postgres_url": pg_conn_string},
+                        },
+                    },
+                    overrides if overrides else {},
+                ),
+            ) as instance:
+                yield instance
 
 
 def execute_tasks_in_dag(dag, tasks, run_id, execution_date):
@@ -162,22 +201,25 @@
         if run_config is None and environment_yaml is not None:
             run_config = load_yaml_from_glob_list(environment_yaml)
 
-        dag, tasks = make_airflow_dag_containerized_for_recon_repo(
-            recon_repo=recon_repo,
-            pipeline_name=pipeline_name,
-            image=image,
-            mode=mode,
-            run_config=run_config,
-            op_kwargs=op_kwargs,
-        )
-        assert isinstance(dag, DAG)
+        with postgres_instance() as instance:
 
-        for task in tasks:
-            assert isinstance(task, DagsterDockerOperator)
+            dag, tasks = make_airflow_dag_containerized_for_recon_repo(
+                recon_repo=recon_repo,
+                pipeline_name=pipeline_name,
+                image=image,
+                mode=mode,
+                run_config=run_config,
+                op_kwargs=op_kwargs,
+                instance=instance,
+            )
+            assert isinstance(dag, DAG)
 
-        return execute_tasks_in_dag(
-            dag, tasks, run_id=make_new_run_id(), execution_date=execution_date
-        )
+            for task in tasks:
+                assert isinstance(task, DagsterDockerOperator)
+
+            return execute_tasks_in_dag(
+                dag, tasks, run_id=make_new_run_id(), execution_date=execution_date
+            )
 
     return _pipeline_fn
 
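Note: a hedged usage sketch for the postgres_instance() helper added above. The test name is illustrative, and the assertion that DagsterInstance.get_runs() is empty assumes the freshly cleaned storages set up by the helper; it is not exercised in this diff.

from dagster import DagsterInstance

from dagster_airflow.test_fixtures import postgres_instance


def test_postgres_backed_instance():
    # Skips (via TestPostgresInstance.docker_service_up_or_skip) when the
    # docker-compose Postgres service is unreachable; otherwise yields a
    # DagsterInstance whose run, event-log, and schedule storages use Postgres.
    with postgres_instance() as instance:
        assert isinstance(instance, DagsterInstance)
        assert instance.get_runs() == []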
diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_docker.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_docker.py
--- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_docker.py
+++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_factory/test_docker.py
@@ -38,6 +38,7 @@
             os.path.join(environments_path, "env_filesystem_no_explicit_base_dir.yaml"),
         ],
         image=dagster_docker_image,
+        op_kwargs={"network_mode": "container:test-postgres-db-airflow"},
     )
 
     validate_pipeline_execution(results)
diff --git a/python_modules/libraries/dagster-airflow/tox.ini b/python_modules/libraries/dagster-airflow/tox.ini
--- a/python_modules/libraries/dagster-airflow/tox.ini
+++ b/python_modules/libraries/dagster-airflow/tox.ini
@@ -27,8 +27,8 @@
   !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster -e dagit'
   requiresairflowdb: airflow initdb
   echo -e "--- \033[0;32m:pytest: Running tox tests\033[0m"
-  !requiresairflowdb: pytest -m "not requires_airflow_db" -s -vv --junitxml=test_results.xml --cov=dagster_airflow --cov-append --cov-report=
-  requiresairflowdb: pytest -m requires_airflow_db -s -vv --junitxml=test_results.xml --cov=dagster_airflow --cov-append --cov-report=
+  !requiresairflowdb: pytest -m "not requires_airflow_db" -s -vv --junitxml=test_results.xml --cov=dagster_airflow --cov-append --cov-report= {posargs}
+  requiresairflowdb: pytest -m requires_airflow_db -s -vv --junitxml=test_results.xml --cov=dagster_airflow --cov-append --cov-report= {posargs}
   coverage report --omit='.tox/*,**/test_*.py' --skip-covered
   coverage html --omit='.tox/*,**/test_*.py'
   coverage xml --omit='.tox/*,**/test_*.py'