diff options
author | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-05-20 18:32:16 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-05-20 18:32:16 +0300 |
commit | bd647416fc1f87644c0d8391b53d22254edfdfdd (patch) | |
tree | d0ef5a4f21a0e5a7cf0208bbab893d70d3b77c13 | |
parent | 92394d748cf66c288843ce740d2d37a94f67bf49 (diff) | |
download | ydb-bd647416fc1f87644c0d8391b53d22254edfdfdd.tar.gz |
GetTask CP message is proto based + LoopbackService support
ref:09af2b300d81600704800c504d85fcd732251f9e
8 files changed, 183 insertions, 179 deletions
diff --git a/ydb/core/yq/libs/actors/task_get.cpp b/ydb/core/yq/libs/actors/task_get.cpp index 6cc8bb922ab..ff5269b9a46 100644 --- a/ydb/core/yq/libs/actors/task_get.cpp +++ b/ydb/core/yq/libs/actors/task_get.cpp @@ -19,9 +19,9 @@ #include <ydb/library/security/util.h> #define LOG_E(stream) \ - LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PRIVATE_PROXY, "PrivateGetTask - Owner: " << OwnerId << ", " << "Host: " << Host << ", "<< stream) + LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PRIVATE_PROXY, "PrivateGetTask - Owner: " << OwnerId << ", " << "Host: " << Host << ", Tenant: " << Tenant << ", " << stream) #define LOG_D(stream) \ - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::YQL_PRIVATE_PROXY, "PrivateGetTask - Owner: " << OwnerId << ", " << "Host: " << Host << ", " << stream) + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::YQL_PRIVATE_PROXY, "PrivateGetTask - Owner: " << OwnerId << ", " << "Host: " << Host << ", Tenant: " << Tenant << ", " << stream) namespace NYq { @@ -56,10 +56,10 @@ public: void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev, const NActors::TActorContext& ctx) { LOG_E("TGetTaskRequestActor::OnUndelivered"); - auto Res = MakeHolder<TEvents::TEvGetTaskResponse>(); - Res->Status = Ydb::StatusIds::GENERIC_ERROR; - Res->Issues.AddIssue("UNDELIVERED"); - ctx.Send(ev->Sender, Res.Release()); + auto response = MakeHolder<TEvents::TEvGetTaskResponse>(); + response->Status = Ydb::StatusIds::GENERIC_ERROR; + response->Issues.AddIssue("UNDELIVERED"); + ctx.Send(ev->Sender, response.Release()); Die(ctx); } @@ -74,61 +74,29 @@ public: LOG_E(TStringBuilder() << "Failed with code: " << codeStr << " Details: " << Issues.ToString()); - auto Res = MakeHolder<TEvents::TEvGetTaskResponse>(); - Res->Status = reqStatus; - Res->Issues.AddIssues(Issues); - Send(Sender, Res.Release()); + auto response = MakeHolder<TEvents::TEvGetTaskResponse>(); + response->Status = reqStatus; + response->Issues.AddIssues(Issues); + Send(Sender, response.Release()); PassAway(); } void Bootstrap(const TActorContext& ctx) { Become(&TGetTaskRequestActor::StateFunc); - const auto& req = Ev->Record; - OwnerId = req.owner_id(); - Host = req.host(); - Tenant = req.tenant(); - LOG_D("Request CP::GetTask with size: " << req.ByteSize() << " bytes"); - RequestedMBytes->Collect(req.ByteSize() / 1024 / 1024); + auto request = Ev->Record; + LOG_D("Request CP::GetTask with size: " << request.ByteSize() << " bytes"); + RequestedMBytes->Collect(request.ByteSize() / 1024 / 1024); + OwnerId = request.owner_id(); + Host = request.host(); + Tenant = request.tenant(); ctx.Send(NYq::ControlPlaneStorageServiceActorId(), - new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(OwnerId, Host, Tenant)); - } - - static TString GetServiceAccountId(const YandexQuery::IamAuth& auth) { - return auth.has_service_account() - ? auth.service_account().id() - : TString{}; - } - - static TString ExtractServiceAccountId(const YandexQuery::Connection& c) { - switch (c.content().setting().connection_case()) { - case YandexQuery::ConnectionSetting::kYdbDatabase: { - return GetServiceAccountId(c.content().setting().ydb_database().auth()); - } - case YandexQuery::ConnectionSetting::kDataStreams: { - return GetServiceAccountId(c.content().setting().data_streams().auth()); - } - case YandexQuery::ConnectionSetting::kObjectStorage: { - return GetServiceAccountId(c.content().setting().object_storage().auth()); - } - case YandexQuery::ConnectionSetting::kMonitoring: { - return GetServiceAccountId(c.content().setting().monitoring().auth()); - } - case YandexQuery::ConnectionSetting::kClickhouseCluster: { - return GetServiceAccountId(c.content().setting().clickhouse_cluster().auth()); - } - // Do not replace with default. Adding a new connection should cause a compilation error - case YandexQuery::ConnectionSetting::CONNECTION_NOT_SET: - break; - } - return {}; + new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(std::move(request))); } private: void HandleResponse(NYq::TEvControlPlaneStorage::TEvGetTaskResponse::TPtr& ev, const TActorContext& ctx) { // YQ LOG_D("Got CP::GetTask Response"); - const auto& tasks = ev->Get()->Tasks; - Res->Record.ConstructInPlace(); - Res->Status = Ydb::StatusIds::SUCCESS; + const auto& issues = ev->Get()->Issues; if (issues) { Issues.AddIssues(issues); @@ -136,61 +104,24 @@ private: return; } + auto response = MakeHolder<TEvents::TEvGetTaskResponse>(); + response->Status = Ydb::StatusIds::SUCCESS; + response->Record.ConstructInPlace(); + auto& record = *response->Record; + record = ev->Get()->Record; try { - for (const auto& task : tasks) { - const auto& queryType = task.Query.content().type(); - if (queryType != YandexQuery::QueryContent::ANALYTICS && queryType != YandexQuery::QueryContent::STREAMING) { //TODO: fix - ythrow yexception() - << "query type " - << YandexQuery::QueryContent::QueryType_Name(queryType) - << " unsupported"; - } - auto* newTask = Res->Record->add_tasks(); - newTask->set_query_type(queryType); - newTask->set_execute_mode(task.Query.meta().execute_mode()); - newTask->set_state_load_mode(task.Internal.state_load_mode()); - auto* queryId = newTask->mutable_query_id(); - queryId->set_value(task.Query.meta().common().id()); - newTask->set_streaming(queryType == YandexQuery::QueryContent::STREAMING); - newTask->set_text(task.Query.content().text()); - *newTask->mutable_connection() = task.Internal.connection(); - *newTask->mutable_binding() = task.Internal.binding(); - newTask->set_user_token(task.Internal.token()); - newTask->set_user_id(task.Query.meta().common().created_by()); - newTask->set_generation(task.Generation); - newTask->set_status(task.Query.meta().status()); - *newTask->mutable_created_topic_consumers() = task.Internal.created_topic_consumers(); - newTask->mutable_sensor_labels()->insert({"cloud_id", task.Internal.cloud_id()}); - newTask->mutable_sensor_labels()->insert({"scope", task.Scope}); - newTask->set_automatic(task.Query.content().automatic()); - newTask->set_query_name(task.Query.content().name()); - *newTask->mutable_deadline() = NProtoInterop::CastToProto(task.Deadline); - newTask->mutable_disposition()->CopyFrom(task.Internal.disposition()); - newTask->set_result_limit(task.Internal.result_limit()); - + for (auto& task : *record.mutable_tasks()) { THashMap<TString, TString> accountIdSignatures; - for (const auto& connection: task.Internal.connection()) { - const auto serviceAccountId = ExtractServiceAccountId(connection); - if (!serviceAccountId) { - continue; - } - + for (auto& account : *task.mutable_service_accounts()) { + const auto serviceAccountId = account.value(); auto& signature = accountIdSignatures[serviceAccountId]; if (!signature && Signer) { signature = Signer->SignAccountId(serviceAccountId); } - auto* account = newTask->add_service_accounts(); - account->set_value(serviceAccountId); - account->set_signature(signature); + account.set_signature(signature); } - - *newTask->mutable_dq_graph() = task.Internal.dq_graph(); - newTask->set_dq_graph_index(task.Internal.dq_graph_index()); - - *newTask->mutable_result_set_meta() = task.Query.result_set_meta(); - newTask->set_scope(task.Scope); } - ctx.Send(Sender, Res.Release()); + ctx.Send(Sender, response.Release()); Die(ctx); } catch (...) { const auto msg = TStringBuilder() << "Can't do GetTask: " << CurrentExceptionMessage(); @@ -221,8 +152,6 @@ private: TString OwnerId; TString Host; TString Tenant; - - THolder<TEvents::TEvGetTaskResponse> Res = MakeHolder<TEvents::TEvGetTaskResponse>(); }; IActor* CreateGetTaskRequestActor( diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h index 4566e1239cf..ec89313c029 100644 --- a/ydb/core/yq/libs/control_plane_storage/events/events.h +++ b/ydb/core/yq/libs/control_plane_storage/events/events.h @@ -383,77 +383,43 @@ struct TEvControlPlaneStorage { }; struct TEvGetTaskRequest : NActors::TEventLocal<TEvGetTaskRequest, EvGetTaskRequest> { - explicit TEvGetTaskRequest( - const TString& owner, - const TString& hostName, - const TString& tenantName) - : Owner(owner) - , HostName(hostName) - , TenantName(tenantName) - { - } - size_t GetByteSize() const { - return sizeof(*this) - + Owner.Size() - + HostName.Size() - + TenantName.Size(); - } - - TString Owner; - TString HostName; - TString TenantName; - }; + TEvGetTaskRequest() = default; - struct TTask { - TString Scope; - TString QueryId; - YandexQuery::Query Query; - YandexQuery::Internal::QueryInternal Internal; - ui64 Generation = 0; - TInstant Deadline; + explicit TEvGetTaskRequest( + Yq::Private::GetTaskRequest&& request) + : Request(std::move(request)) + {} size_t GetByteSize() const { return sizeof(*this) - + Scope.Size() - + QueryId.Size() - + Query.ByteSizeLong() - + Internal.ByteSizeLong(); + + Request.ByteSizeLong(); } + + Yq::Private::GetTaskRequest Request; }; struct TEvGetTaskResponse : NActors::TEventLocal<TEvGetTaskResponse, EvGetTaskResponse> { - explicit TEvGetTaskResponse(const TVector<TTask>& tasks, const TString& owner) - : Tasks(tasks) - , Owner(owner) - { - } - explicit TEvGetTaskResponse(const NYql::TIssues& issues) + explicit TEvGetTaskResponse( + const Yq::Private::GetTaskResult& record) + : Record(record) + {} + + explicit TEvGetTaskResponse( + const NYql::TIssues& issues) : Issues(issues) - { - } + {} size_t GetByteSize() const { return sizeof(*this) - + TasksByteSizeLong() + + Record.ByteSizeLong() + GetIssuesByteSize(Issues) - + Owner.Size() + GetDebugInfoByteSize(DebugInfo); } - size_t TasksByteSizeLong() const { - size_t size = 0; - for (const auto& task: Tasks) { - size += task.GetByteSize(); - } - size += Tasks.size() * sizeof(TTask); - return size; - } - + Yq::Private::GetTaskResult Record; NYql::TIssues Issues; - TVector<TTask> Tasks; - TString Owner; TDebugInfoPtr DebugInfo; }; diff --git a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp index 8742d91df52..c1f5f6f3ee0 100644 --- a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp @@ -298,9 +298,8 @@ private: void Handle(TEvControlPlaneStorage::TEvGetTaskRequest::TPtr& ev) { CPS_LOG_I("GetTaskRequest"); - TVector<TEvControlPlaneStorage::TTask> tasks; - TString owner; - auto event = std::make_unique<TEvControlPlaneStorage::TEvGetTaskResponse>(tasks, owner); + Yq::Private::GetTaskResult result; + auto event = std::make_unique<TEvControlPlaneStorage::TEvGetTaskResponse>(result); NActors::TActivationContext::ActorSystem()->Send(new IEventHandle(ev->Sender, SelfId(), event.release(), 0, ev->Cookie)); } diff --git a/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.cpp b/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.cpp index 1d64d9ed589..d32959b6e9d 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.cpp @@ -2,11 +2,11 @@ namespace NYq { -void TResponseTasks::AddTaskNonBlocking(const TString& key, const TEvControlPlaneStorage::TTask& task) { +void TResponseTasks::AddTaskNonBlocking(const TString& key, const TTask& task) { Tasks[key] = task; } -void TResponseTasks::AddTaskBlocking(const TString& key, const TEvControlPlaneStorage::TTask& task) { +void TResponseTasks::AddTaskBlocking(const TString& key, const TTask& task) { with_lock (Mutex) { Tasks[key] = task; } @@ -34,11 +34,11 @@ bool TResponseTasks::EmptyBlocking() { } } -const THashMap<TString, TEvControlPlaneStorage::TTask>& TResponseTasks::GetTasksNonBlocking() { +const THashMap<TString, TTask>& TResponseTasks::GetTasksNonBlocking() { return Tasks; } -const THashMap<TString, TEvControlPlaneStorage::TTask>& TResponseTasks::GetTasksBlocking() { +const THashMap<TString, TTask>& TResponseTasks::GetTasksBlocking() { with_lock (Mutex) { return Tasks; } diff --git a/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h b/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h index 1ab660d43dd..29580cb2d0c 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h +++ b/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h @@ -5,11 +5,20 @@ namespace NYq { +struct TTask { + TString Scope; + TString QueryId; + YandexQuery::Query Query; + YandexQuery::Internal::QueryInternal Internal; + ui64 Generation = 0; + TInstant Deadline; +}; + class TResponseTasks { public: - void AddTaskNonBlocking(const TString& key, const TEvControlPlaneStorage::TTask& task); + void AddTaskNonBlocking(const TString& key, const TTask& task); - void AddTaskBlocking(const TString& key, const TEvControlPlaneStorage::TTask& task); + void AddTaskBlocking(const TString& key, const TTask& task); void SafeEraseTaskNonBlocking(const TString& key); @@ -19,13 +28,13 @@ public: bool EmptyBlocking(); - const THashMap<TString, TEvControlPlaneStorage::TTask>& GetTasksNonBlocking(); + const THashMap<TString, TTask>& GetTasksNonBlocking(); - const THashMap<TString, TEvControlPlaneStorage::TTask>& GetTasksBlocking(); + const THashMap<TString, TTask>& GetTasksBlocking(); private: TMutex Mutex; - THashMap<TString, TEvControlPlaneStorage::TTask> Tasks; + THashMap<TString, TTask> Tasks; }; } //NYq diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp index e77407584c8..f9bf0e8035b 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp @@ -12,7 +12,7 @@ namespace NYq { namespace { struct TTaskInternal { - TEvControlPlaneStorage::TTask Task; + TTask Task; TRetryLimiter RetryLimiter; bool ShouldAbortTask = false; TString TablePathPrefix; @@ -23,6 +23,36 @@ struct TTaskInternal { TString TenantName; }; + TString GetServiceAccountId(const YandexQuery::IamAuth& auth) { + return auth.has_service_account() + ? auth.service_account().id() + : TString{}; + } + + TString ExtractServiceAccountId(const YandexQuery::Connection& c) { + switch (c.content().setting().connection_case()) { + case YandexQuery::ConnectionSetting::kYdbDatabase: { + return GetServiceAccountId(c.content().setting().ydb_database().auth()); + } + case YandexQuery::ConnectionSetting::kDataStreams: { + return GetServiceAccountId(c.content().setting().data_streams().auth()); + } + case YandexQuery::ConnectionSetting::kObjectStorage: { + return GetServiceAccountId(c.content().setting().object_storage().auth()); + } + case YandexQuery::ConnectionSetting::kMonitoring: { + return GetServiceAccountId(c.content().setting().monitoring().auth()); + } + case YandexQuery::ConnectionSetting::kClickhouseCluster: { + return GetServiceAccountId(c.content().setting().clickhouse_cluster().auth()); + } + // Do not replace with default. Adding a new connection should cause a compilation error + case YandexQuery::ConnectionSetting::CONNECTION_NOT_SET: + break; + } + return {}; + } + std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal, const TInstant& nowTimestamp, const TInstant& taskLeaseUntil) { const auto& task = taskInternal.Task; TSqlQueryBuilder queryBuilder(taskInternal.TablePathPrefix, "GetTask(write)"); @@ -138,10 +168,10 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ TRequestCountersPtr requestCounters = Counters.GetCommonCounters(RTC_GET_TASK); requestCounters->InFly->Inc(); - TEvControlPlaneStorage::TEvGetTaskRequest& request = *ev->Get(); - const TString owner = request.Owner; - const TString hostName = request.HostName; - const TString tenantName = request.TenantName; + auto& request = ev->Get()->Request; + const TString owner = request.owner_id(); + const TString hostName = request.host(); + const TString tenantName = request.tenant(); const ui64 tasksBatchSize = Config.Proto.GetTasksBatchSize(); const ui64 numTasksProportion = Config.Proto.GetNumTasksProportion(); @@ -158,7 +188,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; - auto response = std::make_shared<std::tuple<TVector<TEvControlPlaneStorage::TTask>, TString>>(); //tasks, owner + auto response = std::make_shared<std::tuple<TVector<TTask>, TString>>(); //tasks, owner TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "GetTask(read stale ro)"); auto now = TInstant::Now(); @@ -271,7 +301,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ auto status = MakeFuture(TStatus{EStatus::SUCCESS, std::move(issues)}); try { future.GetValue(); - TVector<TEvControlPlaneStorage::TTask> tasks; + TVector<TTask> tasks; for (const auto& [_, task] : responseTasks->GetTasksNonBlocking()) { tasks.emplace_back(task); } @@ -284,10 +314,61 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ }); }); - auto prepare = [response] { return *response; }; - auto success = SendResponseTuple - <TEvControlPlaneStorage::TEvGetTaskResponse, - std::tuple<TVector<TEvControlPlaneStorage::TTask>, TString>> //tasks, owner + auto prepare = [response] { + + Yq::Private::GetTaskResult result; + const auto& tasks = std::get<0>(*response); + + for (const auto& task : tasks) { + const auto& queryType = task.Query.content().type(); + if (queryType != YandexQuery::QueryContent::ANALYTICS && queryType != YandexQuery::QueryContent::STREAMING) { //TODO: fix + ythrow yexception() + << "query type " + << YandexQuery::QueryContent::QueryType_Name(queryType) + << " unsupported"; + } + auto* newTask = result.add_tasks(); + newTask->set_query_type(queryType); + newTask->set_execute_mode(task.Query.meta().execute_mode()); + newTask->set_state_load_mode(task.Internal.state_load_mode()); + auto* queryId = newTask->mutable_query_id(); + queryId->set_value(task.Query.meta().common().id()); + newTask->set_streaming(queryType == YandexQuery::QueryContent::STREAMING); + newTask->set_text(task.Query.content().text()); + *newTask->mutable_connection() = task.Internal.connection(); + *newTask->mutable_binding() = task.Internal.binding(); + newTask->set_user_token(task.Internal.token()); + newTask->set_user_id(task.Query.meta().common().created_by()); + newTask->set_generation(task.Generation); + newTask->set_status(task.Query.meta().status()); + *newTask->mutable_created_topic_consumers() = task.Internal.created_topic_consumers(); + newTask->mutable_sensor_labels()->insert({"cloud_id", task.Internal.cloud_id()}); + newTask->mutable_sensor_labels()->insert({"scope", task.Scope}); + newTask->set_automatic(task.Query.content().automatic()); + newTask->set_query_name(task.Query.content().name()); + *newTask->mutable_deadline() = NProtoInterop::CastToProto(task.Deadline); + newTask->mutable_disposition()->CopyFrom(task.Internal.disposition()); + newTask->set_result_limit(task.Internal.result_limit()); + + for (const auto& connection: task.Internal.connection()) { + const auto serviceAccountId = ExtractServiceAccountId(connection); + if (!serviceAccountId) { + continue; + } + auto* account = newTask->add_service_accounts(); + account->set_value(serviceAccountId); + } + + *newTask->mutable_dq_graph() = task.Internal.dq_graph(); + newTask->set_dq_graph_index(task.Internal.dq_graph_index()); + + *newTask->mutable_result_set_meta() = task.Query.result_set_meta(); + newTask->set_scope(task.Scope); + } + + return result; + }; + auto success = SendResponse<TEvControlPlaneStorage::TEvGetTaskResponse, Yq::Private::GetTaskResult> ("GetTaskRequest", NActors::TActivationContext::ActorSystem(), result, diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h index 541677186e3..58100255215 100644 --- a/ydb/core/yq/libs/control_plane_storage/message_builders.h +++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h @@ -976,11 +976,15 @@ class TGetTaskBuilder { public: TGetTaskBuilder() { - SetOwner("owner"); + SetOwner(DefaultOwner()); SetHostName("localhost"); SetTenantName("/root/tenant"); } + static TString DefaultOwner() { + return "owner"; + } + TGetTaskBuilder& SetOwner(const TString& owner) { Owner = owner; @@ -1001,7 +1005,11 @@ public: std::unique_ptr<TEvControlPlaneStorage::TEvGetTaskRequest> Build() { - return std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>(Owner, HostName, TenantName); + auto request = std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>(); + request->Request.set_tenant(TenantName); + request->Request.set_owner_id(Owner); + request->Request.set_host(HostName); + return request; } }; diff --git a/ydb/core/yq/libs/private_client/loopback_service.cpp b/ydb/core/yq/libs/private_client/loopback_service.cpp index 35849129854..d5b49fe02f9 100644 --- a/ydb/core/yq/libs/private_client/loopback_service.cpp +++ b/ydb/core/yq/libs/private_client/loopback_service.cpp @@ -44,6 +44,7 @@ private: hFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Handle) hFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, Handle) + hFunc(NYq::TEvControlPlaneStorage::TEvGetTaskResponse, Handle) ); void Handle(TEvInternalService::TEvHealthCheckRequest::TPtr& ev) { @@ -66,13 +67,24 @@ private: } } - void Handle(TEvInternalService::TEvGetTaskRequest::TPtr& /*ev*/) { - /* + void Handle(TEvInternalService::TEvGetTaskRequest::TPtr& ev) { Cookie++; Senders[Cookie] = ev->Sender; auto request = ev->Get()->Request; Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(std::move(request)), 0, Cookie); - */ + } + + void Handle(NYq::TEvControlPlaneStorage::TEvGetTaskResponse::TPtr& ev) { + auto it = Senders.find(ev->Cookie); + if (it != Senders.end()) { + if (ev->Get()->Issues.Size() == 0) { + Send(it->second, new TEvInternalService::TEvGetTaskResponse(ev->Get()->Record)); + } else { + auto issues = ev->Get()->Issues; + Send(it->second, new TEvInternalService::TEvGetTaskResponse(NYdb::EStatus::INTERNAL_ERROR, std::move(issues))); + } + Senders.erase(it); + } } void Handle(TEvInternalService::TEvPingTaskRequest::TPtr& /*ev*/) { |