aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <hor911@gmail.com>2022-05-20 18:32:16 +0300
committerAleksandr Khoroshilov <hor911@gmail.com>2022-05-20 18:32:16 +0300
commitbd647416fc1f87644c0d8391b53d22254edfdfdd (patch)
treed0ef5a4f21a0e5a7cf0208bbab893d70d3b77c13
parent92394d748cf66c288843ce740d2d37a94f67bf49 (diff)
downloadydb-bd647416fc1f87644c0d8391b53d22254edfdfdd.tar.gz
GetTask CP message is proto based + LoopbackService support
ref:09af2b300d81600704800c504d85fcd732251f9e
-rw-r--r--ydb/core/yq/libs/actors/task_get.cpp127
-rw-r--r--ydb/core/yq/libs/control_plane_storage/events/events.h70
-rw-r--r--ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp5
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/response_tasks.cpp8
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h19
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp103
-rw-r--r--ydb/core/yq/libs/control_plane_storage/message_builders.h12
-rw-r--r--ydb/core/yq/libs/private_client/loopback_service.cpp18
8 files changed, 183 insertions, 179 deletions
diff --git a/ydb/core/yq/libs/actors/task_get.cpp b/ydb/core/yq/libs/actors/task_get.cpp
index 6cc8bb922ab..ff5269b9a46 100644
--- a/ydb/core/yq/libs/actors/task_get.cpp
+++ b/ydb/core/yq/libs/actors/task_get.cpp
@@ -19,9 +19,9 @@
#include <ydb/library/security/util.h>
#define LOG_E(stream) \
- LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PRIVATE_PROXY, "PrivateGetTask - Owner: " << OwnerId << ", " << "Host: " << Host << ", "<< stream)
+ LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PRIVATE_PROXY, "PrivateGetTask - Owner: " << OwnerId << ", " << "Host: " << Host << ", Tenant: " << Tenant << ", " << stream)
#define LOG_D(stream) \
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::YQL_PRIVATE_PROXY, "PrivateGetTask - Owner: " << OwnerId << ", " << "Host: " << Host << ", " << stream)
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::YQL_PRIVATE_PROXY, "PrivateGetTask - Owner: " << OwnerId << ", " << "Host: " << Host << ", Tenant: " << Tenant << ", " << stream)
namespace NYq {
@@ -56,10 +56,10 @@ public:
void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev, const NActors::TActorContext& ctx) {
LOG_E("TGetTaskRequestActor::OnUndelivered");
- auto Res = MakeHolder<TEvents::TEvGetTaskResponse>();
- Res->Status = Ydb::StatusIds::GENERIC_ERROR;
- Res->Issues.AddIssue("UNDELIVERED");
- ctx.Send(ev->Sender, Res.Release());
+ auto response = MakeHolder<TEvents::TEvGetTaskResponse>();
+ response->Status = Ydb::StatusIds::GENERIC_ERROR;
+ response->Issues.AddIssue("UNDELIVERED");
+ ctx.Send(ev->Sender, response.Release());
Die(ctx);
}
@@ -74,61 +74,29 @@ public:
LOG_E(TStringBuilder()
<< "Failed with code: " << codeStr
<< " Details: " << Issues.ToString());
- auto Res = MakeHolder<TEvents::TEvGetTaskResponse>();
- Res->Status = reqStatus;
- Res->Issues.AddIssues(Issues);
- Send(Sender, Res.Release());
+ auto response = MakeHolder<TEvents::TEvGetTaskResponse>();
+ response->Status = reqStatus;
+ response->Issues.AddIssues(Issues);
+ Send(Sender, response.Release());
PassAway();
}
void Bootstrap(const TActorContext& ctx) {
Become(&TGetTaskRequestActor::StateFunc);
- const auto& req = Ev->Record;
- OwnerId = req.owner_id();
- Host = req.host();
- Tenant = req.tenant();
- LOG_D("Request CP::GetTask with size: " << req.ByteSize() << " bytes");
- RequestedMBytes->Collect(req.ByteSize() / 1024 / 1024);
+ auto request = Ev->Record;
+ LOG_D("Request CP::GetTask with size: " << request.ByteSize() << " bytes");
+ RequestedMBytes->Collect(request.ByteSize() / 1024 / 1024);
+ OwnerId = request.owner_id();
+ Host = request.host();
+ Tenant = request.tenant();
ctx.Send(NYq::ControlPlaneStorageServiceActorId(),
- new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(OwnerId, Host, Tenant));
- }
-
- static TString GetServiceAccountId(const YandexQuery::IamAuth& auth) {
- return auth.has_service_account()
- ? auth.service_account().id()
- : TString{};
- }
-
- static TString ExtractServiceAccountId(const YandexQuery::Connection& c) {
- switch (c.content().setting().connection_case()) {
- case YandexQuery::ConnectionSetting::kYdbDatabase: {
- return GetServiceAccountId(c.content().setting().ydb_database().auth());
- }
- case YandexQuery::ConnectionSetting::kDataStreams: {
- return GetServiceAccountId(c.content().setting().data_streams().auth());
- }
- case YandexQuery::ConnectionSetting::kObjectStorage: {
- return GetServiceAccountId(c.content().setting().object_storage().auth());
- }
- case YandexQuery::ConnectionSetting::kMonitoring: {
- return GetServiceAccountId(c.content().setting().monitoring().auth());
- }
- case YandexQuery::ConnectionSetting::kClickhouseCluster: {
- return GetServiceAccountId(c.content().setting().clickhouse_cluster().auth());
- }
- // Do not replace with default. Adding a new connection should cause a compilation error
- case YandexQuery::ConnectionSetting::CONNECTION_NOT_SET:
- break;
- }
- return {};
+ new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(std::move(request)));
}
private:
void HandleResponse(NYq::TEvControlPlaneStorage::TEvGetTaskResponse::TPtr& ev, const TActorContext& ctx) { // YQ
LOG_D("Got CP::GetTask Response");
- const auto& tasks = ev->Get()->Tasks;
- Res->Record.ConstructInPlace();
- Res->Status = Ydb::StatusIds::SUCCESS;
+
const auto& issues = ev->Get()->Issues;
if (issues) {
Issues.AddIssues(issues);
@@ -136,61 +104,24 @@ private:
return;
}
+ auto response = MakeHolder<TEvents::TEvGetTaskResponse>();
+ response->Status = Ydb::StatusIds::SUCCESS;
+ response->Record.ConstructInPlace();
+ auto& record = *response->Record;
+ record = ev->Get()->Record;
try {
- for (const auto& task : tasks) {
- const auto& queryType = task.Query.content().type();
- if (queryType != YandexQuery::QueryContent::ANALYTICS && queryType != YandexQuery::QueryContent::STREAMING) { //TODO: fix
- ythrow yexception()
- << "query type "
- << YandexQuery::QueryContent::QueryType_Name(queryType)
- << " unsupported";
- }
- auto* newTask = Res->Record->add_tasks();
- newTask->set_query_type(queryType);
- newTask->set_execute_mode(task.Query.meta().execute_mode());
- newTask->set_state_load_mode(task.Internal.state_load_mode());
- auto* queryId = newTask->mutable_query_id();
- queryId->set_value(task.Query.meta().common().id());
- newTask->set_streaming(queryType == YandexQuery::QueryContent::STREAMING);
- newTask->set_text(task.Query.content().text());
- *newTask->mutable_connection() = task.Internal.connection();
- *newTask->mutable_binding() = task.Internal.binding();
- newTask->set_user_token(task.Internal.token());
- newTask->set_user_id(task.Query.meta().common().created_by());
- newTask->set_generation(task.Generation);
- newTask->set_status(task.Query.meta().status());
- *newTask->mutable_created_topic_consumers() = task.Internal.created_topic_consumers();
- newTask->mutable_sensor_labels()->insert({"cloud_id", task.Internal.cloud_id()});
- newTask->mutable_sensor_labels()->insert({"scope", task.Scope});
- newTask->set_automatic(task.Query.content().automatic());
- newTask->set_query_name(task.Query.content().name());
- *newTask->mutable_deadline() = NProtoInterop::CastToProto(task.Deadline);
- newTask->mutable_disposition()->CopyFrom(task.Internal.disposition());
- newTask->set_result_limit(task.Internal.result_limit());
-
+ for (auto& task : *record.mutable_tasks()) {
THashMap<TString, TString> accountIdSignatures;
- for (const auto& connection: task.Internal.connection()) {
- const auto serviceAccountId = ExtractServiceAccountId(connection);
- if (!serviceAccountId) {
- continue;
- }
-
+ for (auto& account : *task.mutable_service_accounts()) {
+ const auto serviceAccountId = account.value();
auto& signature = accountIdSignatures[serviceAccountId];
if (!signature && Signer) {
signature = Signer->SignAccountId(serviceAccountId);
}
- auto* account = newTask->add_service_accounts();
- account->set_value(serviceAccountId);
- account->set_signature(signature);
+ account.set_signature(signature);
}
-
- *newTask->mutable_dq_graph() = task.Internal.dq_graph();
- newTask->set_dq_graph_index(task.Internal.dq_graph_index());
-
- *newTask->mutable_result_set_meta() = task.Query.result_set_meta();
- newTask->set_scope(task.Scope);
}
- ctx.Send(Sender, Res.Release());
+ ctx.Send(Sender, response.Release());
Die(ctx);
} catch (...) {
const auto msg = TStringBuilder() << "Can't do GetTask: " << CurrentExceptionMessage();
@@ -221,8 +152,6 @@ private:
TString OwnerId;
TString Host;
TString Tenant;
-
- THolder<TEvents::TEvGetTaskResponse> Res = MakeHolder<TEvents::TEvGetTaskResponse>();
};
IActor* CreateGetTaskRequestActor(
diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h
index 4566e1239cf..ec89313c029 100644
--- a/ydb/core/yq/libs/control_plane_storage/events/events.h
+++ b/ydb/core/yq/libs/control_plane_storage/events/events.h
@@ -383,77 +383,43 @@ struct TEvControlPlaneStorage {
};
struct TEvGetTaskRequest : NActors::TEventLocal<TEvGetTaskRequest, EvGetTaskRequest> {
- explicit TEvGetTaskRequest(
- const TString& owner,
- const TString& hostName,
- const TString& tenantName)
- : Owner(owner)
- , HostName(hostName)
- , TenantName(tenantName)
- {
- }
- size_t GetByteSize() const {
- return sizeof(*this)
- + Owner.Size()
- + HostName.Size()
- + TenantName.Size();
- }
-
- TString Owner;
- TString HostName;
- TString TenantName;
- };
+ TEvGetTaskRequest() = default;
- struct TTask {
- TString Scope;
- TString QueryId;
- YandexQuery::Query Query;
- YandexQuery::Internal::QueryInternal Internal;
- ui64 Generation = 0;
- TInstant Deadline;
+ explicit TEvGetTaskRequest(
+ Yq::Private::GetTaskRequest&& request)
+ : Request(std::move(request))
+ {}
size_t GetByteSize() const {
return sizeof(*this)
- + Scope.Size()
- + QueryId.Size()
- + Query.ByteSizeLong()
- + Internal.ByteSizeLong();
+ + Request.ByteSizeLong();
}
+
+ Yq::Private::GetTaskRequest Request;
};
struct TEvGetTaskResponse : NActors::TEventLocal<TEvGetTaskResponse, EvGetTaskResponse> {
- explicit TEvGetTaskResponse(const TVector<TTask>& tasks, const TString& owner)
- : Tasks(tasks)
- , Owner(owner)
- {
- }
- explicit TEvGetTaskResponse(const NYql::TIssues& issues)
+ explicit TEvGetTaskResponse(
+ const Yq::Private::GetTaskResult& record)
+ : Record(record)
+ {}
+
+ explicit TEvGetTaskResponse(
+ const NYql::TIssues& issues)
: Issues(issues)
- {
- }
+ {}
size_t GetByteSize() const {
return sizeof(*this)
- + TasksByteSizeLong()
+ + Record.ByteSizeLong()
+ GetIssuesByteSize(Issues)
- + Owner.Size()
+ GetDebugInfoByteSize(DebugInfo);
}
- size_t TasksByteSizeLong() const {
- size_t size = 0;
- for (const auto& task: Tasks) {
- size += task.GetByteSize();
- }
- size += Tasks.size() * sizeof(TTask);
- return size;
- }
-
+ Yq::Private::GetTaskResult Record;
NYql::TIssues Issues;
- TVector<TTask> Tasks;
- TString Owner;
TDebugInfoPtr DebugInfo;
};
diff --git a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp
index 8742d91df52..c1f5f6f3ee0 100644
--- a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp
@@ -298,9 +298,8 @@ private:
void Handle(TEvControlPlaneStorage::TEvGetTaskRequest::TPtr& ev)
{
CPS_LOG_I("GetTaskRequest");
- TVector<TEvControlPlaneStorage::TTask> tasks;
- TString owner;
- auto event = std::make_unique<TEvControlPlaneStorage::TEvGetTaskResponse>(tasks, owner);
+ Yq::Private::GetTaskResult result;
+ auto event = std::make_unique<TEvControlPlaneStorage::TEvGetTaskResponse>(result);
NActors::TActivationContext::ActorSystem()->Send(new IEventHandle(ev->Sender, SelfId(), event.release(), 0, ev->Cookie));
}
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.cpp b/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.cpp
index 1d64d9ed589..d32959b6e9d 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.cpp
@@ -2,11 +2,11 @@
namespace NYq {
-void TResponseTasks::AddTaskNonBlocking(const TString& key, const TEvControlPlaneStorage::TTask& task) {
+void TResponseTasks::AddTaskNonBlocking(const TString& key, const TTask& task) {
Tasks[key] = task;
}
-void TResponseTasks::AddTaskBlocking(const TString& key, const TEvControlPlaneStorage::TTask& task) {
+void TResponseTasks::AddTaskBlocking(const TString& key, const TTask& task) {
with_lock (Mutex) {
Tasks[key] = task;
}
@@ -34,11 +34,11 @@ bool TResponseTasks::EmptyBlocking() {
}
}
-const THashMap<TString, TEvControlPlaneStorage::TTask>& TResponseTasks::GetTasksNonBlocking() {
+const THashMap<TString, TTask>& TResponseTasks::GetTasksNonBlocking() {
return Tasks;
}
-const THashMap<TString, TEvControlPlaneStorage::TTask>& TResponseTasks::GetTasksBlocking() {
+const THashMap<TString, TTask>& TResponseTasks::GetTasksBlocking() {
with_lock (Mutex) {
return Tasks;
}
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h b/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h
index 1ab660d43dd..29580cb2d0c 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h
+++ b/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h
@@ -5,11 +5,20 @@
namespace NYq {
+struct TTask {
+ TString Scope;
+ TString QueryId;
+ YandexQuery::Query Query;
+ YandexQuery::Internal::QueryInternal Internal;
+ ui64 Generation = 0;
+ TInstant Deadline;
+};
+
class TResponseTasks {
public:
- void AddTaskNonBlocking(const TString& key, const TEvControlPlaneStorage::TTask& task);
+ void AddTaskNonBlocking(const TString& key, const TTask& task);
- void AddTaskBlocking(const TString& key, const TEvControlPlaneStorage::TTask& task);
+ void AddTaskBlocking(const TString& key, const TTask& task);
void SafeEraseTaskNonBlocking(const TString& key);
@@ -19,13 +28,13 @@ public:
bool EmptyBlocking();
- const THashMap<TString, TEvControlPlaneStorage::TTask>& GetTasksNonBlocking();
+ const THashMap<TString, TTask>& GetTasksNonBlocking();
- const THashMap<TString, TEvControlPlaneStorage::TTask>& GetTasksBlocking();
+ const THashMap<TString, TTask>& GetTasksBlocking();
private:
TMutex Mutex;
- THashMap<TString, TEvControlPlaneStorage::TTask> Tasks;
+ THashMap<TString, TTask> Tasks;
};
} //NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
index e77407584c8..f9bf0e8035b 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
@@ -12,7 +12,7 @@ namespace NYq {
namespace {
struct TTaskInternal {
- TEvControlPlaneStorage::TTask Task;
+ TTask Task;
TRetryLimiter RetryLimiter;
bool ShouldAbortTask = false;
TString TablePathPrefix;
@@ -23,6 +23,36 @@ struct TTaskInternal {
TString TenantName;
};
+ TString GetServiceAccountId(const YandexQuery::IamAuth& auth) {
+ return auth.has_service_account()
+ ? auth.service_account().id()
+ : TString{};
+ }
+
+ TString ExtractServiceAccountId(const YandexQuery::Connection& c) {
+ switch (c.content().setting().connection_case()) {
+ case YandexQuery::ConnectionSetting::kYdbDatabase: {
+ return GetServiceAccountId(c.content().setting().ydb_database().auth());
+ }
+ case YandexQuery::ConnectionSetting::kDataStreams: {
+ return GetServiceAccountId(c.content().setting().data_streams().auth());
+ }
+ case YandexQuery::ConnectionSetting::kObjectStorage: {
+ return GetServiceAccountId(c.content().setting().object_storage().auth());
+ }
+ case YandexQuery::ConnectionSetting::kMonitoring: {
+ return GetServiceAccountId(c.content().setting().monitoring().auth());
+ }
+ case YandexQuery::ConnectionSetting::kClickhouseCluster: {
+ return GetServiceAccountId(c.content().setting().clickhouse_cluster().auth());
+ }
+ // Do not replace with default. Adding a new connection should cause a compilation error
+ case YandexQuery::ConnectionSetting::CONNECTION_NOT_SET:
+ break;
+ }
+ return {};
+ }
+
std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal, const TInstant& nowTimestamp, const TInstant& taskLeaseUntil) {
const auto& task = taskInternal.Task;
TSqlQueryBuilder queryBuilder(taskInternal.TablePathPrefix, "GetTask(write)");
@@ -138,10 +168,10 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
TRequestCountersPtr requestCounters = Counters.GetCommonCounters(RTC_GET_TASK);
requestCounters->InFly->Inc();
- TEvControlPlaneStorage::TEvGetTaskRequest& request = *ev->Get();
- const TString owner = request.Owner;
- const TString hostName = request.HostName;
- const TString tenantName = request.TenantName;
+ auto& request = ev->Get()->Request;
+ const TString owner = request.owner_id();
+ const TString hostName = request.host();
+ const TString tenantName = request.tenant();
const ui64 tasksBatchSize = Config.Proto.GetTasksBatchSize();
const ui64 numTasksProportion = Config.Proto.GetNumTasksProportion();
@@ -158,7 +188,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
- auto response = std::make_shared<std::tuple<TVector<TEvControlPlaneStorage::TTask>, TString>>(); //tasks, owner
+ auto response = std::make_shared<std::tuple<TVector<TTask>, TString>>(); //tasks, owner
TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "GetTask(read stale ro)");
auto now = TInstant::Now();
@@ -271,7 +301,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
auto status = MakeFuture(TStatus{EStatus::SUCCESS, std::move(issues)});
try {
future.GetValue();
- TVector<TEvControlPlaneStorage::TTask> tasks;
+ TVector<TTask> tasks;
for (const auto& [_, task] : responseTasks->GetTasksNonBlocking()) {
tasks.emplace_back(task);
}
@@ -284,10 +314,61 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
});
});
- auto prepare = [response] { return *response; };
- auto success = SendResponseTuple
- <TEvControlPlaneStorage::TEvGetTaskResponse,
- std::tuple<TVector<TEvControlPlaneStorage::TTask>, TString>> //tasks, owner
+ auto prepare = [response] {
+
+ Yq::Private::GetTaskResult result;
+ const auto& tasks = std::get<0>(*response);
+
+ for (const auto& task : tasks) {
+ const auto& queryType = task.Query.content().type();
+ if (queryType != YandexQuery::QueryContent::ANALYTICS && queryType != YandexQuery::QueryContent::STREAMING) { //TODO: fix
+ ythrow yexception()
+ << "query type "
+ << YandexQuery::QueryContent::QueryType_Name(queryType)
+ << " unsupported";
+ }
+ auto* newTask = result.add_tasks();
+ newTask->set_query_type(queryType);
+ newTask->set_execute_mode(task.Query.meta().execute_mode());
+ newTask->set_state_load_mode(task.Internal.state_load_mode());
+ auto* queryId = newTask->mutable_query_id();
+ queryId->set_value(task.Query.meta().common().id());
+ newTask->set_streaming(queryType == YandexQuery::QueryContent::STREAMING);
+ newTask->set_text(task.Query.content().text());
+ *newTask->mutable_connection() = task.Internal.connection();
+ *newTask->mutable_binding() = task.Internal.binding();
+ newTask->set_user_token(task.Internal.token());
+ newTask->set_user_id(task.Query.meta().common().created_by());
+ newTask->set_generation(task.Generation);
+ newTask->set_status(task.Query.meta().status());
+ *newTask->mutable_created_topic_consumers() = task.Internal.created_topic_consumers();
+ newTask->mutable_sensor_labels()->insert({"cloud_id", task.Internal.cloud_id()});
+ newTask->mutable_sensor_labels()->insert({"scope", task.Scope});
+ newTask->set_automatic(task.Query.content().automatic());
+ newTask->set_query_name(task.Query.content().name());
+ *newTask->mutable_deadline() = NProtoInterop::CastToProto(task.Deadline);
+ newTask->mutable_disposition()->CopyFrom(task.Internal.disposition());
+ newTask->set_result_limit(task.Internal.result_limit());
+
+ for (const auto& connection: task.Internal.connection()) {
+ const auto serviceAccountId = ExtractServiceAccountId(connection);
+ if (!serviceAccountId) {
+ continue;
+ }
+ auto* account = newTask->add_service_accounts();
+ account->set_value(serviceAccountId);
+ }
+
+ *newTask->mutable_dq_graph() = task.Internal.dq_graph();
+ newTask->set_dq_graph_index(task.Internal.dq_graph_index());
+
+ *newTask->mutable_result_set_meta() = task.Query.result_set_meta();
+ newTask->set_scope(task.Scope);
+ }
+
+ return result;
+ };
+ auto success = SendResponse<TEvControlPlaneStorage::TEvGetTaskResponse, Yq::Private::GetTaskResult>
("GetTaskRequest",
NActors::TActivationContext::ActorSystem(),
result,
diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h
index 541677186e3..58100255215 100644
--- a/ydb/core/yq/libs/control_plane_storage/message_builders.h
+++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h
@@ -976,11 +976,15 @@ class TGetTaskBuilder {
public:
TGetTaskBuilder()
{
- SetOwner("owner");
+ SetOwner(DefaultOwner());
SetHostName("localhost");
SetTenantName("/root/tenant");
}
+ static TString DefaultOwner() {
+ return "owner";
+ }
+
TGetTaskBuilder& SetOwner(const TString& owner)
{
Owner = owner;
@@ -1001,7 +1005,11 @@ public:
std::unique_ptr<TEvControlPlaneStorage::TEvGetTaskRequest> Build()
{
- return std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>(Owner, HostName, TenantName);
+ auto request = std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>();
+ request->Request.set_tenant(TenantName);
+ request->Request.set_owner_id(Owner);
+ request->Request.set_host(HostName);
+ return request;
}
};
diff --git a/ydb/core/yq/libs/private_client/loopback_service.cpp b/ydb/core/yq/libs/private_client/loopback_service.cpp
index 35849129854..d5b49fe02f9 100644
--- a/ydb/core/yq/libs/private_client/loopback_service.cpp
+++ b/ydb/core/yq/libs/private_client/loopback_service.cpp
@@ -44,6 +44,7 @@ private:
hFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Handle)
hFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, Handle)
+ hFunc(NYq::TEvControlPlaneStorage::TEvGetTaskResponse, Handle)
);
void Handle(TEvInternalService::TEvHealthCheckRequest::TPtr& ev) {
@@ -66,13 +67,24 @@ private:
}
}
- void Handle(TEvInternalService::TEvGetTaskRequest::TPtr& /*ev*/) {
- /*
+ void Handle(TEvInternalService::TEvGetTaskRequest::TPtr& ev) {
Cookie++;
Senders[Cookie] = ev->Sender;
auto request = ev->Get()->Request;
Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(std::move(request)), 0, Cookie);
- */
+ }
+
+ void Handle(NYq::TEvControlPlaneStorage::TEvGetTaskResponse::TPtr& ev) {
+ auto it = Senders.find(ev->Cookie);
+ if (it != Senders.end()) {
+ if (ev->Get()->Issues.Size() == 0) {
+ Send(it->second, new TEvInternalService::TEvGetTaskResponse(ev->Get()->Record));
+ } else {
+ auto issues = ev->Get()->Issues;
+ Send(it->second, new TEvInternalService::TEvGetTaskResponse(NYdb::EStatus::INTERNAL_ERROR, std::move(issues)));
+ }
+ Senders.erase(it);
+ }
}
void Handle(TEvInternalService::TEvPingTaskRequest::TPtr& /*ev*/) {