diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/conftest.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/conftest.py
index 4dac561ef..dd52d73ae 100644
--- a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/conftest.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/conftest.py
@@ -1,6 +1,14 @@
+import boto3
 import pytest
+from moto import mock_s3
 
 
-@pytest.fixture(scope="session")
-def s3_bucket():
-    yield "dagster-scratch-80542c2"
+@pytest.fixture
+def s3():
+    with mock_s3():
+        yield boto3.resource("s3")
+
+
+@pytest.fixture
+def bucket(s3):  # pylint: disable=redefined-outer-name
+    yield s3.create_bucket(Bucket="test-bucket")
diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py
index ddec4f494..e51d4f413 100644
--- a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py
@@ -1,117 +1,110 @@
 import os
 import sys
 
-import boto3
 import six
 from dagster import DagsterEventType, execute_pipeline, pipeline, seven, solid
 from dagster.core.instance import DagsterInstance, InstanceType
 from dagster.core.launcher import DefaultRunLauncher
 from dagster.core.run_coordinator import DefaultRunCoordinator
 from dagster.core.storage.compute_log_manager import ComputeIOType
 from dagster.core.storage.event_log import SqliteEventLogStorage
 from dagster.core.storage.root import LocalArtifactStorage
 from dagster.core.storage.runs import SqliteRunStorage
 from dagster_aws.s3 import S3ComputeLogManager
-from moto import mock_s3
 
 HELLO_WORLD = "Hello World"
 SEPARATOR = os.linesep if (os.name == "nt" and sys.version_info < (3,)) else "\n"
 EXPECTED_LOGS = [
     'STEP_START - Started execution of step "easy.compute".',
     'STEP_OUTPUT - Yielded output "result" of type "Any"',
     'STEP_SUCCESS - Finished execution of step "easy.compute"',
 ]
 
 
-@mock_s3
-def test_compute_log_manager(s3_bucket):
+def test_compute_log_manager(bucket):
     @pipeline
     def simple():
         @solid
         def easy(context):
             context.log.info("easy")
             print(HELLO_WORLD)  # pylint: disable=print-call
             return "easy"
 
         easy()
 
-    # Uses mock S3
-    s3 = boto3.client("s3")
-    s3.create_bucket(Bucket=s3_bucket)
-
     with seven.TemporaryDirectory() as temp_dir:
         run_store = SqliteRunStorage.from_local(temp_dir)
         event_store = SqliteEventLogStorage(temp_dir)
-        manager = S3ComputeLogManager(bucket=s3_bucket, prefix="my_prefix", local_dir=temp_dir)
+        manager = S3ComputeLogManager(bucket=bucket.name, prefix="my_prefix", local_dir=temp_dir)
         instance = DagsterInstance(
             instance_type=InstanceType.PERSISTENT,
             local_artifact_storage=LocalArtifactStorage(temp_dir),
             run_storage=run_store,
             event_storage=event_store,
             compute_log_manager=manager,
             run_coordinator=DefaultRunCoordinator(),
             run_launcher=DefaultRunLauncher(),
         )
         result = execute_pipeline(simple, instance=instance)
         compute_steps = [
             event.step_key
             for event in result.step_event_list
             if event.event_type == DagsterEventType.STEP_START
         ]
         assert len(compute_steps) == 1
         step_key = compute_steps[0]
 
         stdout = manager.read_logs_file(result.run_id, step_key, ComputeIOType.STDOUT)
         assert stdout.data == HELLO_WORLD + SEPARATOR
 
         stderr = manager.read_logs_file(result.run_id, step_key, ComputeIOType.STDERR)
         for expected in EXPECTED_LOGS:
             assert expected in stderr.data
 
         # Check S3 directly
-        s3_object = s3.get_object(
-            Bucket=s3_bucket,
-            Key="{prefix}/storage/{run_id}/compute_logs/easy.compute.err".format(
+        s3_object = bucket.Object(
+            key="{prefix}/storage/{run_id}/compute_logs/easy.compute.err".format(
                 prefix="my_prefix", run_id=result.run_id
             ),
         )
-        stderr_s3 = six.ensure_str(s3_object["Body"].read())
+        stderr_s3 = six.ensure_str(s3_object.get()["Body"].read())
         for expected in EXPECTED_LOGS:
             assert expected in stderr_s3
 
         # Check download behavior by deleting locally cached logs
         compute_logs_dir = os.path.join(temp_dir, result.run_id, "compute_logs")
         for filename in os.listdir(compute_logs_dir):
             os.unlink(os.path.join(compute_logs_dir, filename))
 
         stdout = manager.read_logs_file(result.run_id, step_key, ComputeIOType.STDOUT)
         assert stdout.data == HELLO_WORLD + SEPARATOR
 
         stderr = manager.read_logs_file(result.run_id, step_key, ComputeIOType.STDERR)
         for expected in EXPECTED_LOGS:
             assert expected in stderr.data
 
 
-@mock_s3
-def test_compute_log_manager_from_config(s3_bucket):
+def test_compute_log_manager_from_config(bucket):
     s3_prefix = "foobar"
 
     dagster_yaml = """
 compute_logs:
   module: dagster_aws.s3.compute_log_manager
   class: S3ComputeLogManager
   config:
     bucket: "{s3_bucket}"
     local_dir: "/tmp/cool"
     prefix: "{s3_prefix}"
 """.format(
-        s3_bucket=s3_bucket, s3_prefix=s3_prefix
+        s3_bucket=bucket.name, s3_prefix=s3_prefix
     )
 
     with seven.TemporaryDirectory() as tempdir:
         with open(os.path.join(tempdir, "dagster.yaml"), "wb") as f:
             f.write(six.ensure_binary(dagster_yaml))
 
         instance = DagsterInstance.from_config(tempdir)
 
-        assert instance.compute_log_manager._s3_bucket == s3_bucket  # pylint: disable=protected-access
+        assert (
+            instance.compute_log_manager._s3_bucket == bucket.name  # pylint: disable=protected-access
+        )
         assert instance.compute_log_manager._s3_prefix == s3_prefix  # pylint: disable=protected-access
diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_file_handle_to_s3.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_file_handle_to_s3.py
index ed809fd63..40c2fec57 100644
--- a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_file_handle_to_s3.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_file_handle_to_s3.py
@@ -1,53 +1,42 @@
-import boto3
-from dagster import ModeDefinition, ResourceDefinition, execute_pipeline, pipeline, solid
+from dagster import ModeDefinition, execute_pipeline, pipeline, solid
 from dagster.utils.test import get_temp_file_handle_with_data
-from dagster_aws.s3 import file_handle_to_s3
-from moto import mock_s3
+from dagster_aws.s3 import file_handle_to_s3, s3_resource
 
 
-def create_file_handle_pipeline(temp_file_handle, s3_resource):
+def create_file_handle_pipeline(temp_file_handle):
     @solid
     def emit_temp_handle(_):
         return temp_file_handle
 
-    @pipeline(
-        mode_defs=[
-            ModeDefinition(resource_defs={"s3": ResourceDefinition.hardcoded_resource(s3_resource)})
-        ]
-    )
+    @pipeline(mode_defs=[ModeDefinition(resource_defs={"s3": s3_resource})])
     def test():
         return file_handle_to_s3(emit_temp_handle())
 
     return test
 
 
-@mock_s3
-def test_successful_file_handle_to_s3():
+def test_successful_file_handle_to_s3(bucket):
     foo_bytes = "foo".encode()
     with get_temp_file_handle_with_data(foo_bytes) as temp_file_handle:
-        # Uses mock S3
-        s3 = boto3.client("s3")
-        s3.create_bucket(Bucket="some-bucket")
-
         result = execute_pipeline(
-            create_file_handle_pipeline(temp_file_handle, s3),
+            create_file_handle_pipeline(temp_file_handle),
             run_config={
                 "solids": {
-                    "file_handle_to_s3": {"config": {"Bucket": "some-bucket", "Key": "some-key"}}
+                    "file_handle_to_s3": {"config": {"Bucket": bucket.name, "Key": "some-key"}}
                 }
             },
         )
 
         assert result.success
 
-        assert s3.get_object(Bucket="some-bucket", Key="some-key")["Body"].read() == foo_bytes
+        assert bucket.Object(key="some-key").get()["Body"].read() == foo_bytes
 
         materializations = result.result_for_solid(
             "file_handle_to_s3"
         ).materializations_during_compute
         assert len(materializations) == 1
         assert len(materializations[0].metadata_entries) == 1
-        assert (
-            materializations[0].metadata_entries[0].entry_data.path == "s3://some-bucket/some-key"
-        )
+        assert materializations[0].metadata_entries[
+            0
+        ].entry_data.path == "s3://{bucket}/some-key".format(bucket=bucket.name)
         assert materializations[0].metadata_entries[0].label == "some-key"
diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_intermediate_storage.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_intermediate_storage.py
index d8773dd6d..ecad37137 100644
--- a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_intermediate_storage.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_intermediate_storage.py
@@ -1,456 +1,443 @@
 import csv
 import os
 from collections import OrderedDict
 
 import pytest
 from dagster import (
     Bool,
     InputDefinition,
     Int,
     List,
     ModeDefinition,
     OutputDefinition,
     PipelineRun,
     SerializationStrategy,
     String,
     check,
     execute_pipeline,
     lambda_solid,
     pipeline,
     usable_as_dagster_type,
 )
 from dagster.core.events import DagsterEventType
 from dagster.core.execution.api import create_execution_plan, execute_plan, scoped_pipeline_context
 from dagster.core.execution.plan.objects import StepOutputHandle
 from dagster.core.instance import DagsterInstance
 from dagster.core.storage.type_storage import TypeStoragePlugin, TypeStoragePluginRegistry
 from dagster.core.types.dagster_type import Bool as RuntimeBool
 from dagster.core.types.dagster_type import String as RuntimeString
 from dagster.core.types.dagster_type import create_any_type, resolve_dagster_type
 from dagster.core.utils import make_new_run_id
 from dagster.utils.test import yield_empty_pipeline_context
 from dagster_aws.s3 import (
     S3IntermediateStorage,
     s3_plus_default_intermediate_storage_defs,
     s3_resource,
 )
 
 
 class UppercaseSerializationStrategy(SerializationStrategy):  # pylint: disable=no-init
     def serialize(self, value, write_file_obj):
         return write_file_obj.write(bytes(value.upper().encode("utf-8")))
 
     def deserialize(self, read_file_obj):
         return read_file_obj.read().decode("utf-8").lower()
 
 
 LowercaseString = create_any_type(
     "LowercaseString", serialization_strategy=UppercaseSerializationStrategy("uppercase"),
 )
 
 
-def aws_credentials_present():
-    return os.getenv("AWS_ACCESS_KEY_ID") and os.getenv("AWS_SECRET_ACCESS_KEY")
-
-
-nettest = pytest.mark.nettest
-
-
 def define_inty_pipeline(should_throw=True):
     @lambda_solid
     def return_one():
         return 1
 
     @lambda_solid(input_defs=[InputDefinition("num", Int)], output_def=OutputDefinition(Int))
     def add_one(num):
         return num + 1
 
     @lambda_solid
     def user_throw_exception():
         raise Exception("whoops")
 
     @pipeline(
         mode_defs=[
             ModeDefinition(
                 intermediate_storage_defs=s3_plus_default_intermediate_storage_defs,
                 resource_defs={"s3": s3_resource},
             )
         ]
     )
     def basic_external_plan_execution():
         add_one(return_one())
         if should_throw:
             user_throw_exception()
 
     return basic_external_plan_execution
 
 
 def get_step_output(step_events, step_key, output_name="result"):
     for step_event in step_events:
         if (
             step_event.event_type == DagsterEventType.STEP_OUTPUT
             and step_event.step_key == step_key
             and step_event.step_output_data.output_name == output_name
         ):
             return step_event
     return None
 
 
-@nettest
-def test_using_s3_for_subplan(s3_bucket):
+def test_using_s3_for_subplan(bucket):
     pipeline_def = define_inty_pipeline()
 
-    run_config = {"intermediate_storage": {"s3": {"config": {"s3_bucket": s3_bucket}}}}
+    run_config = {"intermediate_storage": {"s3": {"config": {"s3_bucket": bucket.name}}}}
 
     run_id = make_new_run_id()
 
     execution_plan = create_execution_plan(pipeline_def, run_config=run_config)
 
     assert execution_plan.get_step_by_key("return_one.compute")
 
     step_keys = ["return_one.compute"]
     instance = DagsterInstance.ephemeral()
     pipeline_run = PipelineRun(
         pipeline_name=pipeline_def.name, run_id=run_id, run_config=run_config
     )
 
     return_one_step_events = list(
         execute_plan(
             execution_plan.build_subset_plan(step_keys),
             run_config=run_config,
             pipeline_run=pipeline_run,
             instance=instance,
         )
     )
 
     assert get_step_output(return_one_step_events, "return_one.compute")
     with scoped_pipeline_context(
         execution_plan.build_subset_plan(["return_one.compute"]), run_config, pipeline_run, instance,
     ) as context:
         intermediates_manager = S3IntermediateStorage(
-            s3_bucket,
+            bucket.name,
             run_id,
             s3_session=context.scoped_resources_builder.build(required_resource_keys={"s3"},).s3,
         )
         step_output_handle = StepOutputHandle("return_one.compute")
         assert intermediates_manager.has_intermediate(context, step_output_handle)
         assert intermediates_manager.get_intermediate(context, Int, step_output_handle).obj == 1
 
     add_one_step_events = list(
         execute_plan(
             execution_plan.build_subset_plan(["add_one.compute"]),
             run_config=run_config,
             pipeline_run=pipeline_run,
             instance=instance,
         )
     )
 
     assert get_step_output(add_one_step_events, "add_one.compute")
     with scoped_pipeline_context(
         execution_plan.build_subset_plan(["add_one.compute"]), run_config, pipeline_run, instance,
     ) as context:
         step_output_handle = StepOutputHandle("add_one.compute")
 
         assert intermediates_manager.has_intermediate(context, step_output_handle)
         assert intermediates_manager.get_intermediate(context, Int, step_output_handle).obj == 2
 
 
 class FancyStringS3TypeStoragePlugin(TypeStoragePlugin):  # pylint:disable=no-init
     @classmethod
     def compatible_with_storage_def(cls, _):
         # Not needed for these tests
         raise NotImplementedError()
 
     @classmethod
     def set_intermediate_object(
         cls, intermediate_storage, context, dagster_type, step_output_handle, value
     ):
         check.inst_param(intermediate_storage, "intermediate_storage", S3IntermediateStorage)
         paths = ["intermediates", step_output_handle.step_key, step_output_handle.output_name]
         paths.append(value)
         key = intermediate_storage.object_store.key_for_paths([intermediate_storage.root] + paths)
         return intermediate_storage.object_store.set_object(
             key, "", dagster_type.serialization_strategy
         )
 
     @classmethod
     def get_intermediate_object(
         cls, intermediate_storage, context, dagster_type, step_output_handle
     ):
         check.inst_param(intermediate_storage, "intermediate_storage", S3IntermediateStorage)
         paths = ["intermediates", step_output_handle.step_key, step_output_handle.output_name]
         res = intermediate_storage.object_store.s3.list_objects(
             Bucket=intermediate_storage.object_store.bucket,
             Prefix=intermediate_storage.key_for_paths(paths),
         )
         return res["Contents"][0]["Key"].split("/")[-1]
 
 
-@nettest
-def test_s3_intermediate_storage_with_type_storage_plugin(s3_bucket):
+def test_s3_intermediate_storage_with_type_storage_plugin(bucket):
     run_id = make_new_run_id()
 
     intermediate_storage = S3IntermediateStorage(
         run_id=run_id,
-        s3_bucket=s3_bucket,
+        s3_bucket=bucket.name,
         type_storage_plugin_registry=TypeStoragePluginRegistry(
             [(RuntimeString, FancyStringS3TypeStoragePlugin)]
         ),
     )
 
     with yield_empty_pipeline_context(run_id=run_id) as context:
         try:
             intermediate_storage.set_intermediate(
                 context, RuntimeString, StepOutputHandle("obj_name"), "hello"
             )
 
             assert intermediate_storage.has_intermediate(context, StepOutputHandle("obj_name"))
 
             assert (
                 intermediate_storage.get_intermediate(
                     context, RuntimeString, StepOutputHandle("obj_name")
                 )
                 == "hello"
             )
 
         finally:
             intermediate_storage.rm_intermediate(context, StepOutputHandle("obj_name"))
 
 
-@nettest
-def test_s3_intermediate_storage_with_composite_type_storage_plugin(s3_bucket):
+def test_s3_intermediate_storage_with_composite_type_storage_plugin(bucket):
     run_id = make_new_run_id()
 
     intermediate_storage = S3IntermediateStorage(
         run_id=run_id,
-        s3_bucket=s3_bucket,
+        s3_bucket=bucket.name,
         type_storage_plugin_registry=TypeStoragePluginRegistry(
             [(RuntimeString, FancyStringS3TypeStoragePlugin)]
         ),
     )
 
     with yield_empty_pipeline_context(run_id=run_id) as context:
         with pytest.raises(check.NotImplementedCheckError):
             intermediate_storage.set_intermediate(
                 context, resolve_dagster_type(List[String]), StepOutputHandle("obj_name"), ["hello",],
             )
 
 
-@nettest
-def test_s3_intermediate_storage_composite_types_with_custom_serializer_for_inner_type(s3_bucket):
+def test_s3_intermediate_storage_composite_types_with_custom_serializer_for_inner_type(bucket):
     run_id = make_new_run_id()
 
-    intermediate_storage = S3IntermediateStorage(run_id=run_id, s3_bucket=s3_bucket)
+    intermediate_storage = S3IntermediateStorage(run_id=run_id, s3_bucket=bucket.name)
 
     with yield_empty_pipeline_context(run_id=run_id) as context:
         try:
             intermediate_storage.set_intermediate(
                 context,
                 resolve_dagster_type(List[LowercaseString]),
                 StepOutputHandle("list"),
                 ["foo", "bar"],
             )
 
             assert intermediate_storage.has_intermediate(context, StepOutputHandle("list"))
 
             assert intermediate_storage.get_intermediate(
                 context, resolve_dagster_type(List[Bool]), StepOutputHandle("list")
             ).obj == ["foo", "bar"]
 
         finally:
             intermediate_storage.rm_intermediate(context, StepOutputHandle("list"))
 
 
-@nettest
-def test_s3_intermediate_storage_with_custom_serializer(s3_bucket):
+def test_s3_intermediate_storage_with_custom_serializer(bucket):
     run_id = make_new_run_id()
 
-    intermediate_storage = S3IntermediateStorage(run_id=run_id, s3_bucket=s3_bucket)
+    intermediate_storage = S3IntermediateStorage(run_id=run_id, s3_bucket=bucket.name)
 
     with yield_empty_pipeline_context(run_id=run_id) as context:
         try:
             intermediate_storage.set_intermediate(
                 context, LowercaseString, StepOutputHandle("foo"), "foo"
             )
 
             assert (
                 intermediate_storage.object_store.s3.get_object(
                     Bucket=intermediate_storage.object_store.bucket,
                     Key=os.path.join(intermediate_storage.root, "intermediates", "foo", "result"),
                 )["Body"]
                 .read()
                 .decode("utf-8")
                 == "FOO"
             )
 
             assert intermediate_storage.has_intermediate(context, StepOutputHandle("foo"))
             assert (
                 intermediate_storage.get_intermediate(
                     context, LowercaseString, StepOutputHandle("foo")
                 ).obj
                 == "foo"
             )
         finally:
             intermediate_storage.rm_intermediate(context, StepOutputHandle("foo"))
 
 
-@nettest
-def test_s3_pipeline_with_custom_prefix(s3_bucket):
+def test_s3_pipeline_with_custom_prefix(bucket):
     s3_prefix = "custom_prefix"
 
     pipe = define_inty_pipeline(should_throw=False)
     run_config = {
-        "intermediate_storage": {"s3": {"config": {"s3_bucket": s3_bucket, "s3_prefix": s3_prefix}}}
+        "intermediate_storage": {
+            "s3": {"config": {"s3_bucket": bucket.name, "s3_prefix": s3_prefix}}
+        }
     }
 
     pipeline_run = PipelineRun(pipeline_name=pipe.name, run_config=run_config)
     instance = DagsterInstance.ephemeral()
 
     result = execute_pipeline(pipe, run_config=run_config,)
     assert result.success
 
     execution_plan = create_execution_plan(pipe, run_config)
     with scoped_pipeline_context(execution_plan, run_config, pipeline_run, instance,) as context:
         intermediates_manager = S3IntermediateStorage(
             run_id=result.run_id,
-            s3_bucket=s3_bucket,
+            s3_bucket=bucket.name,
             s3_prefix=s3_prefix,
             s3_session=context.scoped_resources_builder.build(required_resource_keys={"s3"}).s3,
         )
         assert intermediates_manager.root == "/".join(["custom_prefix", "storage", result.run_id])
         assert (
             intermediates_manager.get_intermediate(
                 context, Int, StepOutputHandle("return_one.compute")
             ).obj
             == 1
         )
         assert (
             intermediates_manager.get_intermediate(
                 context, Int, StepOutputHandle("add_one.compute")
             ).obj
             == 2
         )
 
 
-@nettest
-def test_s3_intermediate_storage_with_custom_prefix(s3_bucket):
+def test_s3_intermediate_storage_with_custom_prefix(bucket):
     run_id = make_new_run_id()
 
     intermediate_storage = S3IntermediateStorage(
-        run_id=run_id, s3_bucket=s3_bucket, s3_prefix="custom_prefix"
+        run_id=run_id, s3_bucket=bucket.name, s3_prefix="custom_prefix"
     )
     assert intermediate_storage.root == "/".join(["custom_prefix", "storage", run_id])
 
     try:
         with yield_empty_pipeline_context(run_id=run_id) as context:
 
             intermediate_storage.set_intermediate(
                 context, RuntimeBool, StepOutputHandle("true"), True
             )
 
             assert intermediate_storage.has_intermediate(context, StepOutputHandle("true"))
             assert intermediate_storage.uri_for_paths(["true"]).startswith(
-                "s3://%s/custom_prefix" % s3_bucket
+                "s3://%s/custom_prefix" % bucket.name
             )
 
     finally:
         intermediate_storage.rm_intermediate(context, StepOutputHandle("true"))
 
 
-@nettest
-def test_s3_intermediate_storage(s3_bucket):
+def test_s3_intermediate_storage(bucket):
     run_id = make_new_run_id()
     run_id_2 = make_new_run_id()
 
-    intermediate_storage = S3IntermediateStorage(run_id=run_id, s3_bucket=s3_bucket)
+    intermediate_storage = S3IntermediateStorage(run_id=run_id, s3_bucket=bucket.name)
     assert intermediate_storage.root == "/".join(["dagster", "storage", run_id])
 
-    intermediate_storage_2 = S3IntermediateStorage(run_id=run_id_2, s3_bucket=s3_bucket)
+    intermediate_storage_2 = S3IntermediateStorage(run_id=run_id_2, s3_bucket=bucket.name)
     assert intermediate_storage_2.root == "/".join(["dagster", "storage", run_id_2])
 
     try:
         with yield_empty_pipeline_context(run_id=run_id) as context:
 
             intermediate_storage.set_intermediate(
                 context, RuntimeBool, StepOutputHandle("true"), True
             )
 
             assert intermediate_storage.has_intermediate(context, StepOutputHandle("true"))
             assert (
                 intermediate_storage.get_intermediate(
                     context, RuntimeBool, StepOutputHandle("true")
                 ).obj
                 is True
             )
             assert intermediate_storage.uri_for_paths(["true"]).startswith("s3://")
 
             intermediate_storage_2.copy_intermediate_from_run(
                 context, run_id, StepOutputHandle("true")
             )
             assert intermediate_storage_2.has_intermediate(context, StepOutputHandle("true"))
             assert (
                 intermediate_storage_2.get_intermediate(
                     context, RuntimeBool, StepOutputHandle("true")
                 ).obj
                 is True
             )
     finally:
         intermediate_storage.rm_intermediate(context, StepOutputHandle("true"))
         intermediate_storage_2.rm_intermediate(context, StepOutputHandle("true"))
 
 
 class CsvSerializationStrategy(SerializationStrategy):
     def __init__(self):
         super(CsvSerializationStrategy, self).__init__(
             "csv_strategy", read_mode="r", write_mode="w"
         )
 
     def serialize(self, value, write_file_obj):
         fieldnames = value[0]
         writer = csv.DictWriter(write_file_obj, fieldnames)
         writer.writeheader()
         writer.writerows(value)
 
     def deserialize(self, read_file_obj):
         reader = csv.DictReader(read_file_obj)
         return LessSimpleDataFrame([row for row in reader])
 
 
 @usable_as_dagster_type(
     name="LessSimpleDataFrame",
     description=("A naive representation of a data frame, e.g., as returned by " "csv.DictReader."),
     serialization_strategy=CsvSerializationStrategy(),
 )
 class LessSimpleDataFrame(list):
     pass
 
 
-def test_custom_read_write_mode(s3_bucket):
+def test_custom_read_write_mode(bucket):
     run_id = make_new_run_id()
 
-    intermediate_storage = S3IntermediateStorage(run_id=run_id, s3_bucket=s3_bucket)
+    intermediate_storage = S3IntermediateStorage(run_id=run_id, s3_bucket=bucket.name)
 
     data_frame = [OrderedDict({"foo": "1", "bar": "1"}), OrderedDict({"foo": "2", "bar": "2"})]
 
     try:
         with yield_empty_pipeline_context(run_id=run_id) as context:
             intermediate_storage.set_intermediate(
                 context,
                 resolve_dagster_type(LessSimpleDataFrame),
                 StepOutputHandle("data_frame"),
                 data_frame,
             )
 
             assert intermediate_storage.has_intermediate(context, StepOutputHandle("data_frame"))
             assert (
                 intermediate_storage.get_intermediate(
                     context,
                     resolve_dagster_type(LessSimpleDataFrame),
                     StepOutputHandle("data_frame"),
                 ).obj
                 == data_frame
             )
             assert intermediate_storage.uri_for_paths(["data_frame"]).startswith("s3://")
 
     finally:
         intermediate_storage.rm_intermediate(context, StepOutputHandle("data_frame"))
diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_object_store.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_object_store.py
index 3a5221a1b..2c64ee043 100644
--- a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_object_store.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_object_store.py
@@ -1,33 +1,26 @@
-import boto3
 from dagster.core.storage.object_store import DEFAULT_SERIALIZATION_STRATEGY
 from dagster_aws.s3 import S3ObjectStore
-from moto import mock_s3
 
 
-@mock_s3
-def test_s3_object_store(s3_bucket, caplog):
-    # Uses mock S3
-    s3 = boto3.client("s3")
-    s3.create_bucket(Bucket=s3_bucket)
-
+def test_s3_object_store(bucket, caplog):
     key = "foo"
-    s3_obj_store = S3ObjectStore(s3_bucket)
+    s3_obj_store = S3ObjectStore(bucket.name)
     res_key = s3_obj_store.set_object(key, True, DEFAULT_SERIALIZATION_STRATEGY)
-    assert res_key == "s3://{s3_bucket}/{key}".format(s3_bucket=s3_bucket, key=key)
+    assert res_key == "s3://{s3_bucket}/{key}".format(s3_bucket=bucket.name, key=key)
 
     s3_obj_store.set_object(key, True, DEFAULT_SERIALIZATION_STRATEGY)
     assert "Removing existing S3 key" in caplog.text
 
     assert s3_obj_store.has_object(key)
     assert s3_obj_store.get_object(key, DEFAULT_SERIALIZATION_STRATEGY)[0] == True
 
     s3_obj_store.cp_object(key, "bar")
     assert s3_obj_store.has_object("bar")
 
     s3_obj_store.rm_object(key)
     assert not s3_obj_store.has_object(key)
 
     assert s3_obj_store.uri_for_key(key) == "s3://{s3_bucket}/{key}".format(
-        s3_bucket=s3_bucket, key=key
+        s3_bucket=bucket.name, key=key
     )
diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_s3_file_cache.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_s3_file_cache.py
index f272e40bb..a5135cc9d 100644
--- a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_s3_file_cache.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_s3_file_cache.py
@@ -1,54 +1,40 @@
 import io
 
-import boto3
 from dagster_aws.s3 import S3FileCache, S3FileHandle
-from moto import mock_s3
 
 
-@mock_s3
-def test_s3_file_cache_file_not_present():
-    s3 = boto3.client("s3")
-    s3.create_bucket(Bucket="some-bucket")
+def test_s3_file_cache_file_not_present(s3, bucket):
     file_store = S3FileCache(
-        s3_bucket="some-bucket", s3_key="some-key", s3_session=s3, overwrite=False
+        s3_bucket=bucket.name, s3_key="some-key", s3_session=s3.meta.client, overwrite=False
     )
 
     assert not file_store.has_file_object("foo")
 
 
-@mock_s3
-def test_s3_file_cache_file_present():
-    s3 = boto3.client("s3")
-    s3.create_bucket(Bucket="some-bucket")
+def test_s3_file_cache_file_present(s3, bucket):
     file_store = S3FileCache(
-        s3_bucket="some-bucket", s3_key="some-key", s3_session=s3, overwrite=False
+        s3_bucket=bucket.name, s3_key="some-key", s3_session=s3.meta.client, overwrite=False
     )
 
     assert not file_store.has_file_object("foo")
 
     file_store.write_binary_data("foo", "bar".encode())
 
     assert file_store.has_file_object("foo")
 
 
-@mock_s3
-def test_s3_file_cache_correct_handle():
-    s3 = boto3.client("s3")
-    s3.create_bucket(Bucket="some-bucket")
+def test_s3_file_cache_correct_handle(s3, bucket):
     file_store = S3FileCache(
-        s3_bucket="some-bucket", s3_key="some-key", s3_session=s3, overwrite=False
+        s3_bucket=bucket.name, s3_key="some-key", s3_session=s3.meta.client, overwrite=False
    )
 
     assert isinstance(file_store.get_file_handle("foo"), S3FileHandle)
 
 
-@mock_s3
-def test_s3_file_cache_write_file_object():
-    s3 = boto3.client("s3")
-    s3.create_bucket(Bucket="some-bucket")
+def test_s3_file_cache_write_file_object(s3, bucket):
     file_store = S3FileCache(
-        s3_bucket="some-bucket", s3_key="some-key", s3_session=s3, overwrite=False
+        s3_bucket=bucket.name, s3_key="some-key", s3_session=s3.meta.client, overwrite=False
     )
 
     stream = io.BytesIO("content".encode())
     file_store.write_file_object("foo", stream)
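
Note (not part of the patch): a minimal sketch of how the new moto-backed fixtures from conftest.py are consumed by a test. The fixture bodies mirror the patch above; the test name, key, and payload below are hypothetical examples.

# Illustrative sketch only; assumes the fixture names introduced in conftest.py.
import boto3
import pytest
from moto import mock_s3


@pytest.fixture
def s3():
    # Inside mock_s3() all boto3 calls hit an in-memory S3, so no real AWS
    # credentials or shared scratch bucket are needed.
    with mock_s3():
        yield boto3.resource("s3")


@pytest.fixture
def bucket(s3):  # pylint: disable=redefined-outer-name
    yield s3.create_bucket(Bucket="test-bucket")


def test_round_trip(bucket):  # hypothetical example test
    # Write through the mocked Bucket resource, then read the object back,
    # using the same bucket.Object(key=...).get() pattern the patch adopts.
    bucket.put_object(Key="some-key", Body=b"foo")
    assert bucket.Object(key="some-key").get()["Body"].read() == b"foo"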