diff --git a/examples/dagster_examples/jaffle_dbt/jaffle.py b/examples/dagster_examples/jaffle_dbt/jaffle.py index 1557db5f0..50ed61cb1 100644 --- a/examples/dagster_examples/jaffle_dbt/jaffle.py +++ b/examples/dagster_examples/jaffle_dbt/jaffle.py @@ -1,10 +1,16 @@ +''' +''' from dagster import file_relative_path, pipeline -from dagster_dbt import create_dbt_solid +from dagster_dbt import create_dbt_solid, create_dbt_test_solid -jaffle_solid = create_dbt_solid(file_relative_path(__file__, 'jaffle_shop')) +PROJECT_DIR = file_relative_path(__file__, 'jaffle_shop') +PROFILES_DIR = file_relative_path(__file__, 'profiles') + +jaffle_solid = create_dbt_solid(PROJECT_DIR, profiles_dir=PROFILES_DIR) +jaffle_test_solid = create_dbt_test_solid(PROJECT_DIR, profiles_dir=PROFILES_DIR) @pipeline def jaffle_pipeline(): - jaffle_solid() # pylint: disable=no-value-for-parameter + jaffle_test_solid(jaffle_solid()) # pylint: disable=no-value-for-parameter diff --git a/examples/dagster_examples/jaffle_dbt/jaffle_shop/models/marts/core/schema.yml b/examples/dagster_examples/jaffle_dbt/jaffle_shop/models/marts/core/schema.yml index c85fbf362..52779b296 100644 --- a/examples/dagster_examples/jaffle_dbt/jaffle_shop/models/marts/core/schema.yml +++ b/examples/dagster_examples/jaffle_dbt/jaffle_shop/models/marts/core/schema.yml @@ -1,85 +1,88 @@ version: 2 models: - name: dim_customers description: This table has basic information about a customer, as well as some derived facts based on a customer's orders columns: - name: customer_id description: This is a unique identifier for a customer tests: - unique - not_null - name: first_name description: Customer's first name. PII. - name: last_name description: Customer's last name. PII. - name: email description: Customer's email address. PII. - name: first_order description: Date (UTC) of a customer's first order - name: most_recent_order description: Date (UTC) of a customer's most recent order - name: number_of_orders description: Count of the number of orders a customer has placed - name: total_order_amount description: Total value (AUD) of a customer's orders - name: fct_orders description: This table has basic information about orders, as well as some derived facts based on payments columns: - name: order_id tests: - unique - not_null description: This is a unique identifier for an order - name: customer_id description: Foreign key to the customers table tests: - not_null - relationships: to: ref('dim_customers') field: customer_id - name: order_date description: Date (UTC) that the order was placed - name: status description: '{{ doc("orders_status") }}' tests: - accepted_values: - values: ['placed', 'shipped', 'completed', 'return_pending', 'returned'] + values: + ["placed", "shipped", "completed", "return_pending", "returned"] + # Keeping here to inject failures + # values: ["jdkfjkd", "ddddd"] - name: amount description: Total amount (AUD) of the order tests: - not_null - name: credit_card_amount description: Amount of the order (AUD) paid for by credit card tests: - not_null - name: coupon_amount description: Amount of the order (AUD) paid for by coupon tests: - not_null - name: bank_transfer_amount description: Amount of the order (AUD) paid for by bank transfer tests: - not_null - name: gift_card_amount description: Amount of the order (AUD) paid for by gift card tests: - not_null diff --git a/examples/dagster_examples/jaffle_dbt/profiles/.gitignore b/examples/dagster_examples/jaffle_dbt/profiles/.gitignore new file mode 100644 index 000000000..f790ff10d --- /dev/null +++ b/examples/dagster_examples/jaffle_dbt/profiles/.gitignore @@ -0,0 +1 @@ +.user.yml \ No newline at end of file diff --git a/examples/dagster_examples/jaffle_dbt/profiles/profiles.yml b/examples/dagster_examples/jaffle_dbt/profiles/profiles.yml new file mode 100644 index 000000000..5a286bf90 --- /dev/null +++ b/examples/dagster_examples/jaffle_dbt/profiles/profiles.yml @@ -0,0 +1,18 @@ +# For more information on how to configure this file, please see: +# https://github.com/fishtown-analytics/dbt/blob/master/sample.profiles.yml + +# Checking in for trivial test. +# Normally this should not be checked in and lives in ~/.dbt/profiles by default. + +jaffle_shop: + target: dev + outputs: + dev: + type: postgres + host: localhost + user: test + pass: test + port: 5432 + dbname: jaffle_shop + schema: dbt_alice + threads: 4 diff --git a/examples/setup.py b/examples/setup.py index e36a2604b..93e962060 100644 --- a/examples/setup.py +++ b/examples/setup.py @@ -1,52 +1,53 @@ from setuptools import find_packages, setup setup( name='dagster_examples', version='dev', author='Elementl', license='Apache-2.0', description='Dagster Examples', url='https://github.com/dagster-io/dagster', classifiers=[ 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', 'License :: OSI Approved :: Apache Software License', 'Operating System :: OS Independent', ], packages=find_packages(exclude=['test']), # default supports basic tutorial & toy examples install_requires=['dagster'], extras_require={ # full is for running the more realistic demos 'full': [ 'dagstermill', 'dagster-aws', 'dagster-slack', + 'dbt-postgres', 'descartes==1.1.0', 'geopandas==0.4.0', 'matplotlib==3.0.2; python_version >= "3.5"', 'matplotlib==2.2.4; python_version < "3.5"', 'mock==2.0.0', # pyproj is required by geopandas, but something is wrong with the # wheel for 2.0.2 'pyproj==2.0.1', 'pyspark==2.4.0', # You can dig into why this is is necessary by digging into some of # insanity in this github issue. https://github.com/psycopg/psycopg2/issues/674 # Essentially we are ensuring here that a version of psycopg is installed # that has the binaries installed (they are removed in psycopg 2.8) # We would update the dependencies ourselves but this is actually dependent # on dependency management of sqlalchemy-redshift or one of its transitive # dependencies. They try to install a version of psycopg2 that does # not include the binaries and this whole thing breaks. # For now we are pinning to a version that we know works. This is probably # not flexible enough, but we will resolve that issue when we run into it. 'psycopg2==2.7.6.1', 'sqlalchemy-redshift>=0.7.2', 'SQLAlchemy-Utils==0.33.8', ], 'airflow': ['dagster_airflow', 'docker-compose==1.23.2'], }, ) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_expectations.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_expectations.py index b744835ce..0f36db726 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_expectations.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_expectations.py @@ -1,109 +1,110 @@ # -*- coding: utf-8 -*- # snapshottest: v1 - https://goo.gl/zC4yUc from __future__ import unicode_literals from snapshottest import Snapshot + snapshots = Snapshot() snapshots['test_basic_expectations_within_compute_step_events 1'] = [ { '__typename': 'StepExpectationResultEvent', 'expectationResult': { 'description': 'Failure', 'label': 'always_false', 'metadataEntries': [ { 'description': None, 'jsonString': '{"reason": "Relentless pessimism."}', 'label': 'data' } ], 'success': False }, 'level': 'DEBUG', - 'message': 'Expectation always_false failed', + 'message': 'Failure', 'step': { 'key': 'emit_failed_expectation.compute', 'solidHandleID': 'emit_failed_expectation' } } ] snapshots['test_basic_expectations_within_compute_step_events 2'] = [ { '__typename': 'StepExpectationResultEvent', 'expectationResult': { 'description': 'Successful', 'label': 'always_true', 'metadataEntries': [ { 'description': None, 'jsonString': '{"reason": "Just because."}', 'label': 'data' } ], 'success': True }, 'level': 'DEBUG', - 'message': 'Expectation always_true passed', + 'message': 'Successful', 'step': { 'key': 'emit_successful_expectation.compute', 'solidHandleID': 'emit_successful_expectation' } } ] snapshots['test_basic_expectations_within_compute_step_events 3'] = [ { '__typename': 'StepExpectationResultEvent', 'expectationResult': { 'description': 'Successful', 'label': 'no_metadata', 'metadataEntries': [ ], 'success': True }, 'level': 'DEBUG', - 'message': 'Expectation no_metadata passed', + 'message': 'Successful', 'step': { 'key': 'emit_successful_expectation_no_metadata.compute', 'solidHandleID': 'emit_successful_expectation_no_metadata' } } ] snapshots['test_basic_input_output_expectations 1'] = [ { '__typename': 'StepExpectationResultEvent', 'expectationResult': { 'description': None, 'label': 'some_expectation', 'metadataEntries': [ ], 'success': True }, 'level': 'DEBUG', 'message': 'Expectation some_expectation passed', 'step': { 'key': 'df_expectations_solid.compute', 'solidHandleID': 'df_expectations_solid' } }, { '__typename': 'StepExpectationResultEvent', 'expectationResult': { 'description': None, 'label': 'other_expecation', 'metadataEntries': [ ], 'success': True }, 'level': 'DEBUG', 'message': 'Expectation other_expecation passed', 'step': { 'key': 'df_expectations_solid.compute', 'solidHandleID': 'df_expectations_solid' } } ] diff --git a/python_modules/dagster/dagster/core/events/__init__.py b/python_modules/dagster/dagster/core/events/__init__.py index 4df784a80..778e71b96 100644 --- a/python_modules/dagster/dagster/core/events/__init__.py +++ b/python_modules/dagster/dagster/core/events/__init__.py @@ -1,541 +1,546 @@ '''Structured representations of system events.''' from collections import namedtuple from enum import Enum from dagster import check from dagster.core.definitions import ( EventMetadataEntry, ExpectationResult, Materialization, SolidHandle, TypeCheck, ) from dagster.core.definitions.events import ObjectStoreOperationType from dagster.core.execution.plan.objects import StepOutputData from dagster.core.log_manager import DagsterLogManager from dagster.utils.error import SerializableErrorInfo from dagster.utils.timing import format_duration class DagsterEventType(Enum): STEP_OUTPUT = 'STEP_OUTPUT' STEP_INPUT = 'STEP_INPUT' STEP_FAILURE = 'STEP_FAILURE' STEP_START = 'STEP_START' STEP_SUCCESS = 'STEP_SUCCESS' STEP_SKIPPED = 'STEP_SKIPPED' STEP_MATERIALIZATION = 'STEP_MATERIALIZATION' STEP_EXPECTATION_RESULT = 'STEP_EXPECTATION_RESULT' PIPELINE_INIT_FAILURE = 'PIPELINE_INIT_FAILURE' PIPELINE_START = 'PIPELINE_START' PIPELINE_SUCCESS = 'PIPELINE_SUCCESS' PIPELINE_FAILURE = 'PIPELINE_FAILURE' PIPELINE_PROCESS_START = 'PIPELINE_PROCESS_START' PIPELINE_PROCESS_STARTED = 'PIPELINE_PROCESS_STARTED' OBJECT_STORE_OPERATION = 'OBJECT_STORE_OPERATION' STEP_EVENTS = { DagsterEventType.STEP_INPUT, DagsterEventType.STEP_START, DagsterEventType.STEP_OUTPUT, DagsterEventType.STEP_FAILURE, DagsterEventType.STEP_SUCCESS, DagsterEventType.STEP_SKIPPED, DagsterEventType.STEP_MATERIALIZATION, DagsterEventType.STEP_EXPECTATION_RESULT, DagsterEventType.OBJECT_STORE_OPERATION, } FAILURE_EVENTS = { DagsterEventType.PIPELINE_INIT_FAILURE, DagsterEventType.PIPELINE_FAILURE, DagsterEventType.STEP_FAILURE, } def _assert_type(method, expected_type, actual_type): check.invariant( expected_type == actual_type, ( '{method} only callable when event_type is {expected_type}, called on {actual_type}' ).format(method=method, expected_type=expected_type, actual_type=actual_type), ) def _validate_event_specific_data(event_type, event_specific_data): from dagster.core.execution.plan.objects import StepFailureData, StepSuccessData, StepInputData if event_type == DagsterEventType.STEP_OUTPUT: check.inst_param(event_specific_data, 'event_specific_data', StepOutputData) elif event_type == DagsterEventType.STEP_FAILURE: check.inst_param(event_specific_data, 'event_specific_data', StepFailureData) elif event_type == DagsterEventType.STEP_SUCCESS: check.inst_param(event_specific_data, 'event_specific_data', StepSuccessData) elif event_type == DagsterEventType.STEP_MATERIALIZATION: check.inst_param(event_specific_data, 'event_specific_data', StepMaterializationData) elif event_type == DagsterEventType.PIPELINE_PROCESS_STARTED: check.inst_param(event_specific_data, 'event_specific_data', PipelineProcessStartedData) elif event_type == DagsterEventType.STEP_INPUT: check.inst_param(event_specific_data, 'event_specific_data', StepInputData) return event_specific_data def log_step_event(step_context, event): event_type = DagsterEventType(event.event_type_value) log_fn = step_context.log.error if event_type in FAILURE_EVENTS else step_context.log.debug log_fn( event.message or '{event_type} for step {step_key}'.format( event_type=event_type, step_key=step_context.step.key ), dagster_event=event, pipeline_name=step_context.pipeline_def.name, ) def log_pipeline_event(pipeline_context, event): event_type = DagsterEventType(event.event_type_value) log_fn = ( pipeline_context.log.error if event_type in FAILURE_EVENTS else pipeline_context.log.debug ) log_fn( event.message or '{event_type} for pipeline {pipeline_name}'.format( event_type=event_type, pipeline_name=pipeline_context.pipeline_def.name ), dagster_event=event, pipeline_name=pipeline_context.pipeline_def.name, ) class DagsterEvent( namedtuple( '_DagsterEvent', 'event_type_value pipeline_name step_key solid_handle step_kind_value ' 'logging_tags event_specific_data message', ) ): @staticmethod def from_step(event_type, step_context, event_specific_data=None, message=None): from dagster.core.execution.context.system import SystemStepExecutionContext check.inst_param(step_context, 'step_context', SystemStepExecutionContext) event = DagsterEvent( check.inst_param(event_type, 'event_type', DagsterEventType).value, step_context.pipeline_def.name, step_context.step.key, step_context.step.solid_handle, step_context.step.kind.value, step_context.logging_tags, _validate_event_specific_data(event_type, event_specific_data), check.opt_str_param(message, 'message'), ) log_step_event(step_context, event) return event @staticmethod def from_pipeline(event_type, pipeline_context, message=None): from dagster.core.execution.context.system import SystemPipelineExecutionContext check.inst_param(pipeline_context, 'pipeline_context', SystemPipelineExecutionContext) pipeline_name = pipeline_context.pipeline_def.name event = DagsterEvent( check.inst_param(event_type, 'event_type', DagsterEventType).value, check.str_param(pipeline_name, 'pipeline_name'), message=check.opt_str_param(message, 'message'), ) log_pipeline_event(pipeline_context, event) return event def __new__( cls, event_type_value, pipeline_name, step_key=None, solid_handle=None, step_kind_value=None, logging_tags=None, event_specific_data=None, message=None, ): return super(DagsterEvent, cls).__new__( cls, check.str_param(event_type_value, 'event_type_value'), check.str_param(pipeline_name, 'pipeline_name'), check.opt_str_param(step_key, 'step_key'), check.opt_inst_param(solid_handle, 'solid_handle', SolidHandle), check.opt_str_param(step_kind_value, 'step_kind_value'), check.opt_dict_param(logging_tags, 'logging_tags'), _validate_event_specific_data(DagsterEventType(event_type_value), event_specific_data), check.opt_str_param(message, 'message'), ) @property def solid_name(self): return self.solid_handle.name @property def solid_definition_name(self): return self.solid_handle.definition_name @property def event_type(self): return DagsterEventType(self.event_type_value) @property def is_step_event(self): return self.event_type in STEP_EVENTS @property def step_kind(self): from dagster.core.execution.plan.objects import StepKind return StepKind(self.step_kind_value) @property def is_step_success(self): return self.event_type == DagsterEventType.STEP_SUCCESS @property def is_successful_output(self): return self.event_type == DagsterEventType.STEP_OUTPUT @property def is_step_failure(self): return self.event_type == DagsterEventType.STEP_FAILURE @property def is_failure(self): return self.event_type in FAILURE_EVENTS @property def step_output_data(self): _assert_type('step_output_data', DagsterEventType.STEP_OUTPUT, self.event_type) return self.event_specific_data @property def step_success_data(self): _assert_type('step_success_data', DagsterEventType.STEP_SUCCESS, self.event_type) return self.event_specific_data @property def step_failure_data(self): _assert_type('step_failure_data', DagsterEventType.STEP_FAILURE, self.event_type) return self.event_specific_data @property def pipeline_process_started_data(self): _assert_type( 'pipeline_process_started', DagsterEventType.PIPELINE_PROCESS_STARTED, self.event_type ) return self.event_specific_data @property def pipeline_process_start_data(self): _assert_type( 'pipeline_process_start', DagsterEventType.PIPELINE_PROCESS_START, self.event_type ) return self.event_specific_data @property def step_materialization_data(self): _assert_type( 'step_materialization_data', DagsterEventType.STEP_MATERIALIZATION, self.event_type ) return self.event_specific_data @property def pipeline_init_failure_data(self): _assert_type( 'pipeline_init_failure_data', DagsterEventType.PIPELINE_INIT_FAILURE, self.event_type ) return self.event_specific_data @staticmethod def step_output_event(step_context, step_output_data): check.inst_param(step_output_data, 'step_output_data', StepOutputData) return DagsterEvent.from_step( event_type=DagsterEventType.STEP_OUTPUT, step_context=step_context, event_specific_data=step_output_data, message='Yielded output "{output_name}" of type "{output_type}".{type_check_clause}'.format( output_name=step_output_data.step_output_handle.output_name, output_type=step_context.step.step_output_named( step_output_data.step_output_handle.output_name ).runtime_type.name, type_check_clause=( ' Warning! Type check failed.' if not step_output_data.type_check_data.success else ' (Type check passed).' ) if step_output_data.type_check_data else ' (No type check).', ), ) @staticmethod def step_failure_event(step_context, step_failure_data): return DagsterEvent.from_step( event_type=DagsterEventType.STEP_FAILURE, step_context=step_context, event_specific_data=step_failure_data, message='Execution of step "{step_key}" failed.'.format(step_key=step_context.step.key), ) @staticmethod def step_input_event(step_context, step_input_data): return DagsterEvent.from_step( event_type=DagsterEventType.STEP_INPUT, step_context=step_context, event_specific_data=step_input_data, message='Got input "{input_name}" of type "{input_type}".{type_check_clause}'.format( input_name=step_input_data.input_name, input_type=step_context.step.step_input_named( step_input_data.input_name ).runtime_type.name, type_check_clause=( ' Warning! Type check failed.' if not step_input_data.type_check_data.success else ' (Type check passed).' ) if step_input_data.type_check_data else ' (No type check).', ), ) @staticmethod def step_start_event(step_context): return DagsterEvent.from_step( event_type=DagsterEventType.STEP_START, step_context=step_context, message='Started execution of step "{step_key}".'.format( step_key=step_context.step.key ), ) @staticmethod def step_success_event(step_context, success): return DagsterEvent.from_step( event_type=DagsterEventType.STEP_SUCCESS, step_context=step_context, event_specific_data=success, message='Finished execution of step "{step_key}" in {duration}.'.format( # TODO: Make duration human readable # See: https://github.com/dagster-io/dagster/issues/1602 step_key=step_context.step.key, duration=format_duration(success.duration_ms), ), ) @staticmethod def step_skipped_event(step_context): return DagsterEvent.from_step( event_type=DagsterEventType.STEP_SKIPPED, step_context=step_context, message='Skipped execution of step "{step_key}".'.format( step_key=step_context.step.key ), ) @staticmethod def step_materialization(step_context, materialization): check.inst_param(materialization, 'materialization', Materialization) return DagsterEvent.from_step( event_type=DagsterEventType.STEP_MATERIALIZATION, step_context=step_context, event_specific_data=StepMaterializationData(materialization), message=materialization.description if materialization.description else 'Materialized value{label_clause}.'.format( label_clause=' {label}'.format(label=materialization.label) if materialization.label else '' ), ) @staticmethod def step_expectation_result(step_context, expectation_result): check.inst_param(expectation_result, 'expectation_result', ExpectationResult) + + def _msg(): + if expectation_result.description: + return expectation_result.description + + return 'Expectation{label_clause} {result_verb}'.format( + label_clause=' ' + expectation_result.label if expectation_result.label else '', + result_verb='passed' if expectation_result.success else 'failed', + ) + return DagsterEvent.from_step( event_type=DagsterEventType.STEP_EXPECTATION_RESULT, step_context=step_context, event_specific_data=StepExpectationResultData(expectation_result), - message='Expectation{label_clause} {result_verb}'.format( - label_clause=' {label}'.format(label=expectation_result.label) - if expectation_result.label - else '', - result_verb='passed' if expectation_result.success else 'failed', - ), + message=_msg(), ) @staticmethod def pipeline_start(pipeline_context): return DagsterEvent.from_pipeline( DagsterEventType.PIPELINE_START, pipeline_context, message='Started execution of pipeline "{pipeline_name}".'.format( pipeline_name=pipeline_context.pipeline_def.name ), ) @staticmethod def pipeline_success(pipeline_context): return DagsterEvent.from_pipeline( DagsterEventType.PIPELINE_SUCCESS, pipeline_context, message='Finished execution of pipeline "{pipeline_name}".'.format( pipeline_name=pipeline_context.pipeline_def.name ), ) @staticmethod def pipeline_failure(pipeline_context): return DagsterEvent.from_pipeline( DagsterEventType.PIPELINE_FAILURE, pipeline_context, message='Execution of pipeline "{pipeline_name}" failed.'.format( pipeline_name=pipeline_context.pipeline_def.name ), ) @staticmethod def pipeline_init_failure(pipeline_name, failure_data, log_manager): check.inst_param(failure_data, 'failure_data', PipelineInitFailureData) check.inst_param(log_manager, 'log_manager', DagsterLogManager) # this failure happens trying to bring up context so can't use from_pipeline event = DagsterEvent( event_type_value=DagsterEventType.PIPELINE_INIT_FAILURE.value, pipeline_name=pipeline_name, event_specific_data=failure_data, message=( 'Pipeline failure during initialization of pipeline "{pipeline_name}. ' 'This may be due to a failure in initializing a resource or logger".' ).format(pipeline_name=pipeline_name), ) log_manager.error( event.message or '{event_type} for pipeline {pipeline_name}'.format( event_type=DagsterEventType.PIPELINE_INIT_FAILURE, pipeline_name=pipeline_name ), dagster_event=event, pipeline_name=pipeline_name, ) return event @staticmethod def object_store_operation(step_context, object_store_operation_result): object_store_name = ( '{object_store_name} '.format( object_store_name=object_store_operation_result.object_store_name ) if object_store_operation_result.object_store_name else '' ) serialization_strategy_modifier = ( ' using {serialization_strategy_name}'.format( serialization_strategy_name=object_store_operation_result.serialization_strategy_name ) if object_store_operation_result.serialization_strategy_name else '' ) if ( ObjectStoreOperationType(object_store_operation_result.op) == ObjectStoreOperationType.SET_OBJECT ): message = ( 'Stored intermediate object for output {value_name} in ' '{object_store_name}object store{serialization_strategy_modifier}.' ).format( value_name=object_store_operation_result.value_name, object_store_name=object_store_name, serialization_strategy_modifier=serialization_strategy_modifier, ) elif ( ObjectStoreOperationType(object_store_operation_result.op) == ObjectStoreOperationType.GET_OBJECT ): message = ( 'Retrieved intermediate object for input {value_name} in ' '{object_store_name}object store{serialization_strategy_modifier}.' ).format( value_name=object_store_operation_result.value_name, object_store_name=object_store_name, serialization_strategy_modifier=serialization_strategy_modifier, ) else: message = '' return DagsterEvent.from_step( DagsterEventType.OBJECT_STORE_OPERATION, step_context, event_specific_data=ObjectStoreOperationResultData( op=object_store_operation_result.op, metadata_entries=[ EventMetadataEntry.path(object_store_operation_result.key, label='key') ], ), message=message, ) def get_step_output_event(events, step_key, output_name='result'): check.list_param(events, 'events', of_type=DagsterEvent) check.str_param(step_key, 'step_key') check.str_param(output_name, 'output_name') for event in events: if ( event.event_type == DagsterEventType.STEP_OUTPUT and event.step_key == step_key and event.step_output_data.output_name == output_name ): return event return None class StepMaterializationData(namedtuple('_StepMaterializationData', 'materialization')): pass class StepExpectationResultData(namedtuple('_StepExpectationResultData', 'expectation_result')): pass class ObjectStoreOperationResultData( namedtuple('_ObjectStoreOperationResultData', 'op metadata_entries') ): pass class PipelineProcessStartedData( namedtuple('_PipelineProcessStartedData', 'process_id pipeline_name run_id') ): pass class PipelineProcessStartData(namedtuple('_PipelineProcessStartData', 'pipeline_name run_id')): pass class PipelineInitFailureData(namedtuple('_PipelineInitFailureData', 'error')): def __new__(cls, error): return super(PipelineInitFailureData, cls).__new__( cls, error=check.inst_param(error, 'error', SerializableErrorInfo) ) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/__init__.py b/python_modules/libraries/dagster-dbt/dagster_dbt/__init__.py index a094a8fd7..ac63522e4 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/__init__.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/__init__.py @@ -1,101 +1,190 @@ from collections import namedtuple import io import os import re import shlex import subprocess from dagster import ( check, + ExpectationResult, EventMetadataEntry, Failure, + InputDefinition, Materialization, Nothing, Output, OutputDefinition, solid, ) CREATE_VIEW_REGEX = re.compile(r'OK created view model (\w+)\.(\w+)\.* \[CREATE VIEW') CREATE_TABLE_REGEX = re.compile(r'OK created table model (\w+)\.(\w+)\.* \[SELECT (\d+)') ANSI_ESCAPE = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]') +TEST_PASS_REGEX = re.compile(r'PASS (\w+)\.* \[PASS') +TEST_FAIL_REGEX = re.compile(r'FAIL (\d+) (\w+)\.* \[FAIL') def try_parse_create_view(text): view_match = CREATE_VIEW_REGEX.search(text) if not view_match: return None return Materialization( label='create_view', description=text, metadata_entries=[ EventMetadataEntry.text(view_match.group(1), 'schema'), EventMetadataEntry.text(view_match.group(2), 'view'), ], ) def try_parse_create_table(text): table_match = CREATE_TABLE_REGEX.search(text) if not table_match: return None return Materialization( label='create_table', description=text, metadata_entries=[ EventMetadataEntry.text(table_match.group(1), 'schema'), EventMetadataEntry.text(table_match.group(2), 'table'), EventMetadataEntry.text(table_match.group(3), 'row_count'), ], ) -def try_parse(text): +def try_parse_run(text): for parser in [try_parse_create_view, try_parse_create_table]: mat = parser(text) if mat: return mat -def create_dbt_solid(project_dir, name=None): +def create_dbt_solid(project_dir, name=None, profiles_dir=None): check.str_param(project_dir, 'project_dir') check.opt_str_param(name, 'name') + check.opt_str_param(profiles_dir, 'profiles_dir') @solid( name=name if name else os.path.basename(project_dir), output_defs=[OutputDefinition(dagster_type=Nothing, name='run_complete')], ) def dbt_solid(_): - args = shlex.split('dbt run --project-dir {}'.format(project_dir)) + cmd = 'dbt run --project-dir {}'.format(project_dir) + if profiles_dir: + cmd += ' --profiles-dir {}'.format(profiles_dir) + + args = shlex.split(cmd) proc = subprocess.Popen(args, stdout=subprocess.PIPE) # if https://github.com/fishtown-analytics/dbt/issues/1237 gets done # we should definitely switch to parsing the json output, as that # would be much more reliable/resilient for line in io.TextIOWrapper(proc.stdout, encoding='utf-8'): text = line.rstrip() if not text: continue # print to stdout print(text) # remove colors text = ANSI_ESCAPE.sub('', text) - mat = try_parse(text) + mat = try_parse_run(text) if mat: yield mat proc.wait() if proc.returncode != 0: raise Failure('Dbt invocation errored') yield Output(value=None, output_name='run_complete') return dbt_solid + + +def try_parse_pass(text): + pass_match = TEST_PASS_REGEX.search(text) + + if not pass_match: + return None + + test_name = pass_match.group(1) + + return ExpectationResult( + success=True, + label='dbt_test', + description='Dbt test {} passed'.format(test_name), + metadata_entries=[EventMetadataEntry.text(label='dbt_test_name', text=test_name)], + ) + + +def try_parse_fail(text): + fail_match = TEST_FAIL_REGEX.search(text) + + if not fail_match: + return None + + failure_count = fail_match.group(1) + test_name = fail_match.group(2) + + return ExpectationResult( + success=False, + label='dbt_test', + description='Dbt test {} failed'.format(test_name), + metadata_entries=[ + EventMetadataEntry.text(label='dbt_test_name', text=test_name), + EventMetadataEntry.text(label='failure_count', text=failure_count), + ], + ) + + +def try_parse_test(text): + for parser in [try_parse_pass, try_parse_fail]: + expect = parser(text) + if expect: + return expect + + +def create_dbt_test_solid(project_dir, name=None, profiles_dir=None): + check.str_param(project_dir, 'project_dir') + check.opt_str_param(name, 'name') + check.opt_str_param(profiles_dir, 'profiles_dir') + + @solid( + name=name if name else os.path.basename(project_dir) + '_test', + input_defs=[InputDefinition('test_start', Nothing)], + output_defs=[OutputDefinition(dagster_type=Nothing, name='test_complete')], + ) + def dbt_test_solid(_): + cmd = 'dbt test --project-dir {}'.format(project_dir) + if profiles_dir: + cmd += ' --profiles-dir {}'.format(profiles_dir) + args = shlex.split(cmd) + proc = subprocess.Popen(args, stdout=subprocess.PIPE) + for line in io.TextIOWrapper(proc.stdout, encoding='utf-8'): + text = line.rstrip() + if not text: + continue + + # print to stdout + print(text) + + # remove colors + text = ANSI_ESCAPE.sub('', text) + + expt = try_parse_test(text) + + if expt: + yield expt + + yield Output(value=None, output_name='test_complete') + + return dbt_test_solid diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_regexes.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_regexes.py index 598616e2e..9cc6cd3bf 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_regexes.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_regexes.py @@ -1,32 +1,72 @@ -from dagster_dbt import CREATE_TABLE_REGEX, CREATE_VIEW_REGEX +from dagster_dbt import CREATE_TABLE_REGEX, CREATE_VIEW_REGEX, TEST_PASS_REGEX, TEST_FAIL_REGEX TEST_CREATE_VIEW = ( '17:36:00 | 3 of 8 OK created view model ' 'dbt_alice.stg_customers................. [CREATE VIEW in 0.18s]' ) TEST_CREATE_TABLE = ( '17:36:01 | 4 of 8 OK created table model ' 'dbt_alice.order_payments............... [SELECT 99 in 0.07s]' ) def test_match_view_model(): m = CREATE_VIEW_REGEX.search(TEST_CREATE_VIEW) assert m schema = m.group(1) assert schema == 'dbt_alice' view = m.group(2) assert view == 'stg_customers' def test_match_table_model(): m = CREATE_TABLE_REGEX.search(TEST_CREATE_TABLE) assert m schema = m.group(1) assert schema == 'dbt_alice' table = m.group(2) assert table == 'order_payments' row_count = int(m.group(3)) assert row_count == 99 + + +TEST_PASS_LONG_STRING = ( + '13:55:22 | 1 of 20 PASS ' + 'accepted_values_fct_orders_status__placed__shipped__completed__return_pending__returned ' + '[PASS in 0.05s]' +) + + +TEST_PASS_SHORT_STRING = ( + '13:55:22 | 7 of 20 PASS not_null_fct_orders_coupon_amount' + '....................... [PASS in 0.04s]' +) + +LONG_NAME = ( + 'accepted_values_fct_orders_status__placed__shipped__completed__return_pending__returned' +) + + +def test_pass_long_string(): + m = TEST_PASS_REGEX.search(TEST_PASS_LONG_STRING) + assert m + assert m.group(1) == LONG_NAME + + +def test_pass_short_string(): + m = TEST_PASS_REGEX.search(TEST_PASS_SHORT_STRING) + assert m + assert m.group(1) == 'not_null_fct_orders_coupon_amount' + + +def test_fail(): + test_fail_text = ( + 'FAIL 5 accepted_values_fct_orders_status__jdkfjkd............ [FAIL 5 in 0.05s]' + ) + + m = TEST_FAIL_REGEX.search(test_fail_text) + assert m + assert m.group(1) == '5' + assert m.group(2) == 'accepted_values_fct_orders_status__jdkfjkd'