diff --git a/python_modules/libraries/dagster-aws/dagster_aws/s3/__init__.py b/python_modules/libraries/dagster-aws/dagster_aws/s3/__init__.py
--- a/python_modules/libraries/dagster-aws/dagster_aws/s3/__init__.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws/s3/__init__.py
@@ -2,7 +2,7 @@
 from .file_cache import S3FileCache, s3_file_cache
 from .file_manager import S3FileHandle, S3FileManager
 from .intermediate_storage import S3IntermediateStorage
-from .io_manager import PickledObjectS3IOManager, s3_io_manager
+from .io_manager import PickledObjectS3IOManager, s3_pickle_io_manager
 from .object_store import S3ObjectStore
 from .resources import s3_file_manager, s3_resource
 from .s3_fake_resource import S3FakeSession, create_s3_fake_resource
diff --git a/python_modules/libraries/dagster-aws/dagster_aws/s3/io_manager.py b/python_modules/libraries/dagster-aws/dagster_aws/s3/io_manager.py
--- a/python_modules/libraries/dagster-aws/dagster_aws/s3/io_manager.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws/s3/io_manager.py
@@ -82,12 +82,11 @@
     },
     required_resource_keys={"s3"},
 )
-def s3_io_manager(init_context):
+def s3_pickle_io_manager(init_context):
     """Persistent IO manager using S3 for storage.

-    Suitable for objects storage for distributed executors, so long as
-    each execution node has network connectivity and credentials for S3 and
-    the backing bucket.
+    Serializes objects via pickling. Suitable for object storage for distributed executors, so long
+    as each execution node has network connectivity and credentials for S3 and the backing bucket.

     Attach this resource definition to a :py:class:`~dagster.ModeDefinition`
     in order to make it available to your pipeline:
@@ -97,7 +96,7 @@
         pipeline_def = PipelineDefinition(
             mode_defs=[
                 ModeDefinition(
-                    resource_defs={'io_manager': s3_io_manager, "s3": s3_resource, ...},
+                    resource_defs={'io_manager': s3_pickle_io_manager, "s3": s3_resource, ...},
                 ),
                 ...
             ],
             ...
         )
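Note for readers: below is a minimal sketch of how the renamed definition gets wired up and executed. The `produce_number` solid, `my_pipeline`, and the bucket name are hypothetical; `s3_pickle_io_manager`, `s3_resource`, and the `s3_bucket` config key all appear in this diff.

    from dagster import ModeDefinition, execute_pipeline, pipeline, solid
    from dagster_aws.s3 import s3_pickle_io_manager, s3_resource


    @solid
    def produce_number(_):
        # The return value is pickled and written to S3 by the attached io_manager.
        return 42


    @pipeline(
        mode_defs=[
            ModeDefinition(resource_defs={"io_manager": s3_pickle_io_manager, "s3": s3_resource})
        ]
    )
    def my_pipeline():
        produce_number()


    # "my-bucket" is a placeholder; use a bucket every execution node can reach.
    result = execute_pipeline(
        my_pipeline,
        run_config={"resources": {"io_manager": {"config": {"s3_bucket": "my-bucket"}}}},
    )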
diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_io_manager.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_io_manager.py
--- a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_io_manager.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_io_manager.py
@@ -15,7 +15,7 @@
 from dagster.core.execution.api import create_execution_plan, execute_plan
 from dagster.core.execution.plan.outputs import StepOutputHandle
 from dagster.core.utils import make_new_run_id
-from dagster_aws.s3.io_manager import PickledObjectS3IOManager, s3_io_manager
+from dagster_aws.s3.io_manager import PickledObjectS3IOManager, s3_pickle_io_manager
 from dagster_aws.s3.utils import construct_s3_client
@@ -48,7 +48,9 @@
 @pipeline(
     mode_defs=[
-        ModeDefinition(resource_defs={"io_manager": s3_io_manager, "s3": test_s3_resource,},)
+        ModeDefinition(
+            resource_defs={"io_manager": s3_pickle_io_manager, "s3": test_s3_resource,},
+        )
     ]
 )
 def basic_external_plan_execution():
@@ -57,7 +59,7 @@
     return basic_external_plan_execution


-def test_s3_io_manager_execution(mock_s3_bucket):
+def test_s3_pickle_io_manager_execution(mock_s3_bucket):
     pipeline_def = define_inty_pipeline()

     run_config = {"resources": {"io_manager": {"config": {"s3_bucket": mock_s3_bucket.name}}}}
diff --git a/python_modules/libraries/dagster-azure/dagster_azure/adls2/__init__.py b/python_modules/libraries/dagster-azure/dagster_azure/adls2/__init__.py
--- a/python_modules/libraries/dagster-azure/dagster_azure/adls2/__init__.py
+++ b/python_modules/libraries/dagster-azure/dagster_azure/adls2/__init__.py
@@ -2,7 +2,7 @@
 from .file_cache import ADLS2FileCache, adls2_file_cache
 from .file_manager import ADLS2FileHandle, ADLS2FileManager
 from .intermediate_storage import ADLS2IntermediateStorage
-from .io_manager import PickledObjectADLS2IOManager, adls2_io_manager
+from .io_manager import PickledObjectADLS2IOManager, adls2_pickle_io_manager
 from .object_store import ADLS2ObjectStore
 from .resources import adls2_file_manager, adls2_resource
 from .system_storage import adls2_intermediate_storage, adls2_plus_default_intermediate_storage_defs
diff --git a/python_modules/libraries/dagster-azure/dagster_azure/adls2/io_manager.py b/python_modules/libraries/dagster-azure/dagster_azure/adls2/io_manager.py
--- a/python_modules/libraries/dagster-azure/dagster_azure/adls2/io_manager.py
+++ b/python_modules/libraries/dagster-azure/dagster_azure/adls2/io_manager.py
@@ -82,12 +82,12 @@
     },
     required_resource_keys={"adls2"},
 )
-def adls2_io_manager(init_context):
+def adls2_pickle_io_manager(init_context):
     """Persistent IO manager using Azure Data Lake Storage Gen2 for storage.

-    Suitable for objects storage for distributed executors, so long as
-    each execution node has network connectivity and credentials for ADLS and
-    the backing container.
+    Serializes objects via pickling. Suitable for object storage for distributed executors, so long
+    as each execution node has network connectivity and credentials for ADLS and the backing
+    container.

     Attach this resource definition to a :py:class:`~dagster.ModeDefinition`
     in order to make it available to your pipeline:
@@ -98,7 +98,7 @@
         pipeline_def = PipelineDefinition(
             mode_defs=[
                 ModeDefinition(
                     resource_defs={
-                        'io_manager': adls2_io_manager,
+                        'io_manager': adls2_pickle_io_manager,
                         'adls2': adls2_resource,
                         ...},
                 ),
                 ...
             ],
             ...
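The ADLS2 manager follows the same pattern. A sketch under stated assumptions: the `adls2_file_system` key for the io_manager and the `storage_account`/`credential` keys for `adls2_resource` come from the resources' config schemas, which are not shown in this diff, and every value is a placeholder.

    from dagster import ModeDefinition, execute_pipeline, pipeline, solid
    from dagster_azure.adls2 import adls2_pickle_io_manager, adls2_resource


    @solid
    def produce_number(_):
        return 42


    @pipeline(
        mode_defs=[
            ModeDefinition(
                resource_defs={"io_manager": adls2_pickle_io_manager, "adls2": adls2_resource}
            )
        ]
    )
    def my_pipeline():
        produce_number()


    # Config keys are assumptions from the config schemas, not shown in this diff.
    result = execute_pipeline(
        my_pipeline,
        run_config={
            "resources": {
                "io_manager": {"config": {"adls2_file_system": "my-file-system"}},
                "adls2": {
                    "config": {"storage_account": "my-account", "credential": {"sas": "my-token"}}
                },
            }
        },
    )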
diff --git a/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_io_manager.py b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_io_manager.py
--- a/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_io_manager.py
+++ b/python_modules/libraries/dagster-azure/dagster_azure_tests/adls2_tests/test_io_manager.py
@@ -17,7 +17,7 @@
 from dagster.core.execution.plan.outputs import StepOutputHandle
 from dagster.core.utils import make_new_run_id
 from dagster_azure.adls2 import create_adls2_client
-from dagster_azure.adls2.io_manager import PickledObjectADLS2IOManager, adls2_io_manager
+from dagster_azure.adls2.io_manager import PickledObjectADLS2IOManager, adls2_pickle_io_manager
 from dagster_azure.adls2.resources import adls2_resource
 from dagster_azure.blob import create_blob_client
@@ -55,7 +55,9 @@
 @pipeline(
     mode_defs=[
-        ModeDefinition(resource_defs={"io_manager": adls2_io_manager, "adls2": adls2_resource})
+        ModeDefinition(
+            resource_defs={"io_manager": adls2_pickle_io_manager, "adls2": adls2_resource}
+        )
     ]
 )
 def basic_external_plan_execution():
@@ -68,7 +70,7 @@
 @nettest
-def test_adls2_io_manager_execution(storage_account, file_system, credential):
+def test_adls2_pickle_io_manager_execution(storage_account, file_system, credential):
     pipeline_def = define_inty_pipeline()

     run_config = {
diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/__init__.py b/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/__init__.py
--- a/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/__init__.py
+++ b/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/__init__.py
@@ -1,4 +1,4 @@
 from .compute_log_manager import GCSComputeLogManager
-from .io_manager import PickledObjectGCSIOManager, gcs_io_manager
+from .io_manager import PickledObjectGCSIOManager, gcs_pickle_io_manager
 from .resources import gcs_resource
 from .system_storage import gcs_intermediate_storage, gcs_plus_default_intermediate_storage_defs
diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/io_manager.py b/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/io_manager.py
--- a/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/io_manager.py
+++ b/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/io_manager.py
@@ -70,12 +70,11 @@
     },
     required_resource_keys={"gcs"},
 )
-def gcs_io_manager(init_context):
+def gcs_pickle_io_manager(init_context):
     """Persistent IO manager using GCS for storage.

-    Suitable for objects storage for distributed executors, so long as
-    each execution node has network connectivity and credentials for GCS and
-    the backing bucket.
+    Serializes objects via pickling. Suitable for object storage for distributed executors, so long
+    as each execution node has network connectivity and credentials for GCS and the backing bucket.

     Attach this resource definition to a :py:class:`~dagster.ModeDefinition`
     in order to make it available to your pipeline:
@@ -85,7 +84,7 @@
         pipeline_def = PipelineDefinition(
             mode_defs=[
                 ModeDefinition(
-                    resource_defs={'io_manager': gcs_io_manager, 'gcs': gcs_resource, ...},
+                    resource_defs={'io_manager': gcs_pickle_io_manager, 'gcs': gcs_resource, ...},
                 ),
                 ...
             ],
             ...
         )
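The GCS manager rounds out the set. In this sketch, `gcs_bucket` is the config key the test below uses; the bucket name, solid, and pipeline names are hypothetical.

    from dagster import ModeDefinition, execute_pipeline, pipeline, solid
    from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource


    @solid
    def produce_number(_):
        return 42


    @pipeline(
        mode_defs=[
            ModeDefinition(resource_defs={"io_manager": gcs_pickle_io_manager, "gcs": gcs_resource})
        ]
    )
    def my_pipeline():
        produce_number()


    # "my-bucket" is a placeholder for a reachable GCS bucket.
    result = execute_pipeline(
        my_pipeline,
        run_config={"resources": {"io_manager": {"config": {"gcs_bucket": "my-bucket"}}}},
    )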
diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/gcs_tests/test_io_manager.py b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/gcs_tests/test_io_manager.py
--- a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/gcs_tests/test_io_manager.py
+++ b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/gcs_tests/test_io_manager.py
@@ -14,7 +14,7 @@
 from dagster.core.execution.api import create_execution_plan, execute_plan
 from dagster.core.execution.plan.outputs import StepOutputHandle
 from dagster.core.utils import make_new_run_id
-from dagster_gcp.gcs.io_manager import PickledObjectGCSIOManager, gcs_io_manager
+from dagster_gcp.gcs.io_manager import PickledObjectGCSIOManager, gcs_pickle_io_manager
 from dagster_gcp.gcs.resources import gcs_resource
 from google.cloud import storage
@@ -44,7 +44,9 @@
 @pipeline(
     mode_defs=[
-        ModeDefinition(resource_defs={"io_manager": gcs_io_manager, "gcs": gcs_resource},)
+        ModeDefinition(
+            resource_defs={"io_manager": gcs_pickle_io_manager, "gcs": gcs_resource},
+        )
     ]
 )
 def basic_external_plan_execution():
@@ -53,7 +55,7 @@
     return basic_external_plan_execution


-def test_gcs_io_manager_execution(gcs_bucket):
+def test_gcs_pickle_io_manager_execution(gcs_bucket):
     pipeline_def = define_inty_pipeline()

     run_config = {"resources": {"io_manager": {"config": {"gcs_bucket": gcs_bucket,}}}}
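Since this is a pure rename, downstream code only needs its imports updated; serialization behavior is unchanged. Hypothetical call sites, before and after:

    # Before this diff (hypothetical downstream imports):
    #   from dagster_aws.s3 import s3_io_manager
    #   from dagster_azure.adls2 import adls2_io_manager
    #   from dagster_gcp.gcs import gcs_io_manager
    # After:
    from dagster_aws.s3 import s3_pickle_io_manager
    from dagster_azure.adls2 import adls2_pickle_io_manager
    from dagster_gcp.gcs import gcs_pickle_io_manager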