diff --git a/python_modules/libraries/dagster-pandas/dagster_pandas/data_frame.py b/python_modules/libraries/dagster-pandas/dagster_pandas/data_frame.py
index 4c4b0370d..ecfac9ef2 100644
--- a/python_modules/libraries/dagster-pandas/dagster_pandas/data_frame.py
+++ b/python_modules/libraries/dagster-pandas/dagster_pandas/data_frame.py
@@ -1,204 +1,207 @@
 import pandas as pd
 from dagster_pandas.constraints import (
     ColumnExistsConstraint,
     ColumnTypeConstraint,
     ConstraintViolationException,
 )
 from dagster_pandas.validation import PandasColumn, validate_constraints

 from dagster import (
     DagsterInvariantViolationError,
     DagsterType,
     EventMetadataEntry,
     Field,
     Materialization,
     Path,
     String,
     TypeCheck,
     check,
 )
 from dagster.config.field_utils import Selector
 from dagster.core.types.config_schema import input_selector_schema, output_selector_schema
 from dagster.core.types.decorator import register_python_type

 CONSTRAINT_BLACKLIST = {ColumnExistsConstraint, ColumnTypeConstraint}


 def dict_without_keys(ddict, *keys):
     return {key: value for key, value in ddict.items() if key not in set(keys)}


 @output_selector_schema(
     Selector(
         {
             'csv': {'path': Path, 'sep': Field(String, is_required=False, default_value=','),},
             'parquet': {'path': Path},
             'table': {'path': Path},
         },
     )
 )
 def dataframe_output_schema(_context, file_type, file_options, pandas_df):
     check.str_param(file_type, 'file_type')
     check.dict_param(file_options, 'file_options')
     check.inst_param(pandas_df, 'pandas_df', pd.DataFrame)

     if file_type == 'csv':
         path = file_options['path']
         pandas_df.to_csv(path, index=False, **dict_without_keys(file_options, 'path'))
     elif file_type == 'parquet':
         pandas_df.to_parquet(file_options['path'])
     elif file_type == 'table':
         pandas_df.to_csv(file_options['path'], sep='\t', index=False)
     else:
         check.failed('Unsupported file_type {file_type}'.format(file_type=file_type))

     return Materialization.file(file_options['path'])


 @input_selector_schema(
     Selector(
         {
             'csv': {'path': Path, 'sep': Field(String, is_required=False, default_value=','),},
             'parquet': {'path': Path},
             'table': {'path': Path},
         },
     )
 )
 def dataframe_input_schema(_context, file_type, file_options):
     check.str_param(file_type, 'file_type')
     check.dict_param(file_options, 'file_options')

     if file_type == 'csv':
         path = file_options['path']
         return pd.read_csv(path, **dict_without_keys(file_options, 'path'))
     elif file_type == 'parquet':
         return pd.read_parquet(file_options['path'])
     elif file_type == 'table':
         return pd.read_csv(file_options['path'], sep='\t')
     else:
         raise DagsterInvariantViolationError(
             'Unsupported file_type {file_type}'.format(file_type=file_type)
         )


 def df_type_check(value):
     if not isinstance(value, pd.DataFrame):
         return TypeCheck(success=False)
     return TypeCheck(
         success=True,
         metadata_entries=[
             EventMetadataEntry.text(str(len(value)), 'row_count', 'Number of rows in DataFrame'),
             # string cast columns since they may be things like datetime
             EventMetadataEntry.json({'columns': list(map(str, value.columns))}, 'metadata'),
         ],
     )


 DataFrame = DagsterType(
     name='PandasDataFrame',
     description='''Two-dimensional size-mutable, potentially heterogeneous tabular data
     structure with labeled axes (rows and columns).
     See http://pandas.pydata.org/''',
     input_hydration_config=dataframe_input_schema,
     output_materialization_config=dataframe_output_schema,
     type_check_fn=df_type_check,
 )


 register_python_type(pd.DataFrame, DataFrame)


 def _construct_constraint_list(constraints):
     def add_bullet(constraint_list, constraint_description):
         return constraint_list + "+ {constraint_description}\n".format(
             constraint_description=constraint_description
         )

     constraint_list = ""
     for constraint in constraints:
         if constraint.__class__ not in CONSTRAINT_BLACKLIST:
             constraint_list = add_bullet(constraint_list, constraint.markdown_description)
     return constraint_list


 def _build_column_header(column_name, constraints):
     expected_column_types = None
     column_type_constraint = [
         constraint for constraint in constraints if isinstance(constraint, ColumnTypeConstraint)
     ]
     if column_type_constraint:
         expected_types = tuple(column_type_constraint[0].expected_pandas_dtypes)
         if expected_types:
             expected_column_types = (
                 expected_types[0] if len(expected_types) == 1 else tuple(expected_types)
             )

     column_header = '**{column_name}**'.format(column_name=column_name)
     if expected_column_types:
         column_header += ": `{expected_dtypes}`".format(expected_dtypes=expected_column_types)
     return column_header


 def create_dagster_pandas_dataframe_description(description, columns):
     title = "\n".join([description, '### Columns', ''])
     buildme = title
     for column in columns:
         buildme += "{}\n{}\n".format(
             _build_column_header(column.name, column.constraints),
             _construct_constraint_list(column.constraints),
         )
     return buildme


 def create_dagster_pandas_dataframe_type(
     name=None, description=None, columns=None, event_metadata_fn=None, dataframe_constraints=None
 ):
     event_metadata_fn = check.opt_callable_param(event_metadata_fn, 'event_metadata_fn')
     description = create_dagster_pandas_dataframe_description(
         check.opt_str_param(description, 'description', default=''),
         check.opt_list_param(columns, 'columns', of_type=PandasColumn),
     )

     def _dagster_type_check(value):
         if not isinstance(value, pd.DataFrame):
             return TypeCheck(
                 success=False,
                 description='Must be a pandas.DataFrame. Got value of type {type_name}.'.format(
                     type_name=type(value).__name__
                 ),
             )

         try:
             validate_constraints(
                 value, pandas_columns=columns, dataframe_constraints=dataframe_constraints
             )
         except ConstraintViolationException as e:
             return TypeCheck(success=False, description=str(e))

         return TypeCheck(
             success=True,
             metadata_entries=_execute_summary_stats(name, value, event_metadata_fn)
             if event_metadata_fn
             else None,
         )

     # add input_hydration_config and output_materialization_config
     # https://github.com/dagster-io/dagster/issues/2027
     return DagsterType(name=name, type_check_fn=_dagster_type_check, description=description)


 def _execute_summary_stats(type_name, value, event_metadata_fn):
+    if not event_metadata_fn:
+        return []
+
     metadata_entries = event_metadata_fn(value)

     if not (
         isinstance(metadata_entries, list)
         and all(isinstance(item, EventMetadataEntry) for item in metadata_entries)
     ):
         raise DagsterInvariantViolationError(
             (
                 'The return value of the user-defined summary_statistics function '
                 'for pandas data frame type {type_name} returned {value}. '
                 'This function must return List[EventMetadataEntry]'
             ).format(type_name=type_name, value=repr(metadata_entries))
         )

     return metadata_entries
diff --git a/python_modules/libraries/dagster-pandas/dagster_pandas/validation.py b/python_modules/libraries/dagster-pandas/dagster_pandas/validation.py
index 5b61cec49..0b8f95169 100644
--- a/python_modules/libraries/dagster-pandas/dagster_pandas/validation.py
+++ b/python_modules/libraries/dagster-pandas/dagster_pandas/validation.py
@@ -1,152 +1,159 @@
 from dagster_pandas.constraints import (
     CategoricalColumnConstraint,
     ColumnExistsConstraint,
     ColumnTypeConstraint,
     Constraint,
     DataFrameConstraint,
     InRangeColumnConstraint,
     NonNullableColumnConstraint,
     UniqueColumnConstraint,
 )
 from pandas import DataFrame, Timestamp

 from dagster import check

 _BASE_CONSTRAINTS = [
     ColumnExistsConstraint(),
 ]

 _CONFIGURABLE_CONSTRAINTS = {
     'exists': NonNullableColumnConstraint,
     'unique': UniqueColumnConstraint,
 }


 PANDAS_NUMERIC_TYPES = {'int64', 'float'}


 class PandasColumn:
     def __init__(self, name, constraints=None):
         self.name = check.str_param(name, 'name')
         self.constraints = _BASE_CONSTRAINTS + check.opt_list_param(
             constraints, 'constraints', of_type=Constraint
         )

     def validate(self, dataframe):
         for constraint in self.constraints:
             constraint.validate(dataframe, self.name)

     @staticmethod
     def add_configurable_constraints(constraints, **kwargs):
         for configurable_constraint_flag, constraint in _CONFIGURABLE_CONSTRAINTS.items():
             apply_constraint = kwargs.get(configurable_constraint_flag, False)
             if apply_constraint:
                 constraints.append(constraint())
         return constraints

+    @classmethod
+    def exists(cls, name, exists=False, unique=False):
+        return cls(
+            name=check.str_param(name, 'name'),
+            constraints=cls.add_configurable_constraints([], exists=exists, unique=unique),
+        )
+
     @classmethod
     def boolean_column(cls, name, exists=False, unique=False):
         return cls(
             name=check.str_param(name, 'name'),
             constraints=cls.add_configurable_constraints(
                 [ColumnTypeConstraint('bool')], exists=exists, unique=unique,
             ),
         )

     @classmethod
     def numeric_column(
         cls,
         name,
         expected_dtypes,
         min_value=-float('inf'),
         max_value=float('inf'),
         exists=False,
         unique=False,
     ):
         return cls(
             name=check.str_param(name, 'name'),
             constraints=cls.add_configurable_constraints(
                 [
                     ColumnTypeConstraint(expected_dtypes),
                     InRangeColumnConstraint(
                         check.numeric_param(min_value, 'min_value'),
                         check.numeric_param(max_value, 'max_value'),
                     ),
                 ],
                 exists=exists,
                 unique=unique,
             ),
         )

     @classmethod
     def integer_column(
         cls, name, min_value=-float('inf'), max_value=float('inf'), exists=False, unique=False
     ):
         return cls.numeric_column(name, 'int64', min_value, max_value, exists=exists, unique=unique)

     @classmethod
     def float_column(
         cls, name, min_value=-float('inf'), max_value=float('inf'), exists=False, unique=False
     ):
         return cls.numeric_column(
             name, 'float64', min_value, max_value, exists=exists, unique=unique
         )

     @classmethod
     def datetime_column(
         cls,
         name,
         min_datetime=Timestamp.min,
         max_datetime=Timestamp.max,
         exists=False,
         unique=False,
     ):
         return cls(
             name=check.str_param(name, 'name'),
             constraints=cls.add_configurable_constraints(
                 [
                     ColumnTypeConstraint({'datetime64[ns]'}),
                     InRangeColumnConstraint(min_datetime, max_datetime),
                 ],
                 exists=exists,
                 unique=unique,
             ),
         )

     @classmethod
     def string_column(cls, name, exists=False, unique=False):
         return cls(
             name=check.str_param(name, 'name'),
             constraints=cls.add_configurable_constraints(
                 [ColumnTypeConstraint('object')], exists=exists, unique=unique
             ),
         )

     @classmethod
     def categorical_column(cls, name, categories, of_types='object', exists=False, unique=False):
         return cls(
             name=check.str_param(name, 'name'),
             constraints=cls.add_configurable_constraints(
                 [ColumnTypeConstraint(of_types), CategoricalColumnConstraint(categories)],
                 exists=exists,
                 unique=unique,
             ),
         )


 def validate_constraints(dataframe, pandas_columns=None, dataframe_constraints=None):
     dataframe = check.inst_param(dataframe, 'dataframe', DataFrame)
     pandas_columns = check.opt_list_param(
         pandas_columns, 'column_constraints', of_type=PandasColumn
     )
     dataframe_constraints = check.opt_list_param(
         dataframe_constraints, 'dataframe_constraints', of_type=DataFrameConstraint
     )

     if pandas_columns:
         for column in pandas_columns:
             column.validate(dataframe)

     if dataframe_constraints:
         for dataframe_constraint in dataframe_constraints:
             dataframe_constraint.validate(dataframe)
diff --git a/python_modules/libraries/dagster-pandas/dagster_pandas_tests/test_data_frame.py b/python_modules/libraries/dagster-pandas/dagster_pandas_tests/test_data_frame.py
index 0d5c93ff0..38e733725 100644
--- a/python_modules/libraries/dagster-pandas/dagster_pandas_tests/test_data_frame.py
+++ b/python_modules/libraries/dagster-pandas/dagster_pandas_tests/test_data_frame.py
@@ -1,115 +1,155 @@
 import pytest
 from dagster_pandas.constraints import (
     ColumnTypeConstraint,
     InRangeColumnConstraint,
     NonNullableColumnConstraint,
 )
-from dagster_pandas.data_frame import create_dagster_pandas_dataframe_type
+from dagster_pandas.data_frame import _execute_summary_stats, create_dagster_pandas_dataframe_type
 from dagster_pandas.validation import PandasColumn
 from pandas import DataFrame

 from dagster import (
     DagsterInvariantViolationError,
     DagsterType,
     EventMetadataEntry,
     Output,
     OutputDefinition,
     check_dagster_type,
     execute_pipeline,
     pipeline,
     solid,
 )


 def test_create_pandas_dataframe_dagster_type():
     TestDataFrame = create_dagster_pandas_dataframe_type(
         name='TestDataFrame',
         columns=[PandasColumn(name='foo', constraints=[ColumnTypeConstraint('int64')])],
     )
     assert isinstance(TestDataFrame, DagsterType)


 def test_basic_pipeline_with_pandas_dataframe_dagster_type():
     def compute_event_metadata(dataframe):
         return [
             EventMetadataEntry.text(str(max(dataframe['pid'])), 'max_pid', 'maximum pid'),
         ]

     BasicDF = create_dagster_pandas_dataframe_type(
         name='BasicDF',
         columns=[
             PandasColumn.integer_column('pid', exists=True),
             PandasColumn.string_column('names'),
         ],
         event_metadata_fn=compute_event_metadata,
     )

     @solid(output_defs=[OutputDefinition(name='basic_dataframe', dagster_type=BasicDF)])
     def create_dataframe(_):
         yield Output(
             DataFrame({'pid': [1, 2, 3], 'names': ['foo', 'bar', 'baz']}),
             output_name='basic_dataframe',
         )

     @pipeline
     def basic_pipeline():
         return create_dataframe()

     result = execute_pipeline(basic_pipeline)
     assert result.success
     for event in result.event_list:
         if event.event_type_value == 'STEP_OUTPUT':
             mock_df_output_event_metadata = (
                 event.event_specific_data.type_check_data.metadata_entries
             )
             assert len(mock_df_output_event_metadata) == 1
             assert any([entry.label == 'max_pid' for entry in mock_df_output_event_metadata])


+def test_create_dagster_pandas_dataframe_type_with_null_event_metadata_fn():
+    BasicDF = create_dagster_pandas_dataframe_type(
+        name='BasicDF',
+        columns=[
+            PandasColumn.integer_column('pid', exists=True),
+            PandasColumn.string_column('names'),
+        ],
+        event_metadata_fn=None,
+    )
+    assert isinstance(BasicDF, DagsterType)
+    basic_type_check = BasicDF.type_check(DataFrame({'pid': [1], 'names': ['foo']}))
+    assert basic_type_check.success
+
+
 def test_bad_dataframe_type_returns_bad_stuff():
     with pytest.raises(DagsterInvariantViolationError):
         BadDFBadSummaryStats = create_dagster_pandas_dataframe_type(
             'BadDF', event_metadata_fn=lambda _: 'ksjdkfsd'
         )
         check_dagster_type(BadDFBadSummaryStats, DataFrame({'num': [1]}))

     with pytest.raises(DagsterInvariantViolationError):
         BadDFBadSummaryStatsListItem = create_dagster_pandas_dataframe_type(
             'BadDF', event_metadata_fn=lambda _: ['ksjdkfsd']
         )
         check_dagster_type(BadDFBadSummaryStatsListItem, DataFrame({'num': [1]}))


 def test_dataframe_description_generation_just_type_constraint():
     TestDataFrame = create_dagster_pandas_dataframe_type(
         name='TestDataFrame',
         columns=[PandasColumn(name='foo', constraints=[ColumnTypeConstraint('int64')])],
     )
     assert TestDataFrame.description == "\n### Columns\n**foo**: `int64`\n\n"


 def test_dataframe_description_generation_no_type_constraint():
     TestDataFrame = create_dagster_pandas_dataframe_type(
         name='TestDataFrame', columns=[PandasColumn(name='foo')],
     )
     assert TestDataFrame.description == "\n### Columns\n**foo**\n\n"


 def test_dataframe_description_generation_multi_constraints():
     TestDataFrame = create_dagster_pandas_dataframe_type(
         name='TestDataFrame',
         columns=[
             PandasColumn(
                 name='foo',
                 constraints=[
                     ColumnTypeConstraint('int64'),
                     InRangeColumnConstraint(0, 100),
                     NonNullableColumnConstraint(),
                 ],
             ),
         ],
     )
     assert (
         TestDataFrame.description
         == "\n### Columns\n**foo**: `int64`\n+ 0 < values < 100\n+ No Null values allowed.\n\n"
     )
+
+
+def test_execute_summary_stats_null_function():
+    assert _execute_summary_stats('foo', DataFrame(), None) == []
+
+    metadata_entries = _execute_summary_stats(
+        'foo',
+        DataFrame({'bar': [1, 2, 3]}),
+        lambda value: [EventMetadataEntry.text('baz', 'qux', 'quux')],
+    )
+    assert len(metadata_entries) == 1
+    assert metadata_entries[0].label == 'qux'
+    assert metadata_entries[0].description == 'quux'
+    assert metadata_entries[0].entry_data.text == 'baz'
+
+
+def test_execute_summary_stats_error():
+    with pytest.raises(DagsterInvariantViolationError):
+        assert _execute_summary_stats('foo', DataFrame({}), lambda value: 'jajaja')
+
+    with pytest.raises(DagsterInvariantViolationError):
+        assert _execute_summary_stats(
+            'foo',
+            DataFrame({}),
+            lambda value: [EventMetadataEntry.text('baz', 'qux', 'quux'), 'rofl'],
+        )
diff --git a/python_modules/libraries/dagster-pandas/dagster_pandas_tests/test_validation.py b/python_modules/libraries/dagster-pandas/dagster_pandas_tests/test_validation.py
index 32cffda21..8052980b5 100644
--- a/python_modules/libraries/dagster-pandas/dagster_pandas_tests/test_validation.py
+++ b/python_modules/libraries/dagster-pandas/dagster_pandas_tests/test_validation.py
@@ -1,122 +1,130 @@
 import pytest
 from dagster_pandas.constraints import (
     CategoricalColumnConstraint,
+    ColumnExistsConstraint,
     ColumnTypeConstraint,
     ConstraintViolationException,
     InRangeColumnConstraint,
     NonNullableColumnConstraint,
     RowCountConstraint,
     UniqueColumnConstraint,
 )
 from dagster_pandas.validation import PandasColumn, validate_constraints
 from pandas import DataFrame


-def test_validate_collection_schema_ok():
+def test_validate_constraints_ok():
     column_constraints = [
         PandasColumn(name='foo', constraints=[ColumnTypeConstraint('object')]),
     ]
     dataframe = DataFrame({'foo': ['bar', 'baz']})
     assert validate_constraints(dataframe, pandas_columns=column_constraints) is None


 @pytest.mark.parametrize(
     'column_constraints, dataframe',
     [
         (
             [PandasColumn(name='foo', constraints=[ColumnTypeConstraint('int64')])],
             DataFrame({'foo': ['bar', 'baz']}),
         ),
         (
             [PandasColumn(name='foo', constraints=[ColumnTypeConstraint('object')])],
             DataFrame({'bar': ['bar', 'baz']}),
         ),
     ],
 )
-def test_validate_collection_schema_throw_error(column_constraints, dataframe):
+def test_validate_constraints_throw_error(column_constraints, dataframe):
     with pytest.raises(ConstraintViolationException):
         validate_constraints(dataframe, pandas_columns=column_constraints)


 def test_shape_validation_ok():
     assert (
         validate_constraints(
             DataFrame({'foo': [2], 'bar': ['hello']}),
             pandas_columns=[
                 PandasColumn.integer_column('foo', min_value=0),
                 PandasColumn.string_column('bar'),
             ],
             dataframe_constraints=[RowCountConstraint(1)],
         )
         is None
     )


 def test_shape_validation_without_column_constraints():
     assert (
         validate_constraints(
             DataFrame({'foo': [2], 'bar': ['hello']}), dataframe_constraints=[RowCountConstraint(1)]
         )
         is None
     )
     with pytest.raises(ConstraintViolationException):
         validate_constraints(
             DataFrame({'foo': [2], 'bar': ['hello']}), dataframe_constraints=[RowCountConstraint(2)]
         )


 def test_shape_validation_throw_error():
     with pytest.raises(ConstraintViolationException):
         validate_constraints(
             DataFrame({'foo': [2], 'bar': ['hello']}),
             pandas_columns=[
                 PandasColumn.integer_column('foo', min_value=0),
                 PandasColumn.string_column('bar'),
             ],
             dataframe_constraints=[RowCountConstraint(2)],
         )


 def has_constraints(column, constraints):
     for constraint in constraints:
         if not any(
             [isinstance(col_constraint, constraint) for col_constraint in column.constraints]
         ):
             return False
     return True


+def test_exists_column_composition():
+    exists_column = PandasColumn.exists('foo')
+    assert isinstance(exists_column, PandasColumn)
+    assert len(exists_column.constraints) == 1
+    assert isinstance(exists_column.constraints[0], ColumnExistsConstraint)
+
+
 @pytest.mark.parametrize(
     'composer, composer_args, expected_constraints',
     [
         (PandasColumn.boolean_column, [], [ColumnTypeConstraint]),
         (
             PandasColumn.numeric_column,
             [{'int64', 'float64'}],
             [ColumnTypeConstraint, InRangeColumnConstraint],
         ),
         (PandasColumn.datetime_column, [], [ColumnTypeConstraint, InRangeColumnConstraint]),
         (PandasColumn.string_column, [], [ColumnTypeConstraint]),
         (
             PandasColumn.categorical_column,
             [{'a', 'b'}],
             [ColumnTypeConstraint, CategoricalColumnConstraint],
         ),
     ],
 )
 def test_datetime_column_composition(composer, composer_args, expected_constraints):
     column = composer('foo', *composer_args)
     assert isinstance(column, PandasColumn)
     assert column.name == 'foo'
     assert has_constraints(column, expected_constraints)

     # Test non nullable constraint flag
     exists_included_constraints = expected_constraints + [NonNullableColumnConstraint]
     non_nullable_column = composer('foo', *composer_args, exists=True)
     assert has_constraints(non_nullable_column, exists_included_constraints)

     # Test unique constraint flag
     distinct_included_constraints = expected_constraints + [UniqueColumnConstraint]
     distinct_column = composer('foo', *composer_args, unique=True)
     assert has_constraints(distinct_column, distinct_included_constraints)
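---

Usage sketch (illustrative, not part of the patch): the two behavior changes above are the new `PandasColumn.exists` composer, which asserts only that a column is present without pinning a dtype, and `create_dagster_pandas_dataframe_type` now tolerating `event_metadata_fn=None` (with `_execute_summary_stats` short-circuiting to `[]`). Imports mirror the test modules in this diff; the `TripDataFrame`, `raw_count`, and `bike_id` names are hypothetical.

```python
from dagster_pandas.data_frame import create_dagster_pandas_dataframe_type
from dagster_pandas.validation import PandasColumn
from pandas import DataFrame

TripDataFrame = create_dagster_pandas_dataframe_type(
    name='TripDataFrame',
    columns=[
        # New composer: only presence is enforced (via the base
        # ColumnExistsConstraint) -- no dtype constraint, unlike
        # integer_column/string_column.
        PandasColumn.exists('raw_count'),
        PandasColumn.integer_column('bike_id', min_value=0),
    ],
    # Now valid: with no event_metadata_fn, the type check simply emits
    # no summary metadata instead of raising.
    event_metadata_fn=None,
)

# Mixed dtypes in 'raw_count' pass, since only existence is checked.
type_check = TripDataFrame.type_check(DataFrame({'raw_count': ['a', 2], 'bike_id': [1, 2]}))
assert type_check.success
```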