diff --git a/.buildkite/pipeline.py b/.buildkite/pipeline.py --- a/.buildkite/pipeline.py +++ b/.buildkite/pipeline.py @@ -368,6 +368,11 @@ "python_modules/libraries/dagster-aws", env_vars=["AWS_DEFAULT_REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"], ), + ModuleBuildSpec( + "python_modules/libraries/dagster-aws-pyspark", + env_vars=["AWS_DEFAULT_REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"], + supported_pythons=SupportedPython3s, + ), ModuleBuildSpec( "python_modules/libraries/dagster-azure", env_vars=["AZURE_STORAGE_ACCOUNT_KEY"], ), diff --git a/Makefile b/Makefile --- a/Makefile +++ b/Makefile @@ -42,6 +42,7 @@ -e python_modules/automation \ -e python_modules/libraries/dagster-pandas \ -e python_modules/libraries/dagster-aws \ + -e python_modules/libraries/dagster-aws-pyspark \ -e python_modules/libraries/dagster-celery \ -e python_modules/libraries/dagster-celery-docker \ -e python_modules/libraries/dagster-cron \ diff --git a/docs-requirements.txt b/docs-requirements.txt --- a/docs-requirements.txt +++ b/docs-requirements.txt @@ -6,6 +6,7 @@ -e ./python_modules/libraries/dagstermill -e ./python_modules/libraries/dagster-airflow -e ./python_modules/libraries/dagster-aws +-e ./python_modules/libraries/dagster-aws-pyspark -e ./python_modules/libraries/dagster-celery -e ./python_modules/libraries/dagster-celery-docker -e ./python_modules/libraries/dagster-cron diff --git a/docs/next/src/pages/overview/packages/libraries.mdx b/docs/next/src/pages/overview/packages/libraries.mdx --- a/docs/next/src/pages/overview/packages/libraries.mdx +++ b/docs/next/src/pages/overview/packages/libraries.mdx @@ -50,6 +50,10 @@ GCS are the preferred persistence solutions for long-running deploys. +**dagster-aws-pyspark** + +Enables having Dagster launch PySpark jobs on AWS EMR. 
+ **dagster-celery** Pluggable executor to run Dagster pipelines using the [Celery task diff --git a/docs/sections/api/apidocs/libraries/dagster_aws.rst b/docs/sections/api/apidocs/libraries/dagster_aws.rst --- a/docs/sections/api/apidocs/libraries/dagster_aws.rst +++ b/docs/sections/api/apidocs/libraries/dagster_aws.rst @@ -61,9 +61,6 @@ EMR --- -.. autodata:: dagster_aws.emr.emr_pyspark_step_launcher - :annotation: ResourceDefinition - .. autoclass:: dagster_aws.emr.EmrJobRunner .. autoclass:: dagster_aws.emr.EmrError diff --git a/docs/sections/api/apidocs/libraries/dagster_aws_pyspark.rst b/docs/sections/api/apidocs/libraries/dagster_aws_pyspark.rst new file mode 100644 --- /dev/null +++ b/docs/sections/api/apidocs/libraries/dagster_aws_pyspark.rst @@ -0,0 +1,8 @@ +AWS PySpark (dagster_aws_pyspark) +================================= + +.. currentmodule:: dagster_aws_pyspark + +.. autodata:: dagster_aws_pyspark.emr_pyspark_step_launcher + :annotation: ResourceDefinition + diff --git a/examples/airline_demo/airline_demo/pipelines.py b/examples/airline_demo/airline_demo/pipelines.py --- a/examples/airline_demo/airline_demo/pipelines.py +++ b/examples/airline_demo/airline_demo/pipelines.py @@ -1,6 +1,5 @@ """Pipeline definitions for the airline_demo. 
""" -from dagster_aws.emr import emr_pyspark_step_launcher from dagster_aws.s3 import ( S3FileHandle, file_handle_to_s3, @@ -9,6 +8,7 @@ s3_plus_default_intermediate_storage_defs, s3_resource, ) +from dagster_aws_pyspark import emr_pyspark_step_launcher from dagster_pyspark import pyspark_resource from dagster import ModeDefinition, PresetDefinition, composite_solid, local_file_manager, pipeline diff --git a/examples/airline_demo/setup.py b/examples/airline_demo/setup.py --- a/examples/airline_demo/setup.py +++ b/examples/airline_demo/setup.py @@ -23,6 +23,7 @@ "full": [ "dagstermill", "dagster-aws", + "dagster-aws-pyspark", "dagster-cron", "dagster-postgres", "dagster-pyspark", diff --git a/examples/airline_demo/tox.ini b/examples/airline_demo/tox.ini --- a/examples/airline_demo/tox.ini +++ b/examples/airline_demo/tox.ini @@ -13,6 +13,7 @@ -e ../../python_modules/libraries/dagster-postgres -e ../../python_modules/libraries/dagster-spark -e ../../python_modules/libraries/dagster-pyspark + -e ../../python_modules/libraries/dagster-aws-pyspark -e ../../python_modules/libraries/dagster-snowflake -e ../../python_modules/libraries/dagster-slack -e ../../python_modules/libraries/dagster-airflow diff --git a/examples/emr_pyspark/repo.py b/examples/emr_pyspark/repo.py --- a/examples/emr_pyspark/repo.py +++ b/examples/emr_pyspark/repo.py @@ -1,8 +1,8 @@ # start-snippet from pathlib import Path -from dagster_aws.emr import emr_pyspark_step_launcher from dagster_aws.s3 import s3_intermediate_storage, s3_resource +from dagster_aws_pyspark import emr_pyspark_step_launcher from dagster_pyspark import DataFrame as DagsterPySparkDataFrame from dagster_pyspark import pyspark_resource from pyspark.sql import DataFrame, Row diff --git a/examples/emr_pyspark/requirements.txt b/examples/emr_pyspark/requirements.txt --- a/examples/emr_pyspark/requirements.txt +++ b/examples/emr_pyspark/requirements.txt @@ -1,4 +1,4 @@ dagster dagit -dagster-aws +dagster-aws-pyspark dagster-pyspark diff 
--git a/examples/emr_pyspark/tox.ini b/examples/emr_pyspark/tox.ini --- a/examples/emr_pyspark/tox.ini +++ b/examples/emr_pyspark/tox.ini @@ -11,6 +11,7 @@ -e ../../python_modules/libraries/dagster-aws -e ../../python_modules/libraries/dagster-spark -e ../../python_modules/libraries/dagster-pyspark + -e ../../python_modules/libraries/dagster-aws-pyspark whitelist_externals = /bin/bash echo diff --git a/examples/legacy_examples/Dockerfile b/examples/legacy_examples/Dockerfile --- a/examples/legacy_examples/Dockerfile +++ b/examples/legacy_examples/Dockerfile @@ -20,6 +20,7 @@ -e dagster-pandas \ -e dagstermill \ -e dagster-aws \ + -e dagster-aws-pyspark \ -e dagster-spark \ -e dagster-pyspark \ -e dagster-postgres \ @@ -36,6 +37,7 @@ -e dagster-pandas \ -e dagstermill \ -e dagster-aws \ + -e dagster-aws-pyspark \ -e dagster-spark \ -e dagster-pyspark \ -e dagster-postgres \ diff --git a/examples/legacy_examples/dagster_examples/simple_pyspark/pipelines.py b/examples/legacy_examples/dagster_examples/simple_pyspark/pipelines.py --- a/examples/legacy_examples/dagster_examples/simple_pyspark/pipelines.py +++ b/examples/legacy_examples/dagster_examples/simple_pyspark/pipelines.py @@ -1,6 +1,6 @@ """Pipeline definitions for the simple_pyspark example.""" -from dagster_aws.emr import emr_pyspark_step_launcher from dagster_aws.s3 import s3_plus_default_intermediate_storage_defs, s3_resource +from dagster_aws_pyspark import emr_pyspark_step_launcher from dagster_databricks import databricks_pyspark_step_launcher from dagster_pyspark import pyspark_resource diff --git a/examples/legacy_examples/setup.py b/examples/legacy_examples/setup.py --- a/examples/legacy_examples/setup.py +++ b/examples/legacy_examples/setup.py @@ -24,6 +24,7 @@ "full": [ "dagstermill", "dagster-aws", + "dagster-aws-pyspark", "dagster-cron", "dagster-postgres", "dagster-pyspark", diff --git a/examples/legacy_examples/tox.ini b/examples/legacy_examples/tox.ini --- a/examples/legacy_examples/tox.ini +++ 
b/examples/legacy_examples/tox.ini @@ -19,6 +19,7 @@ -e ../../python_modules/libraries/dagster-postgres -e ../../python_modules/libraries/dagster-pyspark -e ../../python_modules/libraries/dagster-snowflake + -e ../../python_modules/libraries/dagster-aws-pyspark -e ../../python_modules/libraries/dagster-gcp -e ../../python_modules/libraries/dagster-databricks -e ../../python_modules/libraries/lakehouse diff --git a/python_modules/libraries/dagster-aws-pyspark/.coveragerc b/python_modules/libraries/dagster-aws-pyspark/.coveragerc new file mode 100644 --- /dev/null +++ b/python_modules/libraries/dagster-aws-pyspark/.coveragerc @@ -0,0 +1,2 @@ +[run] +branch = True diff --git a/python_modules/libraries/dagster-aws-pyspark/README.md b/python_modules/libraries/dagster-aws-pyspark/README.md new file mode 100644 --- /dev/null +++ b/python_modules/libraries/dagster-aws-pyspark/README.md @@ -0,0 +1,4 @@ +# dagster-aws-pyspark + +The docs for `dagster-aws-pyspark` can be found +[here](https://docs.dagster.io/_apidocs/libraries/dagster_aws_pyspark). 
diff --git a/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark/__init__.py b/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark/__init__.py new file mode 100644 --- /dev/null +++ b/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark/__init__.py @@ -0,0 +1,9 @@ +from dagster.core.utils import check_dagster_package_version + +from .pyspark_step_launcher import emr_pyspark_step_launcher +from .version import __version__ + +check_dagster_package_version("dagster-aws-pyspark", __version__) + + +__all__ = ["emr_pyspark_step_launcher"] diff --git a/python_modules/libraries/dagster-aws/dagster_aws/emr/pyspark_step_launcher.py b/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark/pyspark_step_launcher.py rename from python_modules/libraries/dagster-aws/dagster_aws/emr/pyspark_step_launcher.py rename to python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark/pyspark_step_launcher.py diff --git a/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark/version.py b/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark/version.py new file mode 100644 --- /dev/null +++ b/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark/version.py @@ -0,0 +1 @@ +__version__ = "0.9.14" diff --git a/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark_tests/__init__.py b/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark_tests/__init__.py new file mode 100644 diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py b/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark_tests/test_pyspark_step_launcher.py rename from python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py rename to python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark_tests/test_pyspark_step_launcher.py --- a/python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py +++ 
b/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark_tests/test_pyspark_step_launcher.py @@ -4,8 +4,11 @@ import pytest from dagster_aws.emr import EmrError, EmrJobRunner -from dagster_aws.emr.pyspark_step_launcher import EmrPySparkStepLauncher, emr_pyspark_step_launcher from dagster_aws.s3 import s3_plus_default_storage_defs, s3_resource +from dagster_aws_pyspark.pyspark_step_launcher import ( + EmrPySparkStepLauncher, + emr_pyspark_step_launcher, +) from dagster_pyspark import DataFrame, pyspark_resource from moto import mock_emr from pyspark.sql import Row @@ -251,3 +254,31 @@ pass assert mock_log_logs.call_count == 1 + + +EVENTS = [object(), object(), object()] + + +@mock.patch( + "dagster_aws.emr.emr.EmrJobRunner.is_emr_step_complete", side_effect=[False, False, True] +) +@mock.patch( + "dagster_aws_pyspark.pyspark_step_launcher.EmrPySparkStepLauncher.read_events", + side_effect=[EVENTS[0:1], [], EVENTS[0:3]], +) +def test_wait_for_completion(_mock_read_events, _mock_is_emr_step_complete): + launcher = EmrPySparkStepLauncher( + region_name="", + staging_bucket="", + staging_prefix="", + wait_for_logs=False, + action_on_failure="", + cluster_id="", + spark_config={}, + local_pipeline_package_path="", + deploy_local_pipeline_package=False, + ) + yielded_events = list( + launcher.wait_for_completion(mock.MagicMock(), None, None, None, None, check_interval=0) + ) + assert yielded_events == EVENTS diff --git a/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark_tests/test_version.py b/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark_tests/test_version.py new file mode 100644 --- /dev/null +++ b/python_modules/libraries/dagster-aws-pyspark/dagster_aws_pyspark_tests/test_version.py @@ -0,0 +1,5 @@ +from dagster_aws_pyspark.version import __version__ + + +def test_version(): + assert __version__ diff --git a/python_modules/libraries/dagster-aws-pyspark/dev-requirements.txt 
b/python_modules/libraries/dagster-aws-pyspark/dev-requirements.txt new file mode 100644 diff --git a/python_modules/libraries/dagster-aws-pyspark/requirements.txt b/python_modules/libraries/dagster-aws-pyspark/requirements.txt new file mode 100644 diff --git a/python_modules/libraries/dagster-aws/setup.py b/python_modules/libraries/dagster-aws-pyspark/setup.py copy from python_modules/libraries/dagster-aws/setup.py copy to python_modules/libraries/dagster-aws-pyspark/setup.py --- a/python_modules/libraries/dagster-aws/setup.py +++ b/python_modules/libraries/dagster-aws-pyspark/setup.py @@ -3,7 +3,7 @@ def get_version(): version = {} - with open("dagster_aws/version.py") as fp: + with open("dagster_aws_pyspark/version.py") as fp: exec(fp.read(), version) # pylint: disable=W0122 return version["__version__"] @@ -11,23 +11,21 @@ if __name__ == "__main__": setup( - name="dagster-aws", + name="dagster-aws-pyspark", version=get_version(), author="Elementl", author_email="hello@elementl.com", license="Apache-2.0", - description="Package for AWS-specific Dagster framework solid and resource components.", - url="https://github.com/dagster-io/dagster/tree/master/python_modules/libraries/dagster-aws", + description="A Dagster integration for PySpark on AWS EMR", + url="https://github.com/dagster-io/dagster/tree/master/python_modules/libraries/dagster-aws-pyspark", classifiers=[ - "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", ], packages=find_packages(exclude=["test"]), - include_package_data=True, - install_requires=["boto3>=1.9", "dagster", "packaging", "psycopg2-binary", "requests"], - extras_require={"pyspark": ["dagster-pyspark"]}, + install_requires=["dagster", "dagster-aws", "dagster-pyspark"], + tests_require=[], zip_safe=False, ) diff --git a/python_modules/libraries/dagster-aws-pyspark/tox.ini 
b/python_modules/libraries/dagster-aws-pyspark/tox.ini new file mode 100644 --- /dev/null +++ b/python_modules/libraries/dagster-aws-pyspark/tox.ini @@ -0,0 +1,32 @@ +[tox] +envlist = py{38,37,36}-{unix,windows},pylint + +[testenv] +passenv = CI_* COVERALLS_REPO_TOKEN AWS_* BUILDKITE SSH_* +deps = + -e ../../dagster + -r ../../dagster/dev-requirements.txt + -e ../dagster-aws + -e ../dagster-spark + -e ../dagster-pyspark + -e . +usedevelop = true +whitelist_externals = + /bin/bash + echo +commands = + !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster -e dagit' + coverage erase + echo -e "--- \033[0;32m:pytest: Running tox tests\033[0m" + pytest -vv --junitxml=test_results.xml --cov=dagster_aws_pyspark --cov-append --cov-report= + coverage report --omit='.tox/*,**/test_*.py' --skip-covered + coverage html --omit='.tox/*,**/test_*.py' + coverage xml --omit='.tox/*,**/test_*.py' + +[testenv:pylint] +whitelist_externals = + pylint +basepython = + pylint: python3.7 +commands = + pylint: pylint -j 0 --rcfile=../../../.pylintrc dagster_aws_pyspark dagster_aws_pyspark_tests diff --git a/python_modules/libraries/dagster-aws/dagster_aws/emr/__init__.py b/python_modules/libraries/dagster-aws/dagster_aws/emr/__init__.py --- a/python_modules/libraries/dagster-aws/dagster_aws/emr/__init__.py +++ b/python_modules/libraries/dagster-aws/dagster_aws/emr/__init__.py @@ -1,5 +1,4 @@ from .emr import EmrError, EmrJobRunner -from .pyspark_step_launcher import emr_pyspark_step_launcher from .types import ( EMR_CLUSTER_DONE_STATES, EMR_CLUSTER_TERMINATED_STATES, diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark_step_launcher.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark_step_launcher.py deleted file mode 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark_step_launcher.py +++ /dev/null @@ -1,30 +0,0 @@ -from dagster_aws.emr.pyspark_step_launcher import 
EmrPySparkStepLauncher - -from dagster.seven import mock - -EVENTS = [object(), object(), object()] - - -@mock.patch( - "dagster_aws.emr.emr.EmrJobRunner.is_emr_step_complete", side_effect=[False, False, True] -) -@mock.patch( - "dagster_aws.emr.pyspark_step_launcher.EmrPySparkStepLauncher.read_events", - side_effect=[EVENTS[0:1], [], EVENTS[0:3]], -) -def test_wait_for_completion(_mock_is_emr_step_complete, _mock_read_events): - launcher = EmrPySparkStepLauncher( - region_name="", - staging_bucket="", - staging_prefix="", - wait_for_logs=False, - action_on_failure="", - cluster_id="", - spark_config={}, - local_pipeline_package_path="", - deploy_local_pipeline_package=False, - ) - yielded_events = list( - launcher.wait_for_completion(mock.MagicMock(), None, None, None, None, check_interval=0) - ) - assert yielded_events == EVENTS diff --git a/python_modules/libraries/dagster-aws/setup.py b/python_modules/libraries/dagster-aws/setup.py --- a/python_modules/libraries/dagster-aws/setup.py +++ b/python_modules/libraries/dagster-aws/setup.py @@ -28,6 +28,5 @@ packages=find_packages(exclude=["test"]), include_package_data=True, install_requires=["boto3>=1.9", "dagster", "packaging", "psycopg2-binary", "requests"], - extras_require={"pyspark": ["dagster-pyspark"]}, zip_safe=False, )