diff --git a/python_modules/dagster/dagster/api/execute_run.py b/python_modules/dagster/dagster/api/execute_run.py deleted file mode 100644 --- a/python_modules/dagster/dagster/api/execute_run.py +++ /dev/null @@ -1,62 +0,0 @@ -from dagster import check -from dagster.core.events import EngineEventData -from dagster.core.host_representation import ExternalPipelineOrigin -from dagster.core.instance import DagsterInstance -from dagster.core.instance.ref import InstanceRef -from dagster.core.storage.pipeline_run import PipelineRun -from dagster.grpc.server import ExecuteExternalPipelineArgs -from dagster.serdes.ipc import IPCErrorMessage - - -def execute_run_grpc(api_client, instance_ref, pipeline_origin, pipeline_run): - """Asynchronously execute a run over GRPC.""" - from dagster.grpc.client import DagsterGrpcClient - - check.inst_param(api_client, "api_client", DagsterGrpcClient) - check.inst_param(instance_ref, "instance_ref", InstanceRef) - check.inst_param(pipeline_origin, "pipeline_origin", ExternalPipelineOrigin) - check.inst_param(pipeline_run, "pipeline_run", PipelineRun) - - with DagsterInstance.from_ref(instance_ref) as instance: - yield instance.report_engine_event( - 'About to start process for pipeline "{pipeline_name}" (run_id: {run_id}).'.format( - pipeline_name=pipeline_run.pipeline_name, run_id=pipeline_run.run_id - ), - pipeline_run, - engine_event_data=EngineEventData(marker_start="cli_api_subprocess_init"), - ) - - run_did_fail = False - - execute_run_args = ExecuteExternalPipelineArgs( - pipeline_origin=pipeline_origin, - pipeline_run_id=pipeline_run.run_id, - instance_ref=instance_ref, - ) - for event in api_client.execute_run(execute_run_args=execute_run_args): - if isinstance(event, IPCErrorMessage): - yield instance.report_engine_event( - event.message, - pipeline_run=pipeline_run, - engine_event_data=EngineEventData( - marker_end="cli_api_subprocess_init", error=event.serializable_error_info - ), - ) - if not run_did_fail: - run_did_fail = True - yield instance.report_run_failed(pipeline_run) - else: - yield event - - -def sync_execute_run_grpc(api_client, instance_ref, pipeline_origin, pipeline_run): - """Synchronous version of execute_run_grpc.""" - return [ - event - for event in execute_run_grpc( - api_client=api_client, - instance_ref=instance_ref, - pipeline_origin=pipeline_origin, - pipeline_run=pipeline_run, - ) - ] diff --git a/python_modules/dagster/dagster/grpc/__generated__/api_pb2.py b/python_modules/dagster/dagster/grpc/__generated__/api_pb2.py --- a/python_modules/dagster/dagster/grpc/__generated__/api_pb2.py +++ b/python_modules/dagster/dagster/grpc/__generated__/api_pb2.py @@ -26,7 +26,7 @@ syntax="proto3", serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\tapi.proto\x12\x03\x61pi"\x07\n\x05\x45mpty"\x1b\n\x0bPingRequest\x12\x0c\n\x04\x65\x63ho\x18\x01 \x01(\t"\x19\n\tPingReply\x12\x0c\n\x04\x65\x63ho\x18\x01 \x01(\t"=\n\x14StreamingPingRequest\x12\x17\n\x0fsequence_length\x18\x01 \x01(\x05\x12\x0c\n\x04\x65\x63ho\x18\x02 \x01(\t";\n\x12StreamingPingEvent\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x05\x12\x0c\n\x04\x65\x63ho\x18\x02 \x01(\t"O\n\x1c\x45xecutionPlanSnapshotRequest\x12/\n\'serialized_execution_plan_snapshot_args\x18\x01 \x01(\t"H\n\x1a\x45xecutionPlanSnapshotReply\x12*\n"serialized_execution_plan_snapshot\x18\x01 \x01(\t"H\n\x1d\x45xternalPartitionNamesRequest\x12\'\n\x1fserialized_partition_names_args\x18\x01 \x01(\t"p\n\x1b\x45xternalPartitionNamesReply\x12Q\nIserialized_external_partition_names_or_external_partition_execution_error\x18\x01 \x01(\t"C\n\x1e\x45xternalPartitionConfigRequest\x12!\n\x19serialized_partition_args\x18\x01 \x01(\t"r\n\x1c\x45xternalPartitionConfigReply\x12R\nJserialized_external_partition_config_or_external_partition_execution_error\x18\x01 \x01(\t"A\n\x1c\x45xternalPartitionTagsRequest\x12!\n\x19serialized_partition_args\x18\x01 \x01(\t"n\n\x1a\x45xternalPartitionTagsReply\x12P\nHserialized_external_partition_tags_or_external_partition_execution_error\x18\x01 \x01(\t"c\n*ExternalPartitionSetExecutionParamsRequest\x12\x35\n-serialized_partition_set_execution_param_args\x18\x01 \x01(\t"\x90\x01\n(ExternalPartitionSetExecutionParamsReply\x12\x64\n\\serialized_external_partition_set_execution_param_data_or_external_partition_execution_error\x18\x01 \x01(\t"\x19\n\x17ListRepositoriesRequest"O\n\x15ListRepositoriesReply\x12\x36\n.serialized_list_repositories_response_or_error\x18\x01 \x01(\t"Y\n%ExternalPipelineSubsetSnapshotRequest\x12\x30\n(serialized_pipeline_subset_snapshot_args\x18\x01 \x01(\t"Y\n#ExternalPipelineSubsetSnapshotReply\x12\x32\n*serialized_external_pipeline_subset_result\x18\x01 \x01(\t"H\n\x19\x45xternalRepositoryRequest\x12+\n#serialized_repository_python_origin\x18\x01 \x01(\t"F\n\x17\x45xternalRepositoryReply\x12+\n#serialized_external_repository_data\x18\x01 \x01(\t"i\n StreamingExternalRepositoryEvent\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x05\x12,\n$serialized_external_repository_chunk\x18\x02 \x01(\t"W\n ExternalScheduleExecutionRequest\x12\x33\n+serialized_external_schedule_execution_args\x18\x01 \x01(\t"z\n\x1e\x45xternalScheduleExecutionReply\x12X\nPserialized_external_schedule_execution_data_or_external_schedule_execution_error\x18\x01 \x01(\t"@\n\x18\x45xternalJobParamsRequest\x12$\n\x1cserialized_external_job_args\x18\x01 \x01(\t"b\n\x16\x45xternalJobParamsReply\x12H\n@serialized_external_job_params_or_external_job_params_error_data\x18\x01 \x01(\t"8\n\x11\x45xecuteRunRequest\x12#\n\x1bserialized_execute_run_args\x18\x01 \x01(\t"H\n\x0f\x45xecuteRunEvent\x12\x35\n-serialized_dagster_event_or_ipc_error_message\x18\x01 \x01(\t"@\n\x13ShutdownServerReply\x12)\n!serialized_shutdown_server_result\x18\x01 \x01(\t"E\n\x16\x43\x61ncelExecutionRequest\x12+\n#serialized_cancel_execution_request\x18\x01 \x01(\t"B\n\x14\x43\x61ncelExecutionReply\x12*\n"serialized_cancel_execution_result\x18\x01 \x01(\t"L\n\x19\x43\x61nCancelExecutionRequest\x12/\n\'serialized_can_cancel_execution_request\x18\x01 \x01(\t"I\n\x17\x43\x61nCancelExecutionReply\x12.\n&serialized_can_cancel_execution_result\x18\x01 \x01(\t"6\n\x0fStartRunRequest\x12#\n\x1bserialized_execute_run_args\x18\x01 \x01(\t"4\n\rStartRunReply\x12#\n\x1bserialized_start_run_result\x18\x01 \x01(\t"8\n\x14GetCurrentImageReply\x12 \n\x18serialized_current_image\x18\x01 \x01(\t2\x9a\r\n\nDagsterApi\x12*\n\x04Ping\x12\x10.api.PingRequest\x1a\x0e.api.PingReply"\x00\x12/\n\tHeartbeat\x12\x10.api.PingRequest\x1a\x0e.api.PingReply"\x00\x12G\n\rStreamingPing\x12\x19.api.StreamingPingRequest\x1a\x17.api.StreamingPingEvent"\x00\x30\x01\x12]\n\x15\x45xecutionPlanSnapshot\x12!.api.ExecutionPlanSnapshotRequest\x1a\x1f.api.ExecutionPlanSnapshotReply"\x00\x12N\n\x10ListRepositories\x12\x1c.api.ListRepositoriesRequest\x1a\x1a.api.ListRepositoriesReply"\x00\x12`\n\x16\x45xternalPartitionNames\x12".api.ExternalPartitionNamesRequest\x1a .api.ExternalPartitionNamesReply"\x00\x12\x63\n\x17\x45xternalPartitionConfig\x12#.api.ExternalPartitionConfigRequest\x1a!.api.ExternalPartitionConfigReply"\x00\x12]\n\x15\x45xternalPartitionTags\x12!.api.ExternalPartitionTagsRequest\x1a\x1f.api.ExternalPartitionTagsReply"\x00\x12\x87\x01\n#ExternalPartitionSetExecutionParams\x12/.api.ExternalPartitionSetExecutionParamsRequest\x1a-.api.ExternalPartitionSetExecutionParamsReply"\x00\x12x\n\x1e\x45xternalPipelineSubsetSnapshot\x12*.api.ExternalPipelineSubsetSnapshotRequest\x1a(.api.ExternalPipelineSubsetSnapshotReply"\x00\x12T\n\x12\x45xternalRepository\x12\x1e.api.ExternalRepositoryRequest\x1a\x1c.api.ExternalRepositoryReply"\x00\x12h\n\x1bStreamingExternalRepository\x12\x1e.api.ExternalRepositoryRequest\x1a%.api.StreamingExternalRepositoryEvent"\x00\x30\x01\x12i\n\x19\x45xternalScheduleExecution\x12%.api.ExternalScheduleExecutionRequest\x1a#.api.ExternalScheduleExecutionReply"\x00\x12Q\n\x11\x45xternalJobParams\x12\x1d.api.ExternalJobParamsRequest\x1a\x1b.api.ExternalJobParamsReply"\x00\x12\x38\n\x0eShutdownServer\x12\n.api.Empty\x1a\x18.api.ShutdownServerReply"\x00\x12>\n\nExecuteRun\x12\x16.api.ExecuteRunRequest\x1a\x14.api.ExecuteRunEvent"\x00\x30\x01\x12K\n\x0f\x43\x61ncelExecution\x12\x1b.api.CancelExecutionRequest\x1a\x19.api.CancelExecutionReply"\x00\x12T\n\x12\x43\x61nCancelExecution\x12\x1e.api.CanCancelExecutionRequest\x1a\x1c.api.CanCancelExecutionReply"\x00\x12\x36\n\x08StartRun\x12\x14.api.StartRunRequest\x1a\x12.api.StartRunReply"\x00\x12:\n\x0fGetCurrentImage\x12\n.api.Empty\x1a\x19.api.GetCurrentImageReply"\x00\x62\x06proto3', + serialized_pb=b'\n\tapi.proto\x12\x03\x61pi"\x07\n\x05\x45mpty"\x1b\n\x0bPingRequest\x12\x0c\n\x04\x65\x63ho\x18\x01 \x01(\t"\x19\n\tPingReply\x12\x0c\n\x04\x65\x63ho\x18\x01 \x01(\t"=\n\x14StreamingPingRequest\x12\x17\n\x0fsequence_length\x18\x01 \x01(\x05\x12\x0c\n\x04\x65\x63ho\x18\x02 \x01(\t";\n\x12StreamingPingEvent\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x05\x12\x0c\n\x04\x65\x63ho\x18\x02 \x01(\t"O\n\x1c\x45xecutionPlanSnapshotRequest\x12/\n\'serialized_execution_plan_snapshot_args\x18\x01 \x01(\t"H\n\x1a\x45xecutionPlanSnapshotReply\x12*\n"serialized_execution_plan_snapshot\x18\x01 \x01(\t"H\n\x1d\x45xternalPartitionNamesRequest\x12\'\n\x1fserialized_partition_names_args\x18\x01 \x01(\t"p\n\x1b\x45xternalPartitionNamesReply\x12Q\nIserialized_external_partition_names_or_external_partition_execution_error\x18\x01 \x01(\t"C\n\x1e\x45xternalPartitionConfigRequest\x12!\n\x19serialized_partition_args\x18\x01 \x01(\t"r\n\x1c\x45xternalPartitionConfigReply\x12R\nJserialized_external_partition_config_or_external_partition_execution_error\x18\x01 \x01(\t"A\n\x1c\x45xternalPartitionTagsRequest\x12!\n\x19serialized_partition_args\x18\x01 \x01(\t"n\n\x1a\x45xternalPartitionTagsReply\x12P\nHserialized_external_partition_tags_or_external_partition_execution_error\x18\x01 \x01(\t"c\n*ExternalPartitionSetExecutionParamsRequest\x12\x35\n-serialized_partition_set_execution_param_args\x18\x01 \x01(\t"\x90\x01\n(ExternalPartitionSetExecutionParamsReply\x12\x64\n\\serialized_external_partition_set_execution_param_data_or_external_partition_execution_error\x18\x01 \x01(\t"\x19\n\x17ListRepositoriesRequest"O\n\x15ListRepositoriesReply\x12\x36\n.serialized_list_repositories_response_or_error\x18\x01 \x01(\t"Y\n%ExternalPipelineSubsetSnapshotRequest\x12\x30\n(serialized_pipeline_subset_snapshot_args\x18\x01 \x01(\t"Y\n#ExternalPipelineSubsetSnapshotReply\x12\x32\n*serialized_external_pipeline_subset_result\x18\x01 \x01(\t"H\n\x19\x45xternalRepositoryRequest\x12+\n#serialized_repository_python_origin\x18\x01 \x01(\t"F\n\x17\x45xternalRepositoryReply\x12+\n#serialized_external_repository_data\x18\x01 \x01(\t"i\n StreamingExternalRepositoryEvent\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x05\x12,\n$serialized_external_repository_chunk\x18\x02 \x01(\t"W\n ExternalScheduleExecutionRequest\x12\x33\n+serialized_external_schedule_execution_args\x18\x01 \x01(\t"z\n\x1e\x45xternalScheduleExecutionReply\x12X\nPserialized_external_schedule_execution_data_or_external_schedule_execution_error\x18\x01 \x01(\t"@\n\x18\x45xternalJobParamsRequest\x12$\n\x1cserialized_external_job_args\x18\x01 \x01(\t"b\n\x16\x45xternalJobParamsReply\x12H\n@serialized_external_job_params_or_external_job_params_error_data\x18\x01 \x01(\t"@\n\x13ShutdownServerReply\x12)\n!serialized_shutdown_server_result\x18\x01 \x01(\t"E\n\x16\x43\x61ncelExecutionRequest\x12+\n#serialized_cancel_execution_request\x18\x01 \x01(\t"B\n\x14\x43\x61ncelExecutionReply\x12*\n"serialized_cancel_execution_result\x18\x01 \x01(\t"L\n\x19\x43\x61nCancelExecutionRequest\x12/\n\'serialized_can_cancel_execution_request\x18\x01 \x01(\t"I\n\x17\x43\x61nCancelExecutionReply\x12.\n&serialized_can_cancel_execution_result\x18\x01 \x01(\t"6\n\x0fStartRunRequest\x12#\n\x1bserialized_execute_run_args\x18\x01 \x01(\t"4\n\rStartRunReply\x12#\n\x1bserialized_start_run_result\x18\x01 \x01(\t"8\n\x14GetCurrentImageReply\x12 \n\x18serialized_current_image\x18\x01 \x01(\t2\xda\x0c\n\nDagsterApi\x12*\n\x04Ping\x12\x10.api.PingRequest\x1a\x0e.api.PingReply"\x00\x12/\n\tHeartbeat\x12\x10.api.PingRequest\x1a\x0e.api.PingReply"\x00\x12G\n\rStreamingPing\x12\x19.api.StreamingPingRequest\x1a\x17.api.StreamingPingEvent"\x00\x30\x01\x12]\n\x15\x45xecutionPlanSnapshot\x12!.api.ExecutionPlanSnapshotRequest\x1a\x1f.api.ExecutionPlanSnapshotReply"\x00\x12N\n\x10ListRepositories\x12\x1c.api.ListRepositoriesRequest\x1a\x1a.api.ListRepositoriesReply"\x00\x12`\n\x16\x45xternalPartitionNames\x12".api.ExternalPartitionNamesRequest\x1a .api.ExternalPartitionNamesReply"\x00\x12\x63\n\x17\x45xternalPartitionConfig\x12#.api.ExternalPartitionConfigRequest\x1a!.api.ExternalPartitionConfigReply"\x00\x12]\n\x15\x45xternalPartitionTags\x12!.api.ExternalPartitionTagsRequest\x1a\x1f.api.ExternalPartitionTagsReply"\x00\x12\x87\x01\n#ExternalPartitionSetExecutionParams\x12/.api.ExternalPartitionSetExecutionParamsRequest\x1a-.api.ExternalPartitionSetExecutionParamsReply"\x00\x12x\n\x1e\x45xternalPipelineSubsetSnapshot\x12*.api.ExternalPipelineSubsetSnapshotRequest\x1a(.api.ExternalPipelineSubsetSnapshotReply"\x00\x12T\n\x12\x45xternalRepository\x12\x1e.api.ExternalRepositoryRequest\x1a\x1c.api.ExternalRepositoryReply"\x00\x12h\n\x1bStreamingExternalRepository\x12\x1e.api.ExternalRepositoryRequest\x1a%.api.StreamingExternalRepositoryEvent"\x00\x30\x01\x12i\n\x19\x45xternalScheduleExecution\x12%.api.ExternalScheduleExecutionRequest\x1a#.api.ExternalScheduleExecutionReply"\x00\x12Q\n\x11\x45xternalJobParams\x12\x1d.api.ExternalJobParamsRequest\x1a\x1b.api.ExternalJobParamsReply"\x00\x12\x38\n\x0eShutdownServer\x12\n.api.Empty\x1a\x18.api.ShutdownServerReply"\x00\x12K\n\x0f\x43\x61ncelExecution\x12\x1b.api.CancelExecutionRequest\x1a\x19.api.CancelExecutionReply"\x00\x12T\n\x12\x43\x61nCancelExecution\x12\x1e.api.CanCancelExecutionRequest\x1a\x1c.api.CanCancelExecutionReply"\x00\x12\x36\n\x08StartRun\x12\x14.api.StartRunRequest\x1a\x12.api.StartRunReply"\x00\x12:\n\x0fGetCurrentImage\x12\n.api.Empty\x1a\x19.api.GetCurrentImageReply"\x00\x62\x06proto3', ) @@ -1113,88 +1113,6 @@ ) -_EXECUTERUNREQUEST = _descriptor.Descriptor( - name="ExecuteRunRequest", - full_name="api.ExecuteRunRequest", - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ - _descriptor.FieldDescriptor( - name="serialized_execute_run_args", - full_name="api.ExecuteRunRequest.serialized_execute_run_args", - index=0, - number=1, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=b"".decode("utf-8"), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - ), - ], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=2084, - serialized_end=2140, -) - - -_EXECUTERUNEVENT = _descriptor.Descriptor( - name="ExecuteRunEvent", - full_name="api.ExecuteRunEvent", - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ - _descriptor.FieldDescriptor( - name="serialized_dagster_event_or_ipc_error_message", - full_name="api.ExecuteRunEvent.serialized_dagster_event_or_ipc_error_message", - index=0, - number=1, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=b"".decode("utf-8"), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - ), - ], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=2142, - serialized_end=2214, -) - - _SHUTDOWNSERVERREPLY = _descriptor.Descriptor( name="ShutdownServerReply", full_name="api.ShutdownServerReply", @@ -1231,8 +1149,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2216, - serialized_end=2280, + serialized_start=2084, + serialized_end=2148, ) @@ -1272,8 +1190,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2282, - serialized_end=2351, + serialized_start=2150, + serialized_end=2219, ) @@ -1313,8 +1231,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2353, - serialized_end=2419, + serialized_start=2221, + serialized_end=2287, ) @@ -1354,8 +1272,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2421, - serialized_end=2497, + serialized_start=2289, + serialized_end=2365, ) @@ -1395,8 +1313,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2499, - serialized_end=2572, + serialized_start=2367, + serialized_end=2440, ) @@ -1436,8 +1354,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2574, - serialized_end=2628, + serialized_start=2442, + serialized_end=2496, ) @@ -1477,8 +1395,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2630, - serialized_end=2682, + serialized_start=2498, + serialized_end=2550, ) @@ -1518,8 +1436,8 @@ syntax="proto3", extension_ranges=[], oneofs=[], - serialized_start=2684, - serialized_end=2740, + serialized_start=2552, + serialized_end=2608, ) DESCRIPTOR.message_types_by_name["Empty"] = _EMPTY @@ -1560,8 +1478,6 @@ DESCRIPTOR.message_types_by_name["ExternalScheduleExecutionReply"] = _EXTERNALSCHEDULEEXECUTIONREPLY DESCRIPTOR.message_types_by_name["ExternalJobParamsRequest"] = _EXTERNALJOBPARAMSREQUEST DESCRIPTOR.message_types_by_name["ExternalJobParamsReply"] = _EXTERNALJOBPARAMSREPLY -DESCRIPTOR.message_types_by_name["ExecuteRunRequest"] = _EXECUTERUNREQUEST -DESCRIPTOR.message_types_by_name["ExecuteRunEvent"] = _EXECUTERUNEVENT DESCRIPTOR.message_types_by_name["ShutdownServerReply"] = _SHUTDOWNSERVERREPLY DESCRIPTOR.message_types_by_name["CancelExecutionRequest"] = _CANCELEXECUTIONREQUEST DESCRIPTOR.message_types_by_name["CancelExecutionReply"] = _CANCELEXECUTIONREPLY @@ -1858,28 +1774,6 @@ ) _sym_db.RegisterMessage(ExternalJobParamsReply) -ExecuteRunRequest = _reflection.GeneratedProtocolMessageType( - "ExecuteRunRequest", - (_message.Message,), - { - "DESCRIPTOR": _EXECUTERUNREQUEST, - "__module__": "api_pb2" - # @@protoc_insertion_point(class_scope:api.ExecuteRunRequest) - }, -) -_sym_db.RegisterMessage(ExecuteRunRequest) - -ExecuteRunEvent = _reflection.GeneratedProtocolMessageType( - "ExecuteRunEvent", - (_message.Message,), - { - "DESCRIPTOR": _EXECUTERUNEVENT, - "__module__": "api_pb2" - # @@protoc_insertion_point(class_scope:api.ExecuteRunEvent) - }, -) -_sym_db.RegisterMessage(ExecuteRunEvent) - ShutdownServerReply = _reflection.GeneratedProtocolMessageType( "ShutdownServerReply", (_message.Message,), @@ -1976,8 +1870,8 @@ index=0, serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_start=2743, - serialized_end=4433, + serialized_start=2611, + serialized_end=4237, methods=[ _descriptor.MethodDescriptor( name="Ping", @@ -2129,20 +2023,10 @@ serialized_options=None, create_key=_descriptor._internal_create_key, ), - _descriptor.MethodDescriptor( - name="ExecuteRun", - full_name="api.DagsterApi.ExecuteRun", - index=15, - containing_service=None, - input_type=_EXECUTERUNREQUEST, - output_type=_EXECUTERUNEVENT, - serialized_options=None, - create_key=_descriptor._internal_create_key, - ), _descriptor.MethodDescriptor( name="CancelExecution", full_name="api.DagsterApi.CancelExecution", - index=16, + index=15, containing_service=None, input_type=_CANCELEXECUTIONREQUEST, output_type=_CANCELEXECUTIONREPLY, @@ -2152,7 +2036,7 @@ _descriptor.MethodDescriptor( name="CanCancelExecution", full_name="api.DagsterApi.CanCancelExecution", - index=17, + index=16, containing_service=None, input_type=_CANCANCELEXECUTIONREQUEST, output_type=_CANCANCELEXECUTIONREPLY, @@ -2162,7 +2046,7 @@ _descriptor.MethodDescriptor( name="StartRun", full_name="api.DagsterApi.StartRun", - index=18, + index=17, containing_service=None, input_type=_STARTRUNREQUEST, output_type=_STARTRUNREPLY, @@ -2172,7 +2056,7 @@ _descriptor.MethodDescriptor( name="GetCurrentImage", full_name="api.DagsterApi.GetCurrentImage", - index=19, + index=18, containing_service=None, input_type=_EMPTY, output_type=_GETCURRENTIMAGEREPLY, diff --git a/python_modules/dagster/dagster/grpc/__generated__/api_pb2_grpc.py b/python_modules/dagster/dagster/grpc/__generated__/api_pb2_grpc.py --- a/python_modules/dagster/dagster/grpc/__generated__/api_pb2_grpc.py +++ b/python_modules/dagster/dagster/grpc/__generated__/api_pb2_grpc.py @@ -97,11 +97,6 @@ request_serializer=api__pb2.Empty.SerializeToString, response_deserializer=api__pb2.ShutdownServerReply.FromString, ) - self.ExecuteRun = channel.unary_stream( - "/api.DagsterApi/ExecuteRun", - request_serializer=api__pb2.ExecuteRunRequest.SerializeToString, - response_deserializer=api__pb2.ExecuteRunEvent.FromString, - ) self.CancelExecution = channel.unary_unary( "/api.DagsterApi/CancelExecution", request_serializer=api__pb2.CancelExecutionRequest.SerializeToString, @@ -217,12 +212,6 @@ context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") - def ExecuteRun(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details("Method not implemented!") - raise NotImplementedError("Method not implemented!") - def CancelExecution(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) @@ -325,11 +314,6 @@ request_deserializer=api__pb2.Empty.FromString, response_serializer=api__pb2.ShutdownServerReply.SerializeToString, ), - "ExecuteRun": grpc.unary_stream_rpc_method_handler( - servicer.ExecuteRun, - request_deserializer=api__pb2.ExecuteRunRequest.FromString, - response_serializer=api__pb2.ExecuteRunEvent.SerializeToString, - ), "CancelExecution": grpc.unary_unary_rpc_method_handler( servicer.CancelExecution, request_deserializer=api__pb2.CancelExecutionRequest.FromString, @@ -794,35 +778,6 @@ metadata, ) - @staticmethod - def ExecuteRun( - request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None, - ): - return grpc.experimental.unary_stream( - request, - target, - "/api.DagsterApi/ExecuteRun", - api__pb2.ExecuteRunRequest.SerializeToString, - api__pb2.ExecuteRunEvent.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - ) - @staticmethod def CancelExecution( request, diff --git a/python_modules/dagster/dagster/grpc/client.py b/python_modules/dagster/dagster/grpc/client.py --- a/python_modules/dagster/dagster/grpc/client.py +++ b/python_modules/dagster/dagster/grpc/client.py @@ -268,74 +268,6 @@ res.serialized_external_job_params_or_external_job_params_error_data ) - def execute_run(self, execute_run_args): - check.inst_param(execute_run_args, "execute_run_args", ExecuteExternalPipelineArgs) - - with DagsterInstance.from_ref(execute_run_args.instance_ref) as instance: - try: - pipeline_run = instance.get_run_by_id(execute_run_args.pipeline_run_id) - event_iterator = self._streaming_query( - "ExecuteRun", - api_pb2.ExecuteRunRequest, - serialized_execute_run_args=serialize_dagster_namedtuple(execute_run_args), - ) - except Exception as exc: # pylint: disable=bare-except - yield instance.report_engine_event( - message="Unexpected error in IPC client", - pipeline_run=pipeline_run, - engine_event_data=EngineEventData.engine_error( - serializable_error_info_from_exc_info(sys.exc_info()) - ), - ) - raise exc - - try: - for event in event_iterator: - yield deserialize_json_to_dagster_namedtuple( - event.serialized_dagster_event_or_ipc_error_message - ) - except KeyboardInterrupt: - self.cancel_execution( - CancelExecutionRequest(run_id=execute_run_args.pipeline_run_id) - ) - raise - except grpc.RpcError as rpc_error: - if ( - # posix - "Socket closed" in rpc_error.debug_error_string() # pylint: disable=no-member - # windows - or "Stream removed" - in rpc_error.debug_error_string() # pylint: disable=no-member - ): - yield instance.report_engine_event( - message="User process: GRPC server for {run_id} terminated unexpectedly".format( - run_id=pipeline_run.run_id - ), - pipeline_run=pipeline_run, - engine_event_data=EngineEventData.engine_error( - serializable_error_info_from_exc_info(sys.exc_info()) - ), - ) - yield instance.report_run_failed(pipeline_run) - else: - yield instance.report_engine_event( - message="Unexpected error in IPC client", - pipeline_run=pipeline_run, - engine_event_data=EngineEventData.engine_error( - serializable_error_info_from_exc_info(sys.exc_info()) - ), - ) - raise rpc_error - except Exception as exc: # pylint: disable=bare-except - yield instance.report_engine_event( - message="Unexpected error in IPC client", - pipeline_run=pipeline_run, - engine_event_data=EngineEventData.engine_error( - serializable_error_info_from_exc_info(sys.exc_info()) - ), - ) - raise exc - def shutdown_server(self, timeout=15): res = self._query("ShutdownServer", api_pb2.Empty, timeout=timeout) return deserialize_json_to_dagster_namedtuple(res.serialized_shutdown_server_result) diff --git a/python_modules/dagster/dagster/grpc/impl.py b/python_modules/dagster/dagster/grpc/impl.py --- a/python_modules/dagster/dagster/grpc/impl.py +++ b/python_modules/dagster/dagster/grpc/impl.py @@ -165,19 +165,6 @@ instance.dispose() -def execute_run_in_subprocess( - serialized_execute_run_args, recon_pipeline, event_queue, termination_event -): - with delay_interrupts(): - _run_in_subprocess( - serialized_execute_run_args, - recon_pipeline, - termination_event, - subprocess_status_handler=event_queue.put, - run_event_handler=event_queue.put, - ) - - def start_run_in_subprocess( serialized_execute_run_args, recon_pipeline, event_queue, termination_event ): diff --git a/python_modules/dagster/dagster/grpc/protos/api.proto b/python_modules/dagster/dagster/grpc/protos/api.proto --- a/python_modules/dagster/dagster/grpc/protos/api.proto +++ b/python_modules/dagster/dagster/grpc/protos/api.proto @@ -20,7 +20,6 @@ rpc ExternalScheduleExecution (ExternalScheduleExecutionRequest) returns (ExternalScheduleExecutionReply) {} rpc ExternalJobParams (ExternalJobParamsRequest) returns (ExternalJobParamsReply) {} rpc ShutdownServer (Empty) returns (ShutdownServerReply) {} - rpc ExecuteRun (ExecuteRunRequest) returns (stream ExecuteRunEvent) {} rpc CancelExecution (CancelExecutionRequest) returns (CancelExecutionReply) {} rpc CanCancelExecution (CanCancelExecutionRequest) returns (CanCancelExecutionReply) {} rpc StartRun (StartRunRequest) returns (StartRunReply) {} @@ -131,14 +130,6 @@ string serialized_external_job_params_or_external_job_params_error_data = 1; } -message ExecuteRunRequest { - string serialized_execute_run_args = 1; -} - -message ExecuteRunEvent { - string serialized_dagster_event_or_ipc_error_message = 1; -} - message ShutdownServerReply { string serialized_shutdown_server_result = 1; } diff --git a/python_modules/dagster/dagster/grpc/server.py b/python_modules/dagster/dagster/grpc/server.py --- a/python_modules/dagster/dagster/grpc/server.py +++ b/python_modules/dagster/dagster/grpc/server.py @@ -40,7 +40,6 @@ from .impl import ( RunInSubprocessComplete, StartRunInSubprocessSuccessful, - execute_run_in_subprocess, get_external_execution_plan_snapshot, get_external_job_params, get_external_pipeline_subset_result, @@ -514,102 +513,6 @@ ) ) - def ExecuteRun(self, request, _context): - if self._shutdown_once_executions_finish_event.is_set(): - yield api_pb2.ExecuteRunEvent( - serialized_dagster_event_or_ipc_error_message=serialize_dagster_namedtuple( - IPCErrorMessage( - serializable_error_info=None, - message="Tried to start a run on a server after telling it to shut down", - ) - ) - ) - - try: - execute_run_args = deserialize_json_to_dagster_namedtuple( - request.serialized_execute_run_args - ) - check.inst_param(execute_run_args, "execute_run_args", ExecuteExternalPipelineArgs) - - run_id = execute_run_args.pipeline_run_id - - recon_pipeline = self._recon_pipeline_from_origin(execute_run_args.pipeline_origin) - - except: # pylint: disable=bare-except - yield api_pb2.ExecuteRunEvent( - serialized_dagster_event_or_ipc_error_message=serialize_dagster_namedtuple( - IPCErrorMessage( - serializable_error_info=serializable_error_info_from_exc_info( - sys.exc_info() - ), - message="Error during RPC setup for ExecuteRun", - ) - ) - ) - return - - event_queue = multiprocessing.Queue() - termination_event = multiprocessing.Event() - execution_process = multiprocessing.Process( - target=execute_run_in_subprocess, - args=[ - request.serialized_execute_run_args, - recon_pipeline, - event_queue, - termination_event, - ], - ) - with self._execution_lock: - execution_process.start() - self._executions[run_id] = ( - execution_process, - execute_run_args.instance_ref, - ) - self._termination_events[run_id] = termination_event - - done = False - while not done: - try: - # We use `get_nowait()` instead of `get()` so that we can handle the case where the - # execution process has died unexpectedly -- `get()` would hang forever in that case - dagster_event_or_ipc_error_message_or_done = event_queue.get_nowait() - except queue.Empty: - if not execution_process.is_alive(): - # subprocess died unexpectedly - yield api_pb2.ExecuteRunEvent( - serialized_dagster_event_or_ipc_error_message=serialize_dagster_namedtuple( - IPCErrorMessage( - serializable_error_info=serializable_error_info_from_exc_info( - sys.exc_info() - ), - message=( - "GRPC server: Subprocess for {run_id} terminated unexpectedly" - ).format(run_id=run_id), - ) - ) - ) - done = True - time.sleep(EVENT_QUEUE_POLL_INTERVAL) - else: - if isinstance(dagster_event_or_ipc_error_message_or_done, RunInSubprocessComplete): - done = True - elif isinstance( - dagster_event_or_ipc_error_message_or_done, StartRunInSubprocessSuccessful - ): - continue - else: - yield api_pb2.ExecuteRunEvent( - serialized_dagster_event_or_ipc_error_message=serialize_dagster_namedtuple( - dagster_event_or_ipc_error_message_or_done - ) - ) - - with self._execution_lock: - if run_id in self._executions: - del self._executions[run_id] - if run_id in self._termination_events: - del self._termination_events[run_id] - def ShutdownServer(self, request, _context): try: self._shutdown_once_executions_finish_event.set() diff --git a/python_modules/dagster/dagster_tests/api_tests/test_api_execute_run.py b/python_modules/dagster/dagster_tests/api_tests/test_api_execute_run.py deleted file mode 100644 --- a/python_modules/dagster/dagster_tests/api_tests/test_api_execute_run.py +++ /dev/null @@ -1,84 +0,0 @@ -from dagster.api.execute_run import sync_execute_run_grpc -from dagster.core.test_utils import instance_for_test -from dagster.grpc.server import GrpcServerProcess - -from .utils import get_foo_grpc_pipeline_handle, get_foo_pipeline_handle - - -def assert_ran_successfully(events): - assert len(events) == 17 - pipeline_start_events = [e for e in events if e.event_type_value == "PIPELINE_START"] - step_success_events = [e for e in events if e.event_type_value == "STEP_SUCCESS"] - pipeline_success_events = [e for e in events if e.event_type_value == "PIPELINE_SUCCESS"] - assert len(pipeline_start_events) == 1 - assert len(step_success_events) == 2 - assert len(pipeline_success_events) == 1 - - -def test_execute_run_api_grpc_server_handle(): - with instance_for_test() as instance: - with get_foo_grpc_pipeline_handle() as pipeline_handle: - pipeline_run = instance.create_run( - pipeline_name="foo", - run_id=None, - run_config={}, - mode="default", - solids_to_execute=None, - step_keys_to_execute=None, - status=None, - tags=None, - root_run_id=None, - parent_run_id=None, - pipeline_snapshot=None, - execution_plan_snapshot=None, - parent_pipeline_snapshot=None, - ) - events = [ - event - for event in sync_execute_run_grpc( - api_client=pipeline_handle.repository_handle.repository_location_handle.client, - instance_ref=instance.get_ref(), - pipeline_origin=pipeline_handle.get_external_origin(), - pipeline_run=pipeline_run, - ) - ] - assert_ran_successfully(events) - - -def test_execute_run_api_grpc_python_handle(): - with instance_for_test() as instance: - with get_foo_pipeline_handle() as pipeline_handle: - pipeline_run = instance.create_run( - pipeline_name="foo", - run_id=None, - run_config={}, - mode="default", - solids_to_execute=None, - step_keys_to_execute=None, - status=None, - tags=None, - root_run_id=None, - parent_run_id=None, - pipeline_snapshot=None, - execution_plan_snapshot=None, - parent_pipeline_snapshot=None, - ) - - loadable_target_origin = ( - pipeline_handle.get_external_origin().external_repository_origin.repository_location_origin.loadable_target_origin - ) - - server_process = GrpcServerProcess(loadable_target_origin, max_workers=2) - with server_process.create_ephemeral_client() as api_client: - events = [ - event - for event in sync_execute_run_grpc( - api_client=api_client, - instance_ref=instance.get_ref(), - pipeline_origin=pipeline_handle.get_external_origin(), - pipeline_run=pipeline_run, - ) - ] - - assert_ran_successfully(events) - server_process.wait()