diff --git a/python_modules/libraries/dagster-spark/dagster_spark/__init__.py b/python_modules/libraries/dagster-spark/dagster_spark/__init__.py
--- a/python_modules/libraries/dagster-spark/dagster_spark/__init__.py
+++ b/python_modules/libraries/dagster-spark/dagster_spark/__init__.py
@@ -1,4 +1,5 @@
-from .solids import SparkSolidDefinition
+from .resources import spark_resource
+from .solids import create_spark_solid
 from .types import SparkSolidError
 
-__all__ = ['SparkSolidDefinition', 'SparkSolidError']
+__all__ = ['create_spark_solid', 'spark_resource', 'SparkSolidError']
diff --git a/python_modules/libraries/dagster-spark/dagster_spark/configs.py b/python_modules/libraries/dagster-spark/dagster_spark/configs.py
--- a/python_modules/libraries/dagster-spark/dagster_spark/configs.py
+++ b/python_modules/libraries/dagster-spark/dagster_spark/configs.py
@@ -5,7 +5,7 @@
 https://spark.apache.org/docs/latest/submitting-applications.html for a more in-depth summary of
 Spark deployment contexts and configuration.
 '''
-from dagster import Dict, Field, List, Path, String
+from dagster import Field, List, Path, String
 
 from .configs_spark import spark_config
 from .types import SparkDeployMode
@@ -60,16 +60,12 @@
 
     spark_outputs = Field(List[String], description='The outputs that this Spark job will produce')
 
-    return Field(
-        Dict(
-            fields={
-                'master_url': master_url,
-                'deploy_mode': deploy_mode,
-                'application_jar': application_jar,
-                'spark_conf': spark_config(),
-                'spark_home': spark_home,
-                'application_arguments': application_arguments,
-                'spark_outputs': spark_outputs,
-            }
-        )
-    )
+    return {
+        'master_url': master_url,
+        'deploy_mode': deploy_mode,
+        'application_jar': application_jar,
+        'spark_conf': spark_config(),
+        'spark_home': spark_home,
+        'application_arguments': application_arguments,
+        'spark_outputs': spark_outputs,
+    }
diff --git a/python_modules/libraries/dagster-spark/dagster_spark/resources.py b/python_modules/libraries/dagster-spark/dagster_spark/resources.py
new file mode 100644
--- /dev/null
+++ b/python_modules/libraries/dagster-spark/dagster_spark/resources.py
@@ -0,0 +1,79 @@
+import os
+
+from dagster import resource
+
+from .configs import define_spark_config
+from .types import SparkSolidError
+from .utils import parse_spark_config
+
+
+class SparkResource:
+    def __init__(self, config):
+        self.config = config
+
+    def shell_cmd(self, main_class):
+        # Extract parameters from config
+        (
+            master_url,
+            deploy_mode,
+            application_jar,
+            spark_conf,
+            application_arguments,
+            spark_home,
+        ) = [
+            self.config.get(k)
+            for k in (
+                'master_url',
+                'deploy_mode',
+                'application_jar',
+                'spark_conf',
+                'application_arguments',
+                'spark_home',
+            )
+        ]
+
+        # Let the user use env vars in the jar path
+        application_jar = os.path.expandvars(application_jar)
+
+        if not os.path.exists(application_jar):
+            raise SparkSolidError(
+                (
+                    'Application jar {} does not exist. A valid jar must be '
+                    'built before running this solid.'.format(application_jar)
+                )
+            )
+
+        spark_home = spark_home if spark_home else os.environ.get('SPARK_HOME')
+
+        if spark_home is None:
+            raise SparkSolidError(
+                (
+                    'No spark home set. You must either pass spark_home in config or '
+                    'set $SPARK_HOME in your environment (got None).'
+                )
+            )
+
+        deploy_mode = ['--deploy-mode', '{}'.format(deploy_mode)] if deploy_mode else []
+
+        spark_shell_cmd = (
+            [
+                '{}/bin/spark-submit'.format(spark_home),
+                '--class',
+                main_class,
+                '--master',
+                master_url,
+            ]
+            + deploy_mode
+            + parse_spark_config(spark_conf)
+            + [application_jar]
+            + ([application_arguments] if application_arguments else [])
+        )
+        return spark_shell_cmd
+
+
+@resource(
+    config=define_spark_config(),
+    description='''Resource for running Spark jobs via spark-submit.''',
+)
+def spark_resource(context):
+    return SparkResource(context.resource_config)
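For orientation, here is a rough sketch of what SparkResource.shell_cmd assembles from a resource config. The config values, paths, and main class below are illustrative (not part of the patch), and the jar path must point at a file that actually exists, since shell_cmd checks for it before building the command:

from dagster_spark.resources import SparkResource

# Illustrative config dict in the shape described by define_spark_config()
config = {
    'master_url': 'local[*]',
    'deploy_mode': 'client',
    'application_jar': '/path/to/app.jar',  # hypothetical; must exist or SparkSolidError is raised
    'spark_conf': {'spark': {'app': {'name': 'test_app'}}},
    'spark_home': '/opt/spark',
    'application_arguments': '--date 2019-01-01',
}

cmd = SparkResource(config).shell_cmd('org.apache.spark.examples.SparkPi')
# cmd is roughly:
# ['/opt/spark/bin/spark-submit', '--class', 'org.apache.spark.examples.SparkPi',
#  '--master', 'local[*]', '--deploy-mode', 'client',
#  '--conf', 'spark.app.name=test_app',
#  '/path/to/app.jar', '--date 2019-01-01']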
diff --git a/python_modules/libraries/dagster-spark/dagster_spark/solids.py b/python_modules/libraries/dagster-spark/dagster_spark/solids.py
--- a/python_modules/libraries/dagster-spark/dagster_spark/solids.py
+++ b/python_modules/libraries/dagster-spark/dagster_spark/solids.py
@@ -1,70 +1,11 @@
-import functools
-import os
 import subprocess
 
-from dagster import InputDefinition, List, Output, OutputDefinition, Path, SolidDefinition, check
+from dagster import InputDefinition, List, Output, OutputDefinition, Path, check, solid
 
-from .configs import define_spark_config
 from .types import SparkSolidError
-from .utils import parse_spark_config
 
 
-def create_spark_shell_cmd(solid_config, main_class):
-    # Extract parameters from config
-    (master_url, deploy_mode, application_jar, spark_conf, application_arguments, spark_home) = [
-        solid_config.get(k)
-        for k in (
-            'master_url',
-            'deploy_mode',
-            'application_jar',
-            'spark_conf',
-            'application_arguments',
-            'spark_home',
-        )
-    ]
-
-    # Let the user use env vars in the jar path
-    application_jar = os.path.expandvars(application_jar)
-
-    if not os.path.exists(application_jar):
-        raise SparkSolidError(
-            (
-                'Application jar {} does not exist. A valid jar must be '
-                'built before running this solid.'.format(application_jar)
-            )
-        )
-
-    spark_home = spark_home if spark_home else os.environ.get('SPARK_HOME')
-
-    if spark_home is None:
-        raise SparkSolidError(
-            (
-                'No spark home set. You must either pass spark_home in config or '
-                'set $SPARK_HOME in your environment (got None).'
-            )
-        )
-
-    deploy_mode = ['--deploy-mode', '{}'.format(deploy_mode)] if deploy_mode else []
-
-    spark_shell_cmd = (
-        ['{}/bin/spark-submit'.format(spark_home), '--class', main_class, '--master', master_url]
-        + deploy_mode
-        + parse_spark_config(spark_conf)
-        + [application_jar]
-        + ([application_arguments] if application_arguments else [])
-    )
-    return spark_shell_cmd
-
-
-def step_metadata_fn(environment_config, solid_name, main_class):
-    return {
-        'spark_submit_command': ' '.join(
-            create_spark_shell_cmd(environment_config.solids[solid_name].config, main_class)
-        )
-    }
-
-
-class SparkSolidDefinition(SolidDefinition):
+def create_spark_solid(name, main_class, description=None):
     '''This solid is a generic representation of a parameterized Spark job.
 
     Parameters:
@@ -72,41 +13,31 @@
         main_class (str): The entry point for your application (e.g.
            org.apache.spark.examples.SparkPi)
     '''
+    name = check.str_param(name, 'name')
+    main_class = check.str_param(main_class, 'main_class')
+    description = check.opt_str_param(
+        description,
+        'description',
+        'This solid is a generic representation of a parameterized Spark job.',
+    )
 
-    def __init__(self, name, main_class, description=None):
-        name = check.str_param(name, 'name')
-        main_class = check.str_param(main_class, 'main_class')
-        description = check.opt_str_param(
-            description,
-            'description',
-            'This solid is a generic representation of a parameterized Spark job.',
-        )
-
-        def _spark_compute_fn(context, _):
-            '''Define Spark execution.
-
-            This function defines how we'll execute the Spark job and invokes spark-submit.
-            '''
-
-            spark_shell_cmd = create_spark_shell_cmd(context.solid_config, main_class)
+    @solid(
+        name=name,
+        description=description,
+        input_defs=[InputDefinition('spark_inputs', List[Path])],
+        output_defs=[OutputDefinition(dagster_type=List[Path], name='paths')],
+        metadata={'kind': 'spark', 'main_class': main_class},
+        required_resource_keys={'spark'},
+    )
+    def spark_solid(context, spark_inputs):  # pylint: disable=unused-argument
+        spark_shell_cmd = context.resources.spark.shell_cmd(main_class)
 
-            context.log.info("Running spark-submit: " + ' '.join(spark_shell_cmd))
-            retcode = subprocess.call(' '.join(spark_shell_cmd), shell=True)
+        context.log.info('Running spark-submit: ' + ' '.join(spark_shell_cmd))
+        retcode = subprocess.call(' '.join(spark_shell_cmd), shell=True)
 
-            if retcode != 0:
-                raise SparkSolidError('Spark job failed. Please consult your logs.')
+        if retcode != 0:
+            raise SparkSolidError('Spark job failed. Please consult your logs.')
 
-            yield Output(context.solid_config.get('spark_outputs'), 'paths')
+        yield Output(context.resources.spark.config.get('spark_outputs'), 'paths')
 
-        super(SparkSolidDefinition, self).__init__(
-            name=name,
-            description=description,
-            input_defs=[InputDefinition('spark_inputs', List[Path])],
-            output_defs=[OutputDefinition(dagster_type=List[Path], name='paths')],
-            compute_fn=_spark_compute_fn,
-            config_field=define_spark_config(),
-            metadata={'kind': 'spark', 'main_class': main_class},
-            step_metadata_fn=functools.partial(
-                step_metadata_fn, solid_name=name, main_class=main_class
-            ),
-        )
+    return spark_solid
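Taken together, the new factory and resource are wired up roughly as in the tests that follow. This is an illustrative sketch only; the solid name, jar path, and config values are hypothetical, and executing it would actually invoke spark-submit through the spark resource:

from dagster import ModeDefinition, execute_pipeline, pipeline
from dagster_spark import create_spark_solid, spark_resource

# Build a solid for a specific Spark entry point
spark_pi = create_spark_solid('spark_pi', main_class='org.apache.spark.examples.SparkPi')


@pipeline(mode_defs=[ModeDefinition(resource_defs={'spark': spark_resource})])
def spark_pipe():
    spark_pi()


environment_dict = {
    'resources': {
        'spark': {
            'config': {
                'master_url': 'local[*]',
                'deploy_mode': 'client',
                'application_jar': '/path/to/spark-examples.jar',  # hypothetical jar path
                'spark_home': '/opt/spark',  # or rely on $SPARK_HOME being set
                'spark_conf': {'spark': {'app': {'name': 'pi_example'}}},
                'application_arguments': '--local-path /tmp/dagster/events/data --date 2019-01-01',
                'spark_outputs': ['/tmp/dagster/events/data'],
            }
        }
    },
    'solids': {'spark_pi': {'inputs': {'spark_inputs': []}}},
}

execute_pipeline(spark_pipe, environment_dict)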
diff --git a/python_modules/libraries/dagster-spark/dagster_spark_tests/test_error.py b/python_modules/libraries/dagster-spark/dagster_spark_tests/test_error.py
--- a/python_modules/libraries/dagster-spark/dagster_spark_tests/test_error.py
+++ b/python_modules/libraries/dagster-spark/dagster_spark_tests/test_error.py
@@ -1,19 +1,15 @@
 import os
-import uuid
 
 import pytest
 import yaml
-from dagster_spark import SparkSolidDefinition, SparkSolidError
+from dagster_spark import SparkSolidError, create_spark_solid, spark_resource
 
-from dagster import PipelineDefinition, execute_pipeline
-from dagster.core.execution.api import create_execution_plan
+from dagster import ModeDefinition, execute_pipeline, pipeline
 from dagster.utils import script_relative_path
 
 CONFIG_FILE = '''
-solids:
-  spark_solid:
-    inputs:
-      spark_inputs: []
+resources:
+  spark:
     config:
       spark_home: /your/spark_home
       spark_outputs: ["/tmp/dagster/events/data"]
@@ -25,44 +21,34 @@
        spark:
          app:
            name: "test_app"
-'''
+solids:
+  spark_solid:
+    inputs:
+      spark_inputs: []
+'''
 
 
-def test_step_metadata():
-    spark_solid = SparkSolidDefinition('spark_solid', main_class='something')
-    pipeline = PipelineDefinition(solid_defs=[spark_solid])
-    environment_dict = yaml.load(CONFIG_FILE.format(path=script_relative_path('fake.jar')))
-    execution_plan = create_execution_plan(pipeline, environment_dict)
-    step = execution_plan.get_step_by_key('spark_solid.compute')
-    assert step.metadata == {
-        'spark_submit_command': (
-            '/your/spark_home/bin/spark-submit --class something '
-            '--master local[*] --deploy-mode client --conf spark.app.name=test_app '
-            + script_relative_path('fake.jar')
-            + ' --local-path /tmp/dagster/events/data '
-            '--date 2019-01-01'
-        )
-    }
+def test_jar_not_found():
+    spark_solid = create_spark_solid('spark_solid', main_class='something')
 
+    @pipeline(mode_defs=[ModeDefinition(resource_defs={'spark': spark_resource})])
+    def pipe():
+        spark_solid()
 
-def test_jar_not_found():
-    spark_solid = SparkSolidDefinition('spark_solid', main_class='something')
-    pipeline = PipelineDefinition(solid_defs=[spark_solid])
     # guid guaranteed to not exist
-    environment_dict = yaml.load(CONFIG_FILE.format(path=str(uuid.uuid4())))
+    jar_path = os.path.join(os.path.dirname(__file__), 'fake.jar')
+    environment_dict = yaml.load(CONFIG_FILE.format(path=jar_path))
 
     with pytest.raises(
         SparkSolidError,
        match='does not exist. A valid jar must be built before running this solid.',
     ):
-        execute_pipeline(pipeline, environment_dict)
+        execute_pipeline(pipe, environment_dict)
 
 
 NO_SPARK_HOME_CONFIG_FILE = '''
-solids:
-  spark_solid:
-    inputs:
-      spark_inputs: []
+resources:
+  spark:
     config:
       spark_outputs: ["/tmp/dagster/events/data"]
       application_jar: "{path}"
@@ -73,6 +59,11 @@
        spark:
          app:
            name: "test_app"
+
+solids:
+  spark_solid:
+    inputs:
+      spark_inputs: []
 '''
 
 
@@ -80,12 +71,16 @@
     if 'SPARK_HOME' in os.environ:
         del os.environ['SPARK_HOME']
 
-    spark_solid = SparkSolidDefinition('spark_solid', main_class='something')
-    pipeline = PipelineDefinition(solid_defs=[spark_solid])
+    spark_solid = create_spark_solid('spark_solid', main_class='something')
+
+    @pipeline(mode_defs=[ModeDefinition(resource_defs={'spark': spark_resource})])
+    def pipe():
+        spark_solid()
+
     environment_dict = yaml.load(NO_SPARK_HOME_CONFIG_FILE.format(path=script_relative_path('.')))
 
     with pytest.raises(SparkSolidError) as exc_info:
-        execute_pipeline(pipeline, environment_dict)
+        execute_pipeline(pipe, environment_dict)
 
     assert str(exc_info.value) == (
         'No spark home set. You must either pass spark_home in config or set '