diff --git a/python_modules/dagster-graphql/dagster_graphql/test/utils.py b/python_modules/dagster-graphql/dagster_graphql/test/utils.py --- a/python_modules/dagster-graphql/dagster_graphql/test/utils.py +++ b/python_modules/dagster-graphql/dagster_graphql/test/utils.py @@ -2,7 +2,7 @@ from dagster import check from dagster.cli.workspace import Workspace -from dagster.core.code_pointer import CodePointer +from dagster.core.definitions.reconstructable import ReconstructableRepository from dagster.core.host_representation import ( InProcessRepositoryLocationOrigin, PythonEnvRepositoryLocationOrigin, @@ -56,7 +56,7 @@ workspace=Workspace( [ InProcessRepositoryLocationOrigin( - CodePointer.from_python_file(python_file, fn_name, None) + ReconstructableRepository.for_file(python_file, fn_name) ) ] ), diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/graphql_context_test_suite.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/graphql_context_test_suite.py --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/graphql_context_test_suite.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/graphql_context_test_suite.py @@ -265,7 +265,7 @@ @contextmanager def _mgr_fn(recon_repo): check.inst_param(recon_repo, "recon_repo", ReconstructableRepository) - with Workspace([InProcessRepositoryLocationOrigin(recon_repo.pointer)]) as workspace: + with Workspace([InProcessRepositoryLocationOrigin(recon_repo)]) as workspace: yield workspace return MarkedManager(_mgr_fn, [Marks.hosted_user_process_env]) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_run_cancellation.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_run_cancellation.py --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_run_cancellation.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_run_cancellation.py @@ -2,7 +2,7 @@ import time from dagster import execute_pipeline -from dagster.core.host_representation.handle import IN_PROCESS_NAME +from dagster.core.host_representation import IN_PROCESS_NAME from dagster.utils import safe_tempfile_path from dagster_graphql.client.query import LAUNCH_PIPELINE_EXECUTION_MUTATION from dagster_graphql.test.utils import execute_dagster_graphql, infer_pipeline_selector diff --git a/python_modules/dagster-test/dagster_test/dagster_core_docker_buildkite/__init__.py b/python_modules/dagster-test/dagster_test/dagster_core_docker_buildkite/__init__.py --- a/python_modules/dagster-test/dagster_test/dagster_core_docker_buildkite/__init__.py +++ b/python_modules/dagster-test/dagster_test/dagster_core_docker_buildkite/__init__.py @@ -4,7 +4,11 @@ from dagster import check from dagster.core.definitions.reconstructable import ReconstructableRepository -from dagster.core.host_representation import InProcessRepositoryLocation +from dagster.core.host_representation import ( + InProcessRepositoryLocationOrigin, + RepositoryLocation, + RepositoryLocationHandle, +) from dagster.utils import file_relative_path, git_repository_root IS_BUILDKITE = os.getenv("BUILDKITE") is not None @@ -39,10 +43,14 @@ def get_test_project_external_pipeline(pipeline_name): return ( - InProcessRepositoryLocation( - ReconstructableRepository.for_file( - file_relative_path(__file__, "test_pipelines/repo.py"), - "define_demo_execution_repo", + RepositoryLocation.from_handle( + RepositoryLocationHandle.create_from_repository_location_origin( + InProcessRepositoryLocationOrigin( + ReconstructableRepository.for_file( + file_relative_path(__file__, "test_pipelines/repo.py"), + "define_demo_execution_repo", + ) + ) ) ) .get_repository("demo_execution_repo") @@ -80,5 +88,5 @@ image_name=image_name, tag=docker_image_tag, ) - print("Using Docker image: %s" % final_docker_image) + print("Using Docker image: %s" % final_docker_image) # pylint: disable=print-call return final_docker_image diff --git a/python_modules/dagster-test/dagster_test/test_project/__init__.py b/python_modules/dagster-test/dagster_test/test_project/__init__.py --- a/python_modules/dagster-test/dagster_test/test_project/__init__.py +++ b/python_modules/dagster-test/dagster_test/test_project/__init__.py @@ -8,7 +8,12 @@ ReconstructablePipeline, ReconstructableRepository, ) -from dagster.core.host_representation import ExternalPipeline, InProcessRepositoryLocation +from dagster.core.host_representation import ( + ExternalPipeline, + InProcessRepositoryLocationOrigin, + RepositoryLocation, + RepositoryLocationHandle, +) from dagster.core.origin import PipelinePythonOrigin, RepositoryPythonOrigin from dagster.serdes import whitelist_for_serdes from dagster.utils import file_relative_path, git_repository_root @@ -105,10 +110,14 @@ def get_test_project_external_pipeline(pipeline_name): return ( - InProcessRepositoryLocation( - ReconstructableRepository.for_file( - file_relative_path(__file__, "test_pipelines/repo.py"), - "define_demo_execution_repo", + RepositoryLocation.from_handle( + RepositoryLocationHandle.create_from_repository_location_origin( + InProcessRepositoryLocationOrigin( + ReconstructableRepository.for_file( + file_relative_path(__file__, "test_pipelines/repo.py"), + "define_demo_execution_repo", + ) + ) ) ) .get_repository("demo_execution_repo") diff --git a/python_modules/dagster/dagster/cli/workspace/load.py b/python_modules/dagster/dagster/cli/workspace/load.py --- a/python_modules/dagster/dagster/cli/workspace/load.py +++ b/python_modules/dagster/dagster/cli/workspace/load.py @@ -4,7 +4,8 @@ import six from dagster import check -from dagster.core.code_pointer import CodePointer, rebase_file +from dagster.core.code_pointer import rebase_file +from dagster.core.definitions.reconstructable import ReconstructableRepository from dagster.core.host_representation import ( GrpcServerRepositoryLocationOrigin, InProcessRepositoryLocationOrigin, @@ -66,7 +67,7 @@ ) origin = InProcessRepositoryLocationOrigin( - CodePointer.from_legacy_repository_yaml(yaml_path) + ReconstructableRepository.from_legacy_repository_yaml(yaml_path) ) return {origin.location_name: origin} diff --git a/python_modules/dagster/dagster/core/host_representation/__init__.py b/python_modules/dagster/dagster/core/host_representation/__init__.py --- a/python_modules/dagster/dagster/core/host_representation/__init__.py +++ b/python_modules/dagster/dagster/core/host_representation/__init__.py @@ -44,6 +44,7 @@ ) from .historical import HistoricalPipeline from .origin import ( + IN_PROCESS_NAME, GrpcServerRepositoryLocationOrigin, InProcessRepositoryLocationOrigin, ManagedGrpcPythonEnvRepositoryLocationOrigin, diff --git a/python_modules/dagster/dagster/core/host_representation/handle.py b/python_modules/dagster/dagster/core/host_representation/handle.py --- a/python_modules/dagster/dagster/core/host_representation/handle.py +++ b/python_modules/dagster/dagster/core/host_representation/handle.py @@ -8,9 +8,9 @@ from dagster import check from dagster.api.list_repositories import sync_list_repositories, sync_list_repositories_grpc from dagster.core.code_pointer import CodePointer +from dagster.core.definitions.reconstructable import repository_def_from_pointer from dagster.core.errors import DagsterInvariantViolationError from dagster.core.host_representation.origin import ( - IN_PROCESS_NAME, GrpcServerRepositoryLocationOrigin, InProcessRepositoryLocationOrigin, ManagedGrpcPythonEnvRepositoryLocationOrigin, @@ -60,17 +60,13 @@ def create_from_repository_location_origin(repo_location_origin): check.inst_param(repo_location_origin, "repo_location_origin", RepositoryLocationOrigin) if isinstance(repo_location_origin, PythonEnvRepositoryLocationOrigin): - return RepositoryLocationHandle.create_python_env_location(repo_location_origin) + return PythonEnvRepositoryLocationHandle(repo_location_origin) elif isinstance(repo_location_origin, ManagedGrpcPythonEnvRepositoryLocationOrigin): - return RepositoryLocationHandle.create_process_bound_grpc_server_location( - repo_location_origin - ) + return ManagedGrpcPythonEnvRepositoryLocationHandle(repo_location_origin) elif isinstance(repo_location_origin, GrpcServerRepositoryLocationOrigin): - return RepositoryLocationHandle.create_grpc_server_location(repo_location_origin) + return GrpcServerRepositoryLocationHandle(repo_location_origin) elif isinstance(repo_location_origin, InProcessRepositoryLocationOrigin): - return RepositoryLocationHandle.create_in_process_location( - repo_location_origin.code_pointer - ) + return InProcessRepositoryLocationHandle(repo_location_origin) else: check.failed("Unexpected repository location origin") @@ -103,135 +99,27 @@ else: raise DagsterInvariantViolationError("Unexpected repository origin type") - @staticmethod - def create_in_process_location(pointer): - check.inst_param(pointer, "pointer", CodePointer) - - # If we are here we know we are in a hosted_user_process so we can do this - from dagster.core.definitions.reconstructable import repository_def_from_pointer - - repo_def = repository_def_from_pointer(pointer) - return InProcessRepositoryLocationHandle(IN_PROCESS_NAME, {repo_def.name: pointer}) - - @staticmethod - def create_python_env_location(origin): - check.inst_param(origin, "origin", PythonEnvRepositoryLocationOrigin) - - loadable_target_origin = origin.loadable_target_origin - - response = sync_list_repositories( - executable_path=loadable_target_origin.executable_path, - python_file=loadable_target_origin.python_file, - module_name=loadable_target_origin.package_name - if loadable_target_origin.package_name - else loadable_target_origin.module_name, - working_directory=loadable_target_origin.working_directory, - attribute=loadable_target_origin.attribute, - ) - - if loadable_target_origin.python_file: - repository_code_pointer_dict = { - lrs.repository_name: CodePointer.from_python_file( - loadable_target_origin.python_file, - lrs.attribute, - loadable_target_origin.working_directory, - ) - for lrs in response.repository_symbols - } - elif loadable_target_origin.package_name: - repository_code_pointer_dict = { - lrs.repository_name: CodePointer.from_python_package( - loadable_target_origin.package_name, lrs.attribute - ) - for lrs in response.repository_symbols - } - else: - repository_code_pointer_dict = { - lrs.repository_name: CodePointer.from_module( - loadable_target_origin.module_name, lrs.attribute - ) - for lrs in response.repository_symbols - } - return PythonEnvRepositoryLocationHandle( - origin=origin, repository_code_pointer_dict=repository_code_pointer_dict, - ) - - @staticmethod - def create_process_bound_grpc_server_location(origin): - from dagster.grpc.client import client_heartbeat_thread - from dagster.grpc.server import GrpcServerProcess - - check.inst_param(origin, "origin", ManagedGrpcPythonEnvRepositoryLocationOrigin) - loadable_target_origin = origin.loadable_target_origin - server = GrpcServerProcess( - loadable_target_origin=loadable_target_origin, - max_workers=2, - heartbeat=True, - lazy_load_user_code=True, - ) - client = server.create_ephemeral_client() - - heartbeat_shutdown_event = threading.Event() - - heartbeat_thread = threading.Thread( - target=client_heartbeat_thread, args=(client, heartbeat_shutdown_event) - ) - heartbeat_thread.daemon = True - heartbeat_thread.start() - list_repositories_response = sync_list_repositories_grpc(client) - - code_pointer_dict = list_repositories_response.repository_code_pointer_dict - - return ManagedGrpcPythonEnvRepositoryLocationHandle( - origin=origin, - executable_path=list_repositories_response.executable_path, - repository_code_pointer_dict=code_pointer_dict, - client=client, - grpc_server_process=server, - heartbeat_thread=heartbeat_thread, - heartbeat_shutdown_event=heartbeat_shutdown_event, - ) - - @staticmethod - def create_grpc_server_location(origin): - check.inst_param(origin, "origin", GrpcServerRepositoryLocationOrigin) - - port = origin.port - socket = origin.socket - host = origin.host +class GrpcServerRepositoryLocationHandle(RepositoryLocationHandle): + """ + Represents a gRPC server that Dagster is not responsible for managing. + """ + def __init__(self, origin): from dagster.grpc.client import DagsterGrpcClient - client = DagsterGrpcClient(port=port, socket=socket, host=host) - - list_repositories_response = sync_list_repositories_grpc(client) - - repository_names = set( - symbol.repository_name for symbol in list_repositories_response.repository_symbols - ) - - return GrpcServerRepositoryLocationHandle( - origin=origin, client=client, repository_names=repository_names, - ) + self.origin = check.inst_param(origin, "origin", GrpcServerRepositoryLocationOrigin) + port = self.origin.port + socket = self.origin.socket + host = self.origin.host -class GrpcServerRepositoryLocationHandle( - namedtuple("_GrpcServerRepositoryLocationHandle", "origin client repository_names",), - RepositoryLocationHandle, -): - """ - Represents a gRPC server that Dagster is not responsible for managing. - """ + self.client = DagsterGrpcClient(port=port, socket=socket, host=host) - def __new__(cls, origin, client, repository_names): - from dagster.grpc.client import DagsterGrpcClient + list_repositories_response = sync_list_repositories_grpc(self.client) - return super(GrpcServerRepositoryLocationHandle, cls).__new__( - cls, - check.inst_param(origin, "origin", GrpcServerRepositoryLocationOrigin), - check.inst_param(client, "client", DagsterGrpcClient), - check.set_param(repository_names, "repository_names", of_type=str), + self.repository_names = set( + symbol.repository_name for symbol in list_repositories_response.repository_symbols ) @property @@ -251,7 +139,7 @@ return self.origin.location_name def create_reloaded_handle(self): - return RepositoryLocationHandle.create_grpc_server_location(self.origin) + return RepositoryLocationHandle.create_from_repository_location_origin(self.origin) def get_current_image(self): job_image = self.client.get_current_image().current_image @@ -281,22 +169,45 @@ ) -class PythonEnvRepositoryLocationHandle( - namedtuple("_PythonEnvRepositoryLocationHandle", "origin repository_code_pointer_dict",), - RepositoryLocationHandle, -): - def __new__(cls, origin, repository_code_pointer_dict): - return super(PythonEnvRepositoryLocationHandle, cls).__new__( - cls, - check.inst_param(origin, "origin", PythonEnvRepositoryLocationOrigin,), - check.dict_param( - repository_code_pointer_dict, - "repository_code_pointer_dict", - key_type=str, - value_type=CodePointer, - ), +class PythonEnvRepositoryLocationHandle(RepositoryLocationHandle): + def __init__(self, origin): + self.origin = check.inst_param(origin, "origin", PythonEnvRepositoryLocationOrigin) + loadable_target_origin = self.origin.loadable_target_origin + + response = sync_list_repositories( + executable_path=loadable_target_origin.executable_path, + python_file=loadable_target_origin.python_file, + module_name=loadable_target_origin.module_name + if loadable_target_origin.module_name + else loadable_target_origin.package_name, + working_directory=loadable_target_origin.working_directory, + attribute=loadable_target_origin.attribute, ) + if loadable_target_origin.python_file: + self.repository_code_pointer_dict = { + lrs.repository_name: CodePointer.from_python_file( + loadable_target_origin.python_file, + lrs.attribute, + loadable_target_origin.working_directory, + ) + for lrs in response.repository_symbols + } + elif loadable_target_origin.package_name: + self.repository_code_pointer_dict = { + lrs.repository_name: CodePointer.from_python_package( + loadable_target_origin.package_name, lrs.attribute + ) + for lrs in response.repository_symbols + } + else: + self.repository_code_pointer_dict = { + lrs.repository_name: CodePointer.from_module( + loadable_target_origin.module_name, lrs.attribute + ) + for lrs in response.repository_symbols + } + @property def loadable_target_origin(self): return self.origin.loadable_target_origin @@ -313,46 +224,40 @@ return RepositoryLocationHandle.create_from_repository_location_origin(self.origin) -class ManagedGrpcPythonEnvRepositoryLocationHandle( - namedtuple( - "_ManagedGrpcPythonEnvRepositoryLocationHandle", - "origin executable_path repository_code_pointer_dict " - "grpc_server_process client heartbeat_thread heartbeat_shutdown_event", - ), - RepositoryLocationHandle, -): +class ManagedGrpcPythonEnvRepositoryLocationHandle(RepositoryLocationHandle): """ A Python environment for which Dagster is managing a gRPC server. """ - def __new__( - cls, - origin, - executable_path, - repository_code_pointer_dict, - grpc_server_process, - client, - heartbeat_thread, - heartbeat_shutdown_event, - ): - from dagster.grpc.client import DagsterGrpcClient + def __init__(self, origin): + from dagster.grpc.client import client_heartbeat_thread from dagster.grpc.server import GrpcServerProcess - return super(ManagedGrpcPythonEnvRepositoryLocationHandle, cls).__new__( - cls, - check.inst_param(origin, "origin", ManagedGrpcPythonEnvRepositoryLocationOrigin,), - check.str_param(executable_path, "executable_path"), - check.dict_param( - repository_code_pointer_dict, - "repository_code_pointer_dict", - key_type=str, - value_type=CodePointer, - ), - check.inst_param(grpc_server_process, "grpc_server_process", GrpcServerProcess), - check.inst_param(client, "client", DagsterGrpcClient), - check.inst_param(heartbeat_thread, "heartbeat_thread", threading.Thread), - heartbeat_shutdown_event, + self.origin = check.inst_param( + origin, "origin", ManagedGrpcPythonEnvRepositoryLocationOrigin ) + loadable_target_origin = origin.loadable_target_origin + + self.grpc_server_process = GrpcServerProcess( + loadable_target_origin=loadable_target_origin, + max_workers=2, + heartbeat=True, + lazy_load_user_code=True, + ) + self.client = self.grpc_server_process.create_ephemeral_client() + + self.heartbeat_shutdown_event = threading.Event() + + self.heartbeat_thread = threading.Thread( + target=client_heartbeat_thread, args=(self.client, self.heartbeat_shutdown_event) + ) + self.heartbeat_thread.daemon = True + self.heartbeat_thread.start() + list_repositories_response = sync_list_repositories_grpc(self.client) + + self.executable_path = list_repositories_response.executable_path + + self.repository_code_pointer_dict = list_repositories_response.repository_code_pointer_dict @property def location_name(self): @@ -387,21 +292,17 @@ self.client.cleanup_server() -class InProcessRepositoryLocationHandle( - namedtuple("_InProcessRepositoryLocationHandle", "location_name repository_code_pointer_dict"), - RepositoryLocationHandle, -): - def __new__(cls, location_name, repository_code_pointer_dict): - return super(InProcessRepositoryLocationHandle, cls).__new__( - cls, - check.str_param(location_name, "location_name"), - check.dict_param( - repository_code_pointer_dict, - "repository_code_pointer_dict", - key_type=str, - value_type=CodePointer, - ), - ) +class InProcessRepositoryLocationHandle(RepositoryLocationHandle): + def __init__(self, origin): + self.origin = check.inst_param(origin, "origin", InProcessRepositoryLocationOrigin) + + pointer = self.origin.recon_repo.pointer + repo_def = repository_def_from_pointer(pointer) + self.repository_code_pointer_dict = {repo_def.name: pointer} + + @property + def location_name(self): + return self.origin.location_name def create_reloaded_handle(self): raise NotImplementedError("Not implemented for in-process") diff --git a/python_modules/dagster/dagster/core/host_representation/origin.py b/python_modules/dagster/dagster/core/host_representation/origin.py --- a/python_modules/dagster/dagster/core/host_representation/origin.py +++ b/python_modules/dagster/dagster/core/host_representation/origin.py @@ -4,7 +4,7 @@ import six from dagster import check -from dagster.core.code_pointer import CodePointer +from dagster.core.definitions.reconstructable import ReconstructableRepository from dagster.core.types.loadable_target_origin import LoadableTargetOrigin from dagster.serdes import whitelist_for_serdes @@ -58,15 +58,15 @@ @whitelist_for_serdes class InProcessRepositoryLocationOrigin( - namedtuple("_InProcessRepositoryLocationOrigin", "code_pointer"), RepositoryLocationOrigin, + namedtuple("_InProcessRepositoryLocationOrigin", "recon_repo"), RepositoryLocationOrigin, ): """Identifies a repository location constructed in the host process. Should only be used in tests. """ - def __new__(cls, code_pointer): + def __new__(cls, recon_repo): return super(InProcessRepositoryLocationOrigin, cls).__new__( - cls, check.inst_param(code_pointer, "code_pointer", CodePointer,) + cls, check.inst_param(recon_repo, "recon_repo", ReconstructableRepository) ) @property diff --git a/python_modules/dagster/dagster/core/host_representation/repository_location.py b/python_modules/dagster/dagster/core/host_representation/repository_location.py --- a/python_modules/dagster/dagster/core/host_representation/repository_location.py +++ b/python_modules/dagster/dagster/core/host_representation/repository_location.py @@ -25,7 +25,6 @@ sync_get_external_schedule_execution_data, sync_get_external_schedule_execution_data_grpc, ) -from dagster.core.definitions.reconstructable import ReconstructableRepository from dagster.core.execution.api import create_execution_plan from dagster.core.host_representation import ( ExternalExecutionPlan, @@ -159,11 +158,8 @@ check.inst_param( repository_location_handle, "repository_location_handle", RepositoryLocationHandle ) - if isinstance(repository_location_handle, InProcessRepositoryLocationHandle): - check.invariant(len(repository_location_handle.repository_code_pointer_dict) == 1) - pointer = next(iter(repository_location_handle.repository_code_pointer_dict.values())) - return InProcessRepositoryLocation(ReconstructableRepository(pointer)) + return InProcessRepositoryLocation(repository_location_handle) elif isinstance(repository_location_handle, PythonEnvRepositoryLocationHandle): return PythonEnvRepositoryLocation(repository_location_handle) elif isinstance( @@ -178,9 +174,10 @@ class InProcessRepositoryLocation(RepositoryLocation): - def __init__(self, recon_repo): - self._recon_repo = check.inst_param(recon_repo, "recon_repo", ReconstructableRepository) - self._handle = RepositoryLocationHandle.create_in_process_location(recon_repo.pointer) + def __init__(self, handle): + self._handle = check.inst_param(handle, "handle", InProcessRepositoryLocationHandle,) + + recon_repo = self._handle.origin.recon_repo repo_def = recon_repo.get_definition() def_name = repo_def.name @@ -195,10 +192,10 @@ return False def get_reconstructable_pipeline(self, name): - return self._recon_repo.get_reconstructable_pipeline(name) + return self.get_reconstructable_repository().get_reconstructable_pipeline(name) def get_reconstructable_repository(self): - return self._recon_repo + return self._handle.origin.recon_repo @property def name(self): diff --git a/python_modules/dagster/dagster_tests/api_tests/utils.py b/python_modules/dagster/dagster_tests/api_tests/utils.py --- a/python_modules/dagster/dagster_tests/api_tests/utils.py +++ b/python_modules/dagster/dagster_tests/api_tests/utils.py @@ -4,19 +4,15 @@ from dagster import file_relative_path from dagster.core.definitions.reconstructable import ReconstructableRepository from dagster.core.host_representation import ( + GrpcServerRepositoryLocation, + InProcessRepositoryLocationOrigin, ManagedGrpcPythonEnvRepositoryLocationOrigin, - PythonEnvRepositoryLocationOrigin, -) -from dagster.core.host_representation.handle import ( PipelineHandle, + PythonEnvRepositoryLocationOrigin, + RepositoryLocation, RepositoryLocationHandle, UserProcessApi, ) -from dagster.core.host_representation.repository_location import ( - GrpcServerRepositoryLocation, - InProcessRepositoryLocation, - RepositoryLocation, -) from dagster.core.types.loadable_target_origin import LoadableTargetOrigin @@ -81,7 +77,15 @@ recon_repo = ReconstructableRepository.from_legacy_repository_yaml( file_relative_path(__file__, "legacy_repository_file.yaml") ) - return InProcessRepositoryLocation(recon_repo).get_repository("bar_repo").handle + return ( + RepositoryLocation.from_handle( + RepositoryLocationHandle.create_from_repository_location_origin( + InProcessRepositoryLocationOrigin(recon_repo) + ) + ) + .get_repository("bar_repo") + .handle + ) def legacy_get_foo_pipeline_handle(): diff --git a/python_modules/dagster/dagster_tests/core_tests/launcher_tests/test_cli_api_running_multiprocessing.py b/python_modules/dagster/dagster_tests/core_tests/launcher_tests/test_cli_api_running_multiprocessing.py --- a/python_modules/dagster/dagster_tests/core_tests/launcher_tests/test_cli_api_running_multiprocessing.py +++ b/python_modules/dagster/dagster_tests/core_tests/launcher_tests/test_cli_api_running_multiprocessing.py @@ -24,7 +24,11 @@ solid, ) from dagster.core.events import DagsterEventType -from dagster.core.host_representation.handle import RepositoryHandle, RepositoryLocationHandle +from dagster.core.host_representation import ( + InProcessRepositoryLocationOrigin, + RepositoryHandle, + RepositoryLocationHandle, +) from dagster.core.storage.pipeline_run import PipelineRunStatus from dagster.core.test_utils import instance_for_test from dagster.utils import file_relative_path, safe_tempfile_path @@ -153,7 +157,9 @@ recon_pipeline = reconstructable(pipeline_def) recon_repo = recon_pipeline.repository repo_def = recon_repo.get_definition() - location_handle = RepositoryLocationHandle.create_in_process_location(recon_repo.pointer) + location_handle = RepositoryLocationHandle.create_from_repository_location_origin( + InProcessRepositoryLocationOrigin(recon_repo) + ) repository_handle = RepositoryHandle( repository_name=repo_def.name, repository_location_handle=location_handle, ) diff --git a/python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/config.py b/python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/config.py --- a/python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/config.py +++ b/python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/config.py @@ -1,5 +1,5 @@ from dagster import Field, Noneable, StringSource -from dagster.core.host_representation.handle import IN_PROCESS_NAME +from dagster.core.host_representation import IN_PROCESS_NAME from dagster.utils import merge_dicts from dagster_celery.executor import CELERY_CONFIG from dagster_k8s import DagsterK8sJobConfig diff --git a/python_modules/libraries/dagster-dask/dagster_dask_tests/test_graphql.py b/python_modules/libraries/dagster-dask/dagster_dask_tests/test_graphql.py --- a/python_modules/libraries/dagster-dask/dagster_dask_tests/test_graphql.py +++ b/python_modules/libraries/dagster-dask/dagster_dask_tests/test_graphql.py @@ -19,8 +19,7 @@ instance = DagsterInstance.local_temp() context = DagsterGraphQLContext( - workspace=Workspace([InProcessRepositoryLocationOrigin(recon_repo.pointer)]), - instance=instance, + workspace=Workspace([InProcessRepositoryLocationOrigin(recon_repo)]), instance=instance, ) selector = infer_pipeline_selector(context, "hammer_pipeline")