summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <[email protected]>2022-05-22 23:43:54 +0300
committerAleksandr Khoroshilov <[email protected]>2022-05-22 23:43:54 +0300
commit2d1f7aba6bff54b08e4528aa26cc5f523a1b373c (patch)
tree691c68a17d09223b03fa30d1f017dccb18457438
parent78d0a148cad3d1a88b54367002924ada307209da (diff)
PingTask CP message is proto based + LoopbackService support
ref:d6c05ce50c167504800a671cf539f335e1ace15f
-rw-r--r--README.md4
-rw-r--r--ydb/core/yq/libs/actors/task_ping.cpp107
-rw-r--r--ydb/core/yq/libs/control_plane_storage/events/events.h108
-rw-r--r--ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp204
-rw-r--r--ydb/core/yq/libs/control_plane_storage/message_builders.h72
-rw-r--r--ydb/core/yq/libs/control_plane_storage/util.cpp36
-rw-r--r--ydb/core/yq/libs/control_plane_storage/util.h2
-rw-r--r--ydb/core/yq/libs/private_client/loopback_service.cpp20
9 files changed, 219 insertions, 336 deletions
diff --git a/README.md b/README.md
index 11b59701fde..2b75455c4aa 100644
--- a/README.md
+++ b/README.md
@@ -80,9 +80,11 @@ For development purposes we test that YDB could be built and run under latest ve
We are glad to welcome new contributors!
1. Please read [contributor's guide](CONTRIBUTING.md).
-2. We can accept your work to YDB after you have signed contributor's license agreement (aka CLA).
+2. We can accept your work to YDB after you have read contributor's license agreement (aka CLA).
3. Please don't forget to add a note to your pull request, that you agree to the terms of the CLA.
+More information can be found in [CONTRIBUTING](CONTRIBUTING) file.
+
## Success Stories
Take a look at YDB [web site](https://ydb.tech/) for the latest success stories and user scenarios.
diff --git a/ydb/core/yq/libs/actors/task_ping.cpp b/ydb/core/yq/libs/actors/task_ping.cpp
index 43193914ae8..04144e79c83 100644
--- a/ydb/core/yq/libs/actors/task_ping.cpp
+++ b/ydb/core/yq/libs/actors/task_ping.cpp
@@ -101,101 +101,8 @@ private:
)
std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> CreateControlPlaneEvent() {
- auto event = std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(TenantName, CloudId, Scope, OperationId, OwnerId, Deadline);
- const auto& req = Ev->Record;
- ui64 issuesByteSize = 0;
- ui64 transientIssuesByteSize = 0;
- ui64 resultSetMetaByteSize = 0;
- ui64 dqGraphBytesSize = 0;
-
- //TODO use all fields
- if (req.status() != YandexQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED) {
- event->Status = req.status();
- }
- if (!req.issues().empty()) {
- NYql::TIssues reqIssues;
- for (const auto& issue : req.issues()) {
- issuesByteSize += issue.ByteSize();
- }
- NYql::IssuesFromMessage(req.issues(), reqIssues);
- Issues.AddIssues(reqIssues);
- event->Issues = Issues;
- }
- if (!req.transient_issues().empty()) {
- NYql::TIssues transientIssues;
- for (const auto& issue : req.transient_issues()) {
- transientIssuesByteSize += issue.ByteSize();
- }
- NYql::IssuesFromMessage(req.transient_issues(), transientIssues);
- event->TransientIssues = transientIssues;
- }
- if (req.statistics()) {
- event->Statistics = req.statistics();
- }
- if (req.ast()) {
- event->Ast = req.ast();
- }
- if (req.result_id().value()) {
- event->ResultId = req.result_id().value();
- }
- if (req.plan()) {
- event->Plan = req.plan();
- }
- if (!req.result_set_meta().empty()) {
- for (const auto& rsMeta : req.result_set_meta()) {
- resultSetMetaByteSize += rsMeta.ByteSize();
- }
- event->ResultSetMetas = {req.result_set_meta().begin(), req.result_set_meta().end()};
- }
- if (req.has_started_at()) {
- event->StartedAt = TInstant::FromValue(google::protobuf::util::TimeUtil::TimestampToMicroseconds(req.started_at()));
- }
- if (req.has_finished_at()) {
- event->FinishedAt = TInstant::FromValue(google::protobuf::util::TimeUtil::TimestampToMicroseconds(req.finished_at()));
- }
- event->ResignQuery = req.resign_query();
- event->StatusCode = req.status_code();
-
- event->CreatedTopicConsumers.reserve(req.created_topic_consumers_size());
- for (const auto& topicConsumerProto : req.created_topic_consumers()) {
- auto& topicConsumer = event->CreatedTopicConsumers.emplace_back();
- topicConsumer.DatabaseId = topicConsumerProto.database_id();
- topicConsumer.Database = topicConsumerProto.database();
- topicConsumer.TopicPath = topicConsumerProto.topic_path();
- topicConsumer.ConsumerName = topicConsumerProto.consumer_name();
- topicConsumer.ClusterEndpoint = topicConsumerProto.cluster_endpoint();
- topicConsumer.UseSsl = topicConsumerProto.use_ssl();
- topicConsumer.TokenName = topicConsumerProto.token_name();
- topicConsumer.AddBearerToToken = topicConsumerProto.add_bearer_to_token();
- }
-
- event->DqGraphs.reserve(req.dq_graph_size());
- for (const auto& g : req.dq_graph()) {
- dqGraphBytesSize += g.size();
- event->DqGraphs.emplace_back(g);
- }
-
- if (req.state_load_mode()) {
- event->StateLoadMode = req.state_load_mode();
- }
-
- if (req.has_disposition()) {
- event->StreamingDisposition = req.disposition();
- }
-
-
- LOG_D("Statistics length: " << req.statistics().size() << ", "
- << "Ast length: " << req.ast().size() << " bytes, "
- << "Plan length: " << req.plan().size() << " bytes, "
- << "Result set meta size: " << resultSetMetaByteSize << " bytes, "
- << "Topic consumers size: " << event->CreatedTopicConsumers.size() * sizeof(TEvControlPlaneStorage::TTopicConsumer) << " bytes, "
- << "Dq graphs size: " << dqGraphBytesSize << " bytes, "
- << "Issues size: " << issuesByteSize << " bytes, "
- << "Transient issues size: " << transientIssuesByteSize << " bytes");
-
- event->DqGraphIndex = req.dq_graph_index();
-
- return std::move(event);
+ auto request = Ev->Record;
+ return std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request));
}
void HandleResponse(NYq::TEvControlPlaneStorage::TEvPingTaskResponse::TPtr& ev, const TActorContext& ctx) {
@@ -206,12 +113,10 @@ private:
Fail("ControlPlane PingTaskError", Ydb::StatusIds::GENERIC_ERROR);
return;
}
- auto res = MakeHolder<TEvents::TEvPingTaskResponse>();
- res->Status = Ydb::StatusIds::SUCCESS;
- Yq::Private::PingTaskResult result;
- result.set_action(ev->Get()->Action);
- res->Record.ConstructInPlace(result);
- ctx.Send(Sender, res.Release());
+ auto response = MakeHolder<TEvents::TEvPingTaskResponse>();
+ response->Status = Ydb::StatusIds::SUCCESS;
+ response->Record.ConstructInPlace(ev->Get()->Record);
+ ctx.Send(Sender, response.Release());
Die(ctx);
}
diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h
index ec89313c029..0257143f323 100644
--- a/ydb/core/yq/libs/control_plane_storage/events/events.h
+++ b/ydb/core/yq/libs/control_plane_storage/events/events.h
@@ -446,112 +446,42 @@ struct TEvControlPlaneStorage {
};
struct TEvPingTaskRequest : NActors::TEventLocal<TEvPingTaskRequest, EvPingTaskRequest> {
- explicit TEvPingTaskRequest(const TString& tenantName, const TString& cloudId, const TString& scope, const TString& queryId, const TString& owner, const TInstant& deadline, const TString& resultId = "")
- : TenantName(tenantName)
- , CloudId(cloudId)
- , Scope(scope)
- , QueryId(queryId)
- , Owner(owner)
- , Deadline(deadline)
- , ResultId(resultId)
- {
- }
- size_t GetByteSize() const {
- return sizeof(*this)
- + TenantName.Size()
- + CloudId.Size()
- + Scope.Size()
- + QueryId.Size()
- + Owner.Size()
- + ResultId.Size()
- + Status.Empty() ? 0 : sizeof(*Status)
- + GetIssuesByteSize(Issues)
- + GetIssuesByteSize(TransientIssues)
- + Statistics.Empty() ? 0 : Statistics->Size()
- + ResultSetMetasByteSizeLong()
- + Ast.Empty() ? 0 : Ast->Size()
- + Plan.Empty() ? 0 : Plan->Size()
- + StartedAt.Empty() ? 0 : sizeof(*StartedAt)
- + FinishedAt.Empty() ? 0 : sizeof(*FinishedAt)
- + CreatedTopicConsumersByteSizeLong()
- + DqGraphByteSizeLong()
- + StreamingDisposition.Empty() ? 0 : StreamingDisposition->ByteSizeLong();
- }
-
- size_t ResultSetMetasByteSizeLong() const {
- if (ResultSetMetas.Empty()) {
- return 0;
- }
- size_t size = 0;
- for (const auto& resultSet: *ResultSetMetas) {
- size += resultSet.ByteSizeLong();
- }
- size += ResultSetMetas->size() * sizeof(YandexQuery::ResultSetMeta);
- return size;
- }
+ TEvPingTaskRequest() = default;
- size_t CreatedTopicConsumersByteSizeLong() const {
- size_t size = 0;
- for (const auto& topic: CreatedTopicConsumers) {
- size += topic.GetByteSize();
- }
- size += CreatedTopicConsumers.size() * sizeof(YandexQuery::ResultSetMeta);
- return size;
- }
+ explicit TEvPingTaskRequest(
+ Yq::Private::PingTaskRequest&& request)
+ : Request(std::move(request))
+ {}
- size_t DqGraphByteSizeLong() const {
- size_t size = 0;
- for (const auto& graph: DqGraphs) {
- size += graph.Size();
- }
- size += DqGraphs.size() * sizeof(TString);
- return size;
+ size_t GetByteSize() const {
+ return sizeof(*this)
+ + Request.ByteSizeLong();
}
- const TString TenantName;
- const TString CloudId;
- const TString Scope;
- const TString QueryId;
- const TString Owner;
- const TInstant Deadline;
- TString ResultId;
- TMaybe<YandexQuery::QueryMeta::ComputeStatus> Status;
- TMaybe<NYql::TIssues> Issues;
- TMaybe<NYql::TIssues> TransientIssues;
- TMaybe<TString> Statistics;
- TMaybe<TVector<YandexQuery::ResultSetMeta>> ResultSetMetas;
- TMaybe<TString> Ast;
- TMaybe<TString> Plan;
- TMaybe<TInstant> StartedAt;
- TMaybe<TInstant> FinishedAt;
- bool ResignQuery = false;
- ui64 StatusCode = 0;
- TVector<TTopicConsumer> CreatedTopicConsumers;
- TVector<TString> DqGraphs;
- i32 DqGraphIndex = 0;
- YandexQuery::StateLoadMode StateLoadMode = YandexQuery::STATE_LOAD_MODE_UNSPECIFIED;
- TMaybe<YandexQuery::StreamingDisposition> StreamingDisposition;
+ Yq::Private::PingTaskRequest Request;
};
struct TEvPingTaskResponse : NActors::TEventLocal<TEvPingTaskResponse, EvPingTaskResponse> {
- explicit TEvPingTaskResponse(const YandexQuery::QueryAction& action)
- : Action(action)
- {
- }
- explicit TEvPingTaskResponse(const NYql::TIssues& issues)
+ explicit TEvPingTaskResponse(
+ const Yq::Private::PingTaskResult& record)
+ : Record(record)
+ {}
+
+ explicit TEvPingTaskResponse(
+ const NYql::TIssues& issues)
: Issues(issues)
- {
- }
+ {}
size_t GetByteSize() const {
return sizeof(*this)
+ + Record.ByteSizeLong()
+ GetIssuesByteSize(Issues)
+ GetDebugInfoByteSize(DebugInfo);
}
- YandexQuery::QueryAction Action = YandexQuery::QUERY_ACTION_UNSPECIFIED;
+ Yq::Private::PingTaskResult Record;
NYql::TIssues Issues;
TDebugInfoPtr DebugInfo;
};
diff --git a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp
index c1f5f6f3ee0..65d65f7bcd8 100644
--- a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp
@@ -307,7 +307,7 @@ private:
{
SendEmptyResponse<
TEvControlPlaneStorage::TEvPingTaskRequest::TPtr,
- YandexQuery::QueryAction,
+ Yq::Private::PingTaskResult,
TEvControlPlaneStorage::TEvPingTaskResponse>(ev, "PingTaskRequest");
}
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
index 5d33c45bc33..ed5ab92f636 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
@@ -18,13 +18,13 @@ bool IsFinishedStatus(YandexQuery::QueryMeta::ComputeStatus status) {
} // namespace
std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructHardPingTask(
- const TEvControlPlaneStorage::TEvPingTaskRequest* request, std::shared_ptr<YandexQuery::QueryAction> response,
+ const Yq::Private::PingTaskRequest& request, std::shared_ptr<Yq::Private::PingTaskResult> response,
const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl, const THashMap<ui64, TRetryPolicyItem>& retryPolicies) {
TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "HardPingTask(read)");
- readQueryBuilder.AddString("tenant", request->TenantName);
- readQueryBuilder.AddString("scope", request->Scope);
- readQueryBuilder.AddString("query_id", request->QueryId);
+ readQueryBuilder.AddString("tenant", request.tenant());
+ readQueryBuilder.AddString("scope", request.scope());
+ readQueryBuilder.AddString("query_id", request.query_id().value());
readQueryBuilder.AddText(
"$last_job_id = SELECT `" LAST_JOB_ID_COLUMN_NAME "` FROM `" QUERIES_TABLE_NAME "`\n"
" WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
@@ -53,23 +53,23 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
{
TResultSetParser parser(resultSets[0]);
if (!parser.TryNextRow()) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"";
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"";
}
if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." QUERY_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\"";
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." QUERY_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\"";
}
if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\"";
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\"";
}
}
{
TResultSetParser parser(resultSets[1]);
if (!parser.TryNextRow()) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " JOBS_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"";
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " JOBS_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"";
}
if (!job.ParseFromString(*parser.ColumnParser(JOB_COLUMN_NAME).GetOptionalString())) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " JOBS_TABLE_NAME "." JOB_COLUMN_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"";
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " JOBS_TABLE_NAME "." JOB_COLUMN_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"";
}
jobId = *parser.ColumnParser(JOB_ID_COLUMN_NAME).GetOptionalString();
}
@@ -78,11 +78,11 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
{
TResultSetParser parser(resultSets[2]);
if (!parser.TryNextRow()) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request->TenantName << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ;
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request.tenant() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ;
}
owner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString();
- if (owner != request->Owner) {
- ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request->QueryId << "\" MISMATCHED: \"" << request->Owner << "\" (received) != \"" << owner << "\" (selected)";
+ if (owner != request.owner_id()) {
+ ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request.query_id().value() << "\" MISMATCHED: \"" << request.owner_id() << "\" (received) != \"" << owner << "\" (selected)";
}
retryLimiter.Assign(
parser.ColumnParser(RETRY_COUNTER_COLUMN_NAME).GetOptionalUint64().GetOrElse(0),
@@ -91,16 +91,28 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
);
}
- TMaybe<YandexQuery::QueryMeta::ComputeStatus> queryStatus = request->Status;
- TMaybe<NYql::TIssues> issues = request->Issues;
- TMaybe<NYql::TIssues> transientIssues = request->TransientIssues;
-
+ TMaybe<YandexQuery::QueryMeta::ComputeStatus> queryStatus;
+ if (request.status() != YandexQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED) {
+ queryStatus = request.status();
+ }
+ TMaybe<NYql::TIssues> issues;
+ if (request.issues().size() > 0) {
+ NYql::TIssues requestIssues;
+ NYql::IssuesFromMessage(request.issues(), requestIssues);
+ issues = requestIssues;
+ }
+ TMaybe<NYql::TIssues> transientIssues;
+ if (request.transient_issues().size() > 0) {
+ NYql::TIssues requestTransientIssues;
+ NYql::IssuesFromMessage(request.transient_issues(), requestTransientIssues);
+ transientIssues = requestTransientIssues;
+ }
// running query us locked for lease period
TDuration backoff = taskLeaseTtl;
- if (request->ResignQuery) {
+ if (request.resign_query()) {
TRetryPolicyItem policy(0, TDuration::Seconds(1), TDuration::Zero());
- auto it = retryPolicies.find(request->StatusCode);
+ auto it = retryPolicies.find(request.status_code());
auto policyFound = it != retryPolicies.end();
if (policyFound) {
policy = it->second;
@@ -125,7 +137,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
}
}
}
- CPS_LOG_AS_D(*actorSystem, "PingTaskRequest (resign): " << (!policyFound ? " DEFAULT POLICY" : "") << (owner ? " FAILURE" : "") << request->StatusCode << " " << retryLimiter.RetryCount << " " << retryLimiter.RetryCounterUpdatedAt << " " << backoff);
+ CPS_LOG_AS_D(*actorSystem, "PingTaskRequest (resign): " << (!policyFound ? " DEFAULT POLICY" : "") << (owner ? " FAILURE" : "") << NYql::NDqProto::StatusIds_StatusCode_Name(request.status_code()) << " " << retryLimiter.RetryCount << " " << retryLimiter.RetryCounterUpdatedAt << " " << backoff);
}
if (queryStatus) {
@@ -150,42 +162,38 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
NYql::IssuesToMessage(newIssues, query.mutable_transient_issue());
}
- if (request->Statistics) {
- *query.mutable_statistics()->mutable_json() = *request->Statistics;
- *job.mutable_statistics()->mutable_json() = *request->Statistics;
+ if (request.statistics()) {
+ *query.mutable_statistics()->mutable_json() = request.statistics();
+ *job.mutable_statistics()->mutable_json() = request.statistics();
}
- if (request->ResultSetMetas) {
+ if (!request.result_set_meta().empty()) {
// we will overwrite result_set_meta's COMPLETELY
- query.clear_result_set_meta();
- job.clear_result_set_meta();
- for (const auto& resultSetMeta : *request->ResultSetMetas) {
- *query.add_result_set_meta() = resultSetMeta;
- *job.add_result_set_meta() = resultSetMeta;
- }
+ *query.mutable_result_set_meta() = request.result_set_meta();
+ *job.mutable_result_set_meta() = request.result_set_meta();
}
- if (request->Ast) {
- query.mutable_ast()->set_data(*request->Ast);
- job.mutable_ast()->set_data(*request->Ast);
+ if (request.ast()) {
+ query.mutable_ast()->set_data(request.ast());
+ job.mutable_ast()->set_data(request.ast());
}
- if (request->Plan) {
- query.mutable_plan()->set_json(*request->Plan);
- job.mutable_plan()->set_json(*request->Plan);
+ if (request.plan()) {
+ query.mutable_plan()->set_json(request.plan());
+ job.mutable_plan()->set_json(request.plan());
}
- if (request->StartedAt) {
- *query.mutable_meta()->mutable_started_at() = NProtoInterop::CastToProto(*request->StartedAt);
- *job.mutable_query_meta()->mutable_started_at() = NProtoInterop::CastToProto(*request->StartedAt);
+ if (request.has_started_at()) {
+ *query.mutable_meta()->mutable_started_at() = request.started_at();
+ *job.mutable_query_meta()->mutable_started_at() = request.started_at();
}
- if (request->FinishedAt) {
- *query.mutable_meta()->mutable_finished_at() = NProtoInterop::CastToProto(*request->FinishedAt);
- *job.mutable_query_meta()->mutable_finished_at() = NProtoInterop::CastToProto(*request->FinishedAt);
+ if (request.has_finished_at()) {
+ *query.mutable_meta()->mutable_finished_at() = request.finished_at();
+ *job.mutable_query_meta()->mutable_finished_at() = request.finished_at();
if (!query.meta().has_started_at()) {
- *query.mutable_meta()->mutable_started_at() = NProtoInterop::CastToProto(*request->FinishedAt);
- *job.mutable_query_meta()->mutable_started_at() = NProtoInterop::CastToProto(*request->FinishedAt);
+ *query.mutable_meta()->mutable_started_at() = request.finished_at();
+ *job.mutable_query_meta()->mutable_started_at() = request.finished_at();
}
}
@@ -197,43 +205,33 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
}
if (query.meta().status() == YandexQuery::QueryMeta::COMPLETED) {
- *query.mutable_meta()->mutable_result_expire_at() = NProtoInterop::CastToProto(request->Deadline);
+ *query.mutable_meta()->mutable_result_expire_at() = request.deadline();
}
- if (request->StateLoadMode) {
- internal.set_state_load_mode(request->StateLoadMode);
- if (request->StateLoadMode == YandexQuery::FROM_LAST_CHECKPOINT) { // Saved checkpoint
+ if (request.state_load_mode()) {
+ internal.set_state_load_mode(request.state_load_mode());
+ if (request.state_load_mode() == YandexQuery::FROM_LAST_CHECKPOINT) { // Saved checkpoint
query.mutable_meta()->set_has_saved_checkpoints(true);
}
}
- if (request->StreamingDisposition) {
- internal.mutable_disposition()->CopyFrom(*request->StreamingDisposition);
+ if (request.has_disposition()) {
+ *internal.mutable_disposition() = request.disposition();
}
- if (request->Status && IsFinishedStatus(*request->Status)) {
+ if (request.status() && IsFinishedStatus(request.status())) {
internal.clear_created_topic_consumers();
internal.clear_dq_graph();
internal.clear_dq_graph_index();
}
- if (!request->CreatedTopicConsumers.empty()) {
+ if (!request.created_topic_consumers().empty()) {
std::set<Yq::Private::TopicConsumer, TTopicConsumerLess> mergedConsumers;
for (auto&& c : *internal.mutable_created_topic_consumers()) {
mergedConsumers.emplace(std::move(c));
}
-
- for (const auto& c : request->CreatedTopicConsumers) {
- Yq::Private::TopicConsumer proto;
- proto.set_database_id(c.DatabaseId);
- proto.set_database(c.Database);
- proto.set_topic_path(c.TopicPath);
- proto.set_consumer_name(c.ConsumerName);
- proto.set_cluster_endpoint(c.ClusterEndpoint);
- proto.set_use_ssl(c.UseSsl);
- proto.set_token_name(c.TokenName);
- proto.set_add_bearer_to_token(c.AddBearerToToken);
- mergedConsumers.emplace(std::move(proto));
+ for (const auto& c : request.created_topic_consumers()) {
+ mergedConsumers.emplace(c);
}
internal.clear_created_topic_consumers();
for (auto&& c : mergedConsumers) {
@@ -241,27 +239,24 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
}
}
- if (!request->DqGraphs.empty()) {
- internal.clear_dq_graph();
- for (const auto& g : request->DqGraphs) {
- internal.add_dq_graph(g);
- }
+ if (!request.dq_graph().empty()) {
+ *internal.mutable_dq_graph() = request.dq_graph();
}
- if (request->DqGraphIndex) {
- internal.set_dq_graph_index(request->DqGraphIndex);
+ if (request.dq_graph_index()) {
+ internal.set_dq_graph_index(request.dq_graph_index());
}
TSqlQueryBuilder writeQueryBuilder(tablePathPrefix, "HardPingTask(write)");
- writeQueryBuilder.AddString("tenant", request->TenantName);
- writeQueryBuilder.AddString("scope", request->Scope);
+ writeQueryBuilder.AddString("tenant", request.tenant());
+ writeQueryBuilder.AddString("scope", request.scope());
writeQueryBuilder.AddString("job_id", jobId);
writeQueryBuilder.AddString("job", job.SerializeAsString());
writeQueryBuilder.AddString("query", query.SerializeAsString());
writeQueryBuilder.AddInt64("status", query.meta().status());
writeQueryBuilder.AddString("internal", internal.SerializeAsString());
- writeQueryBuilder.AddString("result_id", request->ResultId);
- writeQueryBuilder.AddString("query_id", request->QueryId);
+ writeQueryBuilder.AddString("result_id", request.result_id().value());
+ writeQueryBuilder.AddString("query_id", request.query_id().value());
if (IsTerminalStatus(query.meta().status())) {
// delete pending
@@ -301,7 +296,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
TString updateResultSetsExpire;
if (query.meta().status() == YandexQuery::QueryMeta::COMPLETED) {
- writeQueryBuilder.AddTimestamp("result_sets_expire_at", request->Deadline);
+ writeQueryBuilder.AddTimestamp("result_sets_expire_at", NProtoInterop::CastFromProto(request.deadline()));
updateResultSetsExpire = "`" RESULT_SETS_EXPIRE_AT_COLUMN_NAME "` = $result_sets_expire_at";
} else {
updateResultSetsExpire = "`" RESULT_SETS_EXPIRE_AT_COLUMN_NAME "` = NULL";
@@ -321,7 +316,8 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
"WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
- *response = internal.action();
+ response->set_action(internal.action());
+
const auto writeQuery = writeQueryBuilder.Build();
return std::make_pair(writeQuery.Sql, writeQuery.Params);
};
@@ -330,12 +326,12 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
}
std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructSoftPingTask(
- const TEvControlPlaneStorage::TEvPingTaskRequest* request, std::shared_ptr<YandexQuery::QueryAction> response,
+ const Yq::Private::PingTaskRequest& request, std::shared_ptr<Yq::Private::PingTaskResult> response,
const TString& tablePathPrefix, const TDuration& taskLeaseTtl) {
TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "SoftPingTask(read)");
- readQueryBuilder.AddString("tenant", request->TenantName);
- readQueryBuilder.AddString("scope", request->Scope);
- readQueryBuilder.AddString("query_id", request->QueryId);
+ readQueryBuilder.AddString("tenant", request.tenant());
+ readQueryBuilder.AddString("scope", request.scope());
+ readQueryBuilder.AddString("query_id", request.query_id().value());
readQueryBuilder.AddText(
"SELECT `" INTERNAL_COLUMN_NAME "`\n"
"FROM `" QUERIES_TABLE_NAME "` WHERE `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" SCOPE_COLUMN_NAME "` = $scope;\n"
@@ -354,32 +350,32 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
{
TResultSetParser parser(resultSets[0]);
if (!parser.TryNextRow()) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ;
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ;
}
if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\"";
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\"";
}
}
{
TResultSetParser parser(resultSets[1]);
if (!parser.TryNextRow()) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request->TenantName << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ;
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request.tenant() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ;
}
owner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString();
- if (owner != request->Owner) {
- ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request->QueryId << "\" MISMATCHED: \"" << request->Owner << "\" (received) != \"" << owner << "\" (selected)";
+ if (owner != request.owner_id()) {
+ ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request.query_id().value() << "\" MISMATCHED: \"" << request.owner_id() << "\" (received) != \"" << owner << "\" (selected)";
}
}
- *response = internal.action();
+ response->set_action(internal.action());
TSqlQueryBuilder writeQueryBuilder(tablePathPrefix, "SoftPingTask(write)");
writeQueryBuilder.AddTimestamp("now", TInstant::Now());
writeQueryBuilder.AddTimestamp("ttl", TInstant::Now() + taskLeaseTtl);
- writeQueryBuilder.AddString("tenant", request->TenantName);
- writeQueryBuilder.AddString("scope", request->Scope);
- writeQueryBuilder.AddString("query_id", request->QueryId);
+ writeQueryBuilder.AddString("tenant", request.tenant());
+ writeQueryBuilder.AddString("scope", request.scope());
+ writeQueryBuilder.AddString("query_id", request.query_id().value());
writeQueryBuilder.AddText(
"UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl\n"
@@ -396,19 +392,19 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskRequest::TPtr& ev)
{
TInstant startTime = TInstant::Now();
- TEvControlPlaneStorage::TEvPingTaskRequest* request = ev->Get();
- const TString cloudId = request->CloudId;
- const TString scope = request->Scope;
- TRequestCountersPtr requestCounters = Counters.GetScopeCounters(cloudId, scope, RTS_PING_TASK);
+ Yq::Private::PingTaskRequest& request = ev->Get()->Request;
+ const TString cloudId = "";
+ const TString scope = request.scope();
+ TRequestCountersPtr requestCounters = Counters.GetScopeCounters("" /*CloudId*/, scope, RTS_PING_TASK);
requestCounters->InFly->Inc();
- requestCounters->RequestBytes->Add(request->GetByteSize());
- const TString queryId = request->QueryId;
- const TString owner = request->Owner;
- const TInstant deadline = request->Deadline;
+ requestCounters->RequestBytes->Add(ev->Get()->GetByteSize());
+ const TString queryId = request.query_id().value();
+ const TString owner = request.owner_id();
+ const TInstant deadline = NProtoInterop::CastFromProto(request.deadline());
- CPS_LOG_T("PingTaskRequest: " << request->TenantName << " " << scope << " " << queryId
+ CPS_LOG_T("PingTaskRequest: " << request.tenant() << " " << scope << " " << queryId
<< " " << owner << " " << deadline << " "
- << (request->Status ? YandexQuery::QueryMeta_ComputeStatus_Name(*request->Status) : "no status"));
+ << (request.status() ? YandexQuery::QueryMeta_ComputeStatus_Name(request.status()) : "no status"));
NYql::TIssues issues = ValidatePingTask(scope, queryId, owner, deadline, Config.ResultSetsTtl);
if (issues) {
@@ -419,10 +415,10 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
return;
}
- std::shared_ptr<YandexQuery::QueryAction> response = std::make_shared<YandexQuery::QueryAction>();
+ std::shared_ptr<Yq::Private::PingTaskResult> response = std::make_shared<Yq::Private::PingTaskResult>();
- if (request->Status)
- Counters.GetFinalStatusCounters(cloudId, scope)->IncByStatus(*request->Status);
+ if (request.status())
+ Counters.GetFinalStatusCounters(cloudId, scope)->IncByStatus(request.status());
auto pingTaskParams = DoesPingTaskUpdateQueriesTable(request) ?
ConstructHardPingTask(request, response, YdbConnection->TablePathPrefix, Config.AutomaticQueriesTtl, Config.TaskLeaseTtl, Config.RetryPolicies) :
ConstructSoftPingTask(request, response, YdbConnection->TablePathPrefix, Config.TaskLeaseTtl);
@@ -432,8 +428,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery, readParams, prepareParams, requestCounters, debugInfo);
- auto prepare = [response] { return std::make_tuple(*response); };
- auto success = SendResponseTuple<TEvControlPlaneStorage::TEvPingTaskResponse, std::tuple<YandexQuery::QueryAction>>(
+ auto prepare = [response] { return *response; };
+ auto success = SendResponse<TEvControlPlaneStorage::TEvPingTaskResponse, Yq::Private::PingTaskResult>(
"PingTaskRequest",
NActors::TActivationContext::ActorSystem(),
result,
diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h
index 58100255215..c0f82ed566b 100644
--- a/ydb/core/yq/libs/control_plane_storage/message_builders.h
+++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h
@@ -3,6 +3,7 @@
#include <util/datetime/base.h>
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/public/api/protos/yq.pb.h>
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
@@ -1171,22 +1172,61 @@ public:
std::unique_ptr<TEvControlPlaneStorage::TEvPingTaskRequest> Build()
{
- auto request = std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(TenantName, CloudId, Scope, QueryId, Owner, Deadline, ResultId);
- request->Status = Status;
- request->Issues = Issues;
- request->TransientIssues = TransientIssues;
- request->Statistics = Statistics;
- request->ResultSetMetas = ResultSetMetas;
- request->Ast = Ast;
- request->Plan = Plan;
- request->StartedAt = StartedAt;
- request->FinishedAt = FinishedAt;
- request->ResignQuery = ResignQuery;
- request->StatusCode = StatusCode;
- request->CreatedTopicConsumers = CreatedTopicConsumers;
- request->DqGraphs = DqGraphs;
- request->DqGraphIndex = DqGraphIndex;
- return request;
+ Yq::Private::PingTaskRequest request;
+ request.set_owner_id(Owner);
+ request.mutable_query_id()->set_value(QueryId);
+ request.mutable_result_id()->set_value(ResultId);
+ if (Status) {
+ request.set_status(*Status);
+ }
+ request.set_status_code(StatusCode);
+ if (Issues) {
+ NYql::IssuesToMessage(*Issues, request.mutable_issues());
+ }
+ if (TransientIssues) {
+ NYql::IssuesToMessage(*TransientIssues, request.mutable_transient_issues());
+ }
+ if (Statistics) {
+ request.set_statistics(*Statistics);
+ }
+ if (ResultSetMetas) {
+ for (const auto& meta : *ResultSetMetas) {
+ *request.add_result_set_meta() = meta;
+ }
+ }
+ for (const auto& dqGraph : DqGraphs) {
+ request.add_dq_graph(dqGraph);
+ }
+ request.set_dq_graph_index(DqGraphIndex);
+ if (Ast) {
+ request.set_ast(*Ast);
+ }
+ if (Plan) {
+ request.set_plan(*Plan);
+ }
+ request.set_resign_query(ResignQuery);
+ for (const auto& consumer : CreatedTopicConsumers) {
+ auto& cons = *request.add_created_topic_consumers();
+ cons.set_database_id(consumer.DatabaseId);
+ cons.set_database(consumer.Database);
+ cons.set_topic_path(consumer.TopicPath);
+ cons.set_consumer_name(consumer.ConsumerName);
+ cons.set_cluster_endpoint(consumer.ClusterEndpoint);
+ cons.set_use_ssl(consumer.UseSsl);
+ cons.set_token_name(consumer.TokenName);
+ cons.set_add_bearer_to_token(consumer.AddBearerToToken);
+ }
+ request.set_tenant(TenantName);
+ request.set_scope(Scope);
+ *request.mutable_deadline() = NProtoInterop::CastToProto(Deadline);
+ if (StartedAt) {
+ *request.mutable_started_at() = NProtoInterop::CastToProto(*StartedAt);
+ }
+ if (FinishedAt) {
+ *request.mutable_finished_at() = NProtoInterop::CastToProto(*FinishedAt);
+ }
+
+ return std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request));
}
};
diff --git a/ydb/core/yq/libs/control_plane_storage/util.cpp b/ydb/core/yq/libs/control_plane_storage/util.cpp
index 8dd4257daf3..2ed5c6ee5e4 100644
--- a/ydb/core/yq/libs/control_plane_storage/util.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/util.cpp
@@ -132,25 +132,23 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane
return config;
}
-bool DoesPingTaskUpdateQueriesTable(const TEvControlPlaneStorage::TEvPingTaskRequest* request) {
- if (!request) {
- return false;
- }
- return request->Status ||
- request->Issues ||
- request->TransientIssues ||
- request->Statistics ||
- request->ResultSetMetas ||
- request->Ast ||
- request->Plan ||
- request->StartedAt ||
- request->FinishedAt ||
- request->ResignQuery ||
- !request->CreatedTopicConsumers.empty() ||
- !request->DqGraphs.empty() ||
- request->DqGraphIndex ||
- request->StateLoadMode ||
- request->StreamingDisposition;
+bool DoesPingTaskUpdateQueriesTable(const Yq::Private::PingTaskRequest& request) {
+ return request.status() != YandexQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED
+ || !request.issues().empty()
+ || !request.transient_issues().empty()
+ || !request.statistics()
+ || !request.result_set_meta().empty()
+ || request.ast()
+ || request.plan()
+ || request.has_started_at()
+ || request.has_finished_at()
+ || request.resign_query()
+ || !request.created_topic_consumers().empty()
+ || !request.dq_graph().empty()
+ || request.dq_graph_index()
+ || request.state_load_mode()
+ || request.has_disposition()
+ ;
}
NYdb::TValue PackItemsToList(const TVector<NYdb::TValue>& items) {
diff --git a/ydb/core/yq/libs/control_plane_storage/util.h b/ydb/core/yq/libs/control_plane_storage/util.h
index 9cf5dfe6561..fa721cb2bd2 100644
--- a/ydb/core/yq/libs/control_plane_storage/util.h
+++ b/ydb/core/yq/libs/control_plane_storage/util.h
@@ -38,7 +38,7 @@ TDuration GetDuration(const TString& value, const TDuration& defaultValue);
NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlaneStorageConfig config);
-bool DoesPingTaskUpdateQueriesTable(const TEvControlPlaneStorage::TEvPingTaskRequest* request);
+bool DoesPingTaskUpdateQueriesTable(const Yq::Private::PingTaskRequest& request);
NYdb::TValue PackItemsToList(const TVector<NYdb::TValue>& items);
diff --git a/ydb/core/yq/libs/private_client/loopback_service.cpp b/ydb/core/yq/libs/private_client/loopback_service.cpp
index d5b49fe02f9..3f4126c4b42 100644
--- a/ydb/core/yq/libs/private_client/loopback_service.cpp
+++ b/ydb/core/yq/libs/private_client/loopback_service.cpp
@@ -43,8 +43,9 @@ private:
hFunc(TEvInternalService::TEvWriteResultRequest, Handle)
hFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Handle)
- hFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, Handle)
hFunc(NYq::TEvControlPlaneStorage::TEvGetTaskResponse, Handle)
+ hFunc(NYq::TEvControlPlaneStorage::TEvPingTaskResponse, Handle)
+ hFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, Handle)
);
void Handle(TEvInternalService::TEvHealthCheckRequest::TPtr& ev) {
@@ -87,13 +88,24 @@ private:
}
}
- void Handle(TEvInternalService::TEvPingTaskRequest::TPtr& /*ev*/) {
- /*
+ void Handle(TEvInternalService::TEvPingTaskRequest::TPtr& ev) {
Cookie++;
Senders[Cookie] = ev->Sender;
auto request = ev->Get()->Request;
Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvPingTaskRequest(std::move(request)), 0, Cookie);
- */
+ }
+
+ void Handle(NYq::TEvControlPlaneStorage::TEvPingTaskResponse::TPtr& ev) {
+ auto it = Senders.find(ev->Cookie);
+ if (it != Senders.end()) {
+ if (ev->Get()->Issues.Size() == 0) {
+ Send(it->second, new TEvInternalService::TEvPingTaskResponse(ev->Get()->Record));
+ } else {
+ auto issues = ev->Get()->Issues;
+ Send(it->second, new TEvInternalService::TEvPingTaskResponse(NYdb::EStatus::INTERNAL_ERROR, std::move(issues)));
+ }
+ Senders.erase(it);
+ }
}
void Handle(TEvInternalService::TEvWriteResultRequest::TPtr& ev) {