diff options
author | Aleksandr Khoroshilov <[email protected]> | 2022-05-22 23:43:54 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <[email protected]> | 2022-05-22 23:43:54 +0300 |
commit | 2d1f7aba6bff54b08e4528aa26cc5f523a1b373c (patch) | |
tree | 691c68a17d09223b03fa30d1f017dccb18457438 | |
parent | 78d0a148cad3d1a88b54367002924ada307209da (diff) |
PingTask CP message is proto based + LoopbackService support
ref:d6c05ce50c167504800a671cf539f335e1ace15f
-rw-r--r-- | README.md | 4 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/task_ping.cpp | 107 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/events/events.h | 108 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp | 2 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp | 204 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/message_builders.h | 72 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/util.cpp | 36 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/util.h | 2 | ||||
-rw-r--r-- | ydb/core/yq/libs/private_client/loopback_service.cpp | 20 |
9 files changed, 219 insertions, 336 deletions
diff --git a/README.md b/README.md index 11b59701fde..2b75455c4aa 100644 --- a/README.md +++ b/README.md @@ -80,9 +80,11 @@ For development purposes we test that YDB could be built and run under latest ve We are glad to welcome new contributors! 1. Please read [contributor's guide](CONTRIBUTING.md). -2. We can accept your work to YDB after you have signed contributor's license agreement (aka CLA). +2. We can accept your work to YDB after you have read contributor's license agreement (aka CLA). 3. Please don't forget to add a note to your pull request, that you agree to the terms of the CLA. +More information can be found in [CONTRIBUTING](CONTRIBUTING) file. + ## Success Stories Take a look at YDB [web site](https://ydb.tech/) for the latest success stories and user scenarios. diff --git a/ydb/core/yq/libs/actors/task_ping.cpp b/ydb/core/yq/libs/actors/task_ping.cpp index 43193914ae8..04144e79c83 100644 --- a/ydb/core/yq/libs/actors/task_ping.cpp +++ b/ydb/core/yq/libs/actors/task_ping.cpp @@ -101,101 +101,8 @@ private: ) std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> CreateControlPlaneEvent() { - auto event = std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(TenantName, CloudId, Scope, OperationId, OwnerId, Deadline); - const auto& req = Ev->Record; - ui64 issuesByteSize = 0; - ui64 transientIssuesByteSize = 0; - ui64 resultSetMetaByteSize = 0; - ui64 dqGraphBytesSize = 0; - - //TODO use all fields - if (req.status() != YandexQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED) { - event->Status = req.status(); - } - if (!req.issues().empty()) { - NYql::TIssues reqIssues; - for (const auto& issue : req.issues()) { - issuesByteSize += issue.ByteSize(); - } - NYql::IssuesFromMessage(req.issues(), reqIssues); - Issues.AddIssues(reqIssues); - event->Issues = Issues; - } - if (!req.transient_issues().empty()) { - NYql::TIssues transientIssues; - for (const auto& issue : req.transient_issues()) { - transientIssuesByteSize += issue.ByteSize(); - } - NYql::IssuesFromMessage(req.transient_issues(), transientIssues); - event->TransientIssues = transientIssues; - } - if (req.statistics()) { - event->Statistics = req.statistics(); - } - if (req.ast()) { - event->Ast = req.ast(); - } - if (req.result_id().value()) { - event->ResultId = req.result_id().value(); - } - if (req.plan()) { - event->Plan = req.plan(); - } - if (!req.result_set_meta().empty()) { - for (const auto& rsMeta : req.result_set_meta()) { - resultSetMetaByteSize += rsMeta.ByteSize(); - } - event->ResultSetMetas = {req.result_set_meta().begin(), req.result_set_meta().end()}; - } - if (req.has_started_at()) { - event->StartedAt = TInstant::FromValue(google::protobuf::util::TimeUtil::TimestampToMicroseconds(req.started_at())); - } - if (req.has_finished_at()) { - event->FinishedAt = TInstant::FromValue(google::protobuf::util::TimeUtil::TimestampToMicroseconds(req.finished_at())); - } - event->ResignQuery = req.resign_query(); - event->StatusCode = req.status_code(); - - event->CreatedTopicConsumers.reserve(req.created_topic_consumers_size()); - for (const auto& topicConsumerProto : req.created_topic_consumers()) { - auto& topicConsumer = event->CreatedTopicConsumers.emplace_back(); - topicConsumer.DatabaseId = topicConsumerProto.database_id(); - topicConsumer.Database = topicConsumerProto.database(); - topicConsumer.TopicPath = topicConsumerProto.topic_path(); - topicConsumer.ConsumerName = topicConsumerProto.consumer_name(); - topicConsumer.ClusterEndpoint = topicConsumerProto.cluster_endpoint(); - topicConsumer.UseSsl = topicConsumerProto.use_ssl(); - topicConsumer.TokenName = topicConsumerProto.token_name(); - topicConsumer.AddBearerToToken = topicConsumerProto.add_bearer_to_token(); - } - - event->DqGraphs.reserve(req.dq_graph_size()); - for (const auto& g : req.dq_graph()) { - dqGraphBytesSize += g.size(); - event->DqGraphs.emplace_back(g); - } - - if (req.state_load_mode()) { - event->StateLoadMode = req.state_load_mode(); - } - - if (req.has_disposition()) { - event->StreamingDisposition = req.disposition(); - } - - - LOG_D("Statistics length: " << req.statistics().size() << ", " - << "Ast length: " << req.ast().size() << " bytes, " - << "Plan length: " << req.plan().size() << " bytes, " - << "Result set meta size: " << resultSetMetaByteSize << " bytes, " - << "Topic consumers size: " << event->CreatedTopicConsumers.size() * sizeof(TEvControlPlaneStorage::TTopicConsumer) << " bytes, " - << "Dq graphs size: " << dqGraphBytesSize << " bytes, " - << "Issues size: " << issuesByteSize << " bytes, " - << "Transient issues size: " << transientIssuesByteSize << " bytes"); - - event->DqGraphIndex = req.dq_graph_index(); - - return std::move(event); + auto request = Ev->Record; + return std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request)); } void HandleResponse(NYq::TEvControlPlaneStorage::TEvPingTaskResponse::TPtr& ev, const TActorContext& ctx) { @@ -206,12 +113,10 @@ private: Fail("ControlPlane PingTaskError", Ydb::StatusIds::GENERIC_ERROR); return; } - auto res = MakeHolder<TEvents::TEvPingTaskResponse>(); - res->Status = Ydb::StatusIds::SUCCESS; - Yq::Private::PingTaskResult result; - result.set_action(ev->Get()->Action); - res->Record.ConstructInPlace(result); - ctx.Send(Sender, res.Release()); + auto response = MakeHolder<TEvents::TEvPingTaskResponse>(); + response->Status = Ydb::StatusIds::SUCCESS; + response->Record.ConstructInPlace(ev->Get()->Record); + ctx.Send(Sender, response.Release()); Die(ctx); } diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h index ec89313c029..0257143f323 100644 --- a/ydb/core/yq/libs/control_plane_storage/events/events.h +++ b/ydb/core/yq/libs/control_plane_storage/events/events.h @@ -446,112 +446,42 @@ struct TEvControlPlaneStorage { }; struct TEvPingTaskRequest : NActors::TEventLocal<TEvPingTaskRequest, EvPingTaskRequest> { - explicit TEvPingTaskRequest(const TString& tenantName, const TString& cloudId, const TString& scope, const TString& queryId, const TString& owner, const TInstant& deadline, const TString& resultId = "") - : TenantName(tenantName) - , CloudId(cloudId) - , Scope(scope) - , QueryId(queryId) - , Owner(owner) - , Deadline(deadline) - , ResultId(resultId) - { - } - size_t GetByteSize() const { - return sizeof(*this) - + TenantName.Size() - + CloudId.Size() - + Scope.Size() - + QueryId.Size() - + Owner.Size() - + ResultId.Size() - + Status.Empty() ? 0 : sizeof(*Status) - + GetIssuesByteSize(Issues) - + GetIssuesByteSize(TransientIssues) - + Statistics.Empty() ? 0 : Statistics->Size() - + ResultSetMetasByteSizeLong() - + Ast.Empty() ? 0 : Ast->Size() - + Plan.Empty() ? 0 : Plan->Size() - + StartedAt.Empty() ? 0 : sizeof(*StartedAt) - + FinishedAt.Empty() ? 0 : sizeof(*FinishedAt) - + CreatedTopicConsumersByteSizeLong() - + DqGraphByteSizeLong() - + StreamingDisposition.Empty() ? 0 : StreamingDisposition->ByteSizeLong(); - } - - size_t ResultSetMetasByteSizeLong() const { - if (ResultSetMetas.Empty()) { - return 0; - } - size_t size = 0; - for (const auto& resultSet: *ResultSetMetas) { - size += resultSet.ByteSizeLong(); - } - size += ResultSetMetas->size() * sizeof(YandexQuery::ResultSetMeta); - return size; - } + TEvPingTaskRequest() = default; - size_t CreatedTopicConsumersByteSizeLong() const { - size_t size = 0; - for (const auto& topic: CreatedTopicConsumers) { - size += topic.GetByteSize(); - } - size += CreatedTopicConsumers.size() * sizeof(YandexQuery::ResultSetMeta); - return size; - } + explicit TEvPingTaskRequest( + Yq::Private::PingTaskRequest&& request) + : Request(std::move(request)) + {} - size_t DqGraphByteSizeLong() const { - size_t size = 0; - for (const auto& graph: DqGraphs) { - size += graph.Size(); - } - size += DqGraphs.size() * sizeof(TString); - return size; + size_t GetByteSize() const { + return sizeof(*this) + + Request.ByteSizeLong(); } - const TString TenantName; - const TString CloudId; - const TString Scope; - const TString QueryId; - const TString Owner; - const TInstant Deadline; - TString ResultId; - TMaybe<YandexQuery::QueryMeta::ComputeStatus> Status; - TMaybe<NYql::TIssues> Issues; - TMaybe<NYql::TIssues> TransientIssues; - TMaybe<TString> Statistics; - TMaybe<TVector<YandexQuery::ResultSetMeta>> ResultSetMetas; - TMaybe<TString> Ast; - TMaybe<TString> Plan; - TMaybe<TInstant> StartedAt; - TMaybe<TInstant> FinishedAt; - bool ResignQuery = false; - ui64 StatusCode = 0; - TVector<TTopicConsumer> CreatedTopicConsumers; - TVector<TString> DqGraphs; - i32 DqGraphIndex = 0; - YandexQuery::StateLoadMode StateLoadMode = YandexQuery::STATE_LOAD_MODE_UNSPECIFIED; - TMaybe<YandexQuery::StreamingDisposition> StreamingDisposition; + Yq::Private::PingTaskRequest Request; }; struct TEvPingTaskResponse : NActors::TEventLocal<TEvPingTaskResponse, EvPingTaskResponse> { - explicit TEvPingTaskResponse(const YandexQuery::QueryAction& action) - : Action(action) - { - } - explicit TEvPingTaskResponse(const NYql::TIssues& issues) + explicit TEvPingTaskResponse( + const Yq::Private::PingTaskResult& record) + : Record(record) + {} + + explicit TEvPingTaskResponse( + const NYql::TIssues& issues) : Issues(issues) - { - } + {} size_t GetByteSize() const { return sizeof(*this) + + Record.ByteSizeLong() + GetIssuesByteSize(Issues) + GetDebugInfoByteSize(DebugInfo); } - YandexQuery::QueryAction Action = YandexQuery::QUERY_ACTION_UNSPECIFIED; + Yq::Private::PingTaskResult Record; NYql::TIssues Issues; TDebugInfoPtr DebugInfo; }; diff --git a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp index c1f5f6f3ee0..65d65f7bcd8 100644 --- a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp @@ -307,7 +307,7 @@ private: { SendEmptyResponse< TEvControlPlaneStorage::TEvPingTaskRequest::TPtr, - YandexQuery::QueryAction, + Yq::Private::PingTaskResult, TEvControlPlaneStorage::TEvPingTaskResponse>(ev, "PingTaskRequest"); } diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp index 5d33c45bc33..ed5ab92f636 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp @@ -18,13 +18,13 @@ bool IsFinishedStatus(YandexQuery::QueryMeta::ComputeStatus status) { } // namespace std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructHardPingTask( - const TEvControlPlaneStorage::TEvPingTaskRequest* request, std::shared_ptr<YandexQuery::QueryAction> response, + const Yq::Private::PingTaskRequest& request, std::shared_ptr<Yq::Private::PingTaskResult> response, const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl, const THashMap<ui64, TRetryPolicyItem>& retryPolicies) { TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "HardPingTask(read)"); - readQueryBuilder.AddString("tenant", request->TenantName); - readQueryBuilder.AddString("scope", request->Scope); - readQueryBuilder.AddString("query_id", request->QueryId); + readQueryBuilder.AddString("tenant", request.tenant()); + readQueryBuilder.AddString("scope", request.scope()); + readQueryBuilder.AddString("query_id", request.query_id().value()); readQueryBuilder.AddText( "$last_job_id = SELECT `" LAST_JOB_ID_COLUMN_NAME "` FROM `" QUERIES_TABLE_NAME "`\n" " WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" @@ -53,23 +53,23 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam { TResultSetParser parser(resultSets[0]); if (!parser.TryNextRow()) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\""; + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\""; } if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." QUERY_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\""; + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." QUERY_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\""; } if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\""; + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\""; } } { TResultSetParser parser(resultSets[1]); if (!parser.TryNextRow()) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " JOBS_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\""; + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " JOBS_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\""; } if (!job.ParseFromString(*parser.ColumnParser(JOB_COLUMN_NAME).GetOptionalString())) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " JOBS_TABLE_NAME "." JOB_COLUMN_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\""; + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " JOBS_TABLE_NAME "." JOB_COLUMN_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\""; } jobId = *parser.ColumnParser(JOB_ID_COLUMN_NAME).GetOptionalString(); } @@ -78,11 +78,11 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam { TResultSetParser parser(resultSets[2]); if (!parser.TryNextRow()) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request->TenantName << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ; + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request.tenant() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ; } owner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString(); - if (owner != request->Owner) { - ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request->QueryId << "\" MISMATCHED: \"" << request->Owner << "\" (received) != \"" << owner << "\" (selected)"; + if (owner != request.owner_id()) { + ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request.query_id().value() << "\" MISMATCHED: \"" << request.owner_id() << "\" (received) != \"" << owner << "\" (selected)"; } retryLimiter.Assign( parser.ColumnParser(RETRY_COUNTER_COLUMN_NAME).GetOptionalUint64().GetOrElse(0), @@ -91,16 +91,28 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam ); } - TMaybe<YandexQuery::QueryMeta::ComputeStatus> queryStatus = request->Status; - TMaybe<NYql::TIssues> issues = request->Issues; - TMaybe<NYql::TIssues> transientIssues = request->TransientIssues; - + TMaybe<YandexQuery::QueryMeta::ComputeStatus> queryStatus; + if (request.status() != YandexQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED) { + queryStatus = request.status(); + } + TMaybe<NYql::TIssues> issues; + if (request.issues().size() > 0) { + NYql::TIssues requestIssues; + NYql::IssuesFromMessage(request.issues(), requestIssues); + issues = requestIssues; + } + TMaybe<NYql::TIssues> transientIssues; + if (request.transient_issues().size() > 0) { + NYql::TIssues requestTransientIssues; + NYql::IssuesFromMessage(request.transient_issues(), requestTransientIssues); + transientIssues = requestTransientIssues; + } // running query us locked for lease period TDuration backoff = taskLeaseTtl; - if (request->ResignQuery) { + if (request.resign_query()) { TRetryPolicyItem policy(0, TDuration::Seconds(1), TDuration::Zero()); - auto it = retryPolicies.find(request->StatusCode); + auto it = retryPolicies.find(request.status_code()); auto policyFound = it != retryPolicies.end(); if (policyFound) { policy = it->second; @@ -125,7 +137,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam } } } - CPS_LOG_AS_D(*actorSystem, "PingTaskRequest (resign): " << (!policyFound ? " DEFAULT POLICY" : "") << (owner ? " FAILURE" : "") << request->StatusCode << " " << retryLimiter.RetryCount << " " << retryLimiter.RetryCounterUpdatedAt << " " << backoff); + CPS_LOG_AS_D(*actorSystem, "PingTaskRequest (resign): " << (!policyFound ? " DEFAULT POLICY" : "") << (owner ? " FAILURE" : "") << NYql::NDqProto::StatusIds_StatusCode_Name(request.status_code()) << " " << retryLimiter.RetryCount << " " << retryLimiter.RetryCounterUpdatedAt << " " << backoff); } if (queryStatus) { @@ -150,42 +162,38 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam NYql::IssuesToMessage(newIssues, query.mutable_transient_issue()); } - if (request->Statistics) { - *query.mutable_statistics()->mutable_json() = *request->Statistics; - *job.mutable_statistics()->mutable_json() = *request->Statistics; + if (request.statistics()) { + *query.mutable_statistics()->mutable_json() = request.statistics(); + *job.mutable_statistics()->mutable_json() = request.statistics(); } - if (request->ResultSetMetas) { + if (!request.result_set_meta().empty()) { // we will overwrite result_set_meta's COMPLETELY - query.clear_result_set_meta(); - job.clear_result_set_meta(); - for (const auto& resultSetMeta : *request->ResultSetMetas) { - *query.add_result_set_meta() = resultSetMeta; - *job.add_result_set_meta() = resultSetMeta; - } + *query.mutable_result_set_meta() = request.result_set_meta(); + *job.mutable_result_set_meta() = request.result_set_meta(); } - if (request->Ast) { - query.mutable_ast()->set_data(*request->Ast); - job.mutable_ast()->set_data(*request->Ast); + if (request.ast()) { + query.mutable_ast()->set_data(request.ast()); + job.mutable_ast()->set_data(request.ast()); } - if (request->Plan) { - query.mutable_plan()->set_json(*request->Plan); - job.mutable_plan()->set_json(*request->Plan); + if (request.plan()) { + query.mutable_plan()->set_json(request.plan()); + job.mutable_plan()->set_json(request.plan()); } - if (request->StartedAt) { - *query.mutable_meta()->mutable_started_at() = NProtoInterop::CastToProto(*request->StartedAt); - *job.mutable_query_meta()->mutable_started_at() = NProtoInterop::CastToProto(*request->StartedAt); + if (request.has_started_at()) { + *query.mutable_meta()->mutable_started_at() = request.started_at(); + *job.mutable_query_meta()->mutable_started_at() = request.started_at(); } - if (request->FinishedAt) { - *query.mutable_meta()->mutable_finished_at() = NProtoInterop::CastToProto(*request->FinishedAt); - *job.mutable_query_meta()->mutable_finished_at() = NProtoInterop::CastToProto(*request->FinishedAt); + if (request.has_finished_at()) { + *query.mutable_meta()->mutable_finished_at() = request.finished_at(); + *job.mutable_query_meta()->mutable_finished_at() = request.finished_at(); if (!query.meta().has_started_at()) { - *query.mutable_meta()->mutable_started_at() = NProtoInterop::CastToProto(*request->FinishedAt); - *job.mutable_query_meta()->mutable_started_at() = NProtoInterop::CastToProto(*request->FinishedAt); + *query.mutable_meta()->mutable_started_at() = request.finished_at(); + *job.mutable_query_meta()->mutable_started_at() = request.finished_at(); } } @@ -197,43 +205,33 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam } if (query.meta().status() == YandexQuery::QueryMeta::COMPLETED) { - *query.mutable_meta()->mutable_result_expire_at() = NProtoInterop::CastToProto(request->Deadline); + *query.mutable_meta()->mutable_result_expire_at() = request.deadline(); } - if (request->StateLoadMode) { - internal.set_state_load_mode(request->StateLoadMode); - if (request->StateLoadMode == YandexQuery::FROM_LAST_CHECKPOINT) { // Saved checkpoint + if (request.state_load_mode()) { + internal.set_state_load_mode(request.state_load_mode()); + if (request.state_load_mode() == YandexQuery::FROM_LAST_CHECKPOINT) { // Saved checkpoint query.mutable_meta()->set_has_saved_checkpoints(true); } } - if (request->StreamingDisposition) { - internal.mutable_disposition()->CopyFrom(*request->StreamingDisposition); + if (request.has_disposition()) { + *internal.mutable_disposition() = request.disposition(); } - if (request->Status && IsFinishedStatus(*request->Status)) { + if (request.status() && IsFinishedStatus(request.status())) { internal.clear_created_topic_consumers(); internal.clear_dq_graph(); internal.clear_dq_graph_index(); } - if (!request->CreatedTopicConsumers.empty()) { + if (!request.created_topic_consumers().empty()) { std::set<Yq::Private::TopicConsumer, TTopicConsumerLess> mergedConsumers; for (auto&& c : *internal.mutable_created_topic_consumers()) { mergedConsumers.emplace(std::move(c)); } - - for (const auto& c : request->CreatedTopicConsumers) { - Yq::Private::TopicConsumer proto; - proto.set_database_id(c.DatabaseId); - proto.set_database(c.Database); - proto.set_topic_path(c.TopicPath); - proto.set_consumer_name(c.ConsumerName); - proto.set_cluster_endpoint(c.ClusterEndpoint); - proto.set_use_ssl(c.UseSsl); - proto.set_token_name(c.TokenName); - proto.set_add_bearer_to_token(c.AddBearerToToken); - mergedConsumers.emplace(std::move(proto)); + for (const auto& c : request.created_topic_consumers()) { + mergedConsumers.emplace(c); } internal.clear_created_topic_consumers(); for (auto&& c : mergedConsumers) { @@ -241,27 +239,24 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam } } - if (!request->DqGraphs.empty()) { - internal.clear_dq_graph(); - for (const auto& g : request->DqGraphs) { - internal.add_dq_graph(g); - } + if (!request.dq_graph().empty()) { + *internal.mutable_dq_graph() = request.dq_graph(); } - if (request->DqGraphIndex) { - internal.set_dq_graph_index(request->DqGraphIndex); + if (request.dq_graph_index()) { + internal.set_dq_graph_index(request.dq_graph_index()); } TSqlQueryBuilder writeQueryBuilder(tablePathPrefix, "HardPingTask(write)"); - writeQueryBuilder.AddString("tenant", request->TenantName); - writeQueryBuilder.AddString("scope", request->Scope); + writeQueryBuilder.AddString("tenant", request.tenant()); + writeQueryBuilder.AddString("scope", request.scope()); writeQueryBuilder.AddString("job_id", jobId); writeQueryBuilder.AddString("job", job.SerializeAsString()); writeQueryBuilder.AddString("query", query.SerializeAsString()); writeQueryBuilder.AddInt64("status", query.meta().status()); writeQueryBuilder.AddString("internal", internal.SerializeAsString()); - writeQueryBuilder.AddString("result_id", request->ResultId); - writeQueryBuilder.AddString("query_id", request->QueryId); + writeQueryBuilder.AddString("result_id", request.result_id().value()); + writeQueryBuilder.AddString("query_id", request.query_id().value()); if (IsTerminalStatus(query.meta().status())) { // delete pending @@ -301,7 +296,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam TString updateResultSetsExpire; if (query.meta().status() == YandexQuery::QueryMeta::COMPLETED) { - writeQueryBuilder.AddTimestamp("result_sets_expire_at", request->Deadline); + writeQueryBuilder.AddTimestamp("result_sets_expire_at", NProtoInterop::CastFromProto(request.deadline())); updateResultSetsExpire = "`" RESULT_SETS_EXPIRE_AT_COLUMN_NAME "` = $result_sets_expire_at"; } else { updateResultSetsExpire = "`" RESULT_SETS_EXPIRE_AT_COLUMN_NAME "` = NULL"; @@ -321,7 +316,8 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" ); - *response = internal.action(); + response->set_action(internal.action()); + const auto writeQuery = writeQueryBuilder.Build(); return std::make_pair(writeQuery.Sql, writeQuery.Params); }; @@ -330,12 +326,12 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam } std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructSoftPingTask( - const TEvControlPlaneStorage::TEvPingTaskRequest* request, std::shared_ptr<YandexQuery::QueryAction> response, + const Yq::Private::PingTaskRequest& request, std::shared_ptr<Yq::Private::PingTaskResult> response, const TString& tablePathPrefix, const TDuration& taskLeaseTtl) { TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "SoftPingTask(read)"); - readQueryBuilder.AddString("tenant", request->TenantName); - readQueryBuilder.AddString("scope", request->Scope); - readQueryBuilder.AddString("query_id", request->QueryId); + readQueryBuilder.AddString("tenant", request.tenant()); + readQueryBuilder.AddString("scope", request.scope()); + readQueryBuilder.AddString("query_id", request.query_id().value()); readQueryBuilder.AddText( "SELECT `" INTERNAL_COLUMN_NAME "`\n" "FROM `" QUERIES_TABLE_NAME "` WHERE `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" SCOPE_COLUMN_NAME "` = $scope;\n" @@ -354,32 +350,32 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam { TResultSetParser parser(resultSets[0]); if (!parser.TryNextRow()) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ; + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ; } if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\""; + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\""; } } { TResultSetParser parser(resultSets[1]); if (!parser.TryNextRow()) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request->TenantName << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ; + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request.tenant() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ; } owner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString(); - if (owner != request->Owner) { - ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request->QueryId << "\" MISMATCHED: \"" << request->Owner << "\" (received) != \"" << owner << "\" (selected)"; + if (owner != request.owner_id()) { + ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request.query_id().value() << "\" MISMATCHED: \"" << request.owner_id() << "\" (received) != \"" << owner << "\" (selected)"; } } - *response = internal.action(); + response->set_action(internal.action()); TSqlQueryBuilder writeQueryBuilder(tablePathPrefix, "SoftPingTask(write)"); writeQueryBuilder.AddTimestamp("now", TInstant::Now()); writeQueryBuilder.AddTimestamp("ttl", TInstant::Now() + taskLeaseTtl); - writeQueryBuilder.AddString("tenant", request->TenantName); - writeQueryBuilder.AddString("scope", request->Scope); - writeQueryBuilder.AddString("query_id", request->QueryId); + writeQueryBuilder.AddString("tenant", request.tenant()); + writeQueryBuilder.AddString("scope", request.scope()); + writeQueryBuilder.AddString("query_id", request.query_id().value()); writeQueryBuilder.AddText( "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl\n" @@ -396,19 +392,19 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskRequest::TPtr& ev) { TInstant startTime = TInstant::Now(); - TEvControlPlaneStorage::TEvPingTaskRequest* request = ev->Get(); - const TString cloudId = request->CloudId; - const TString scope = request->Scope; - TRequestCountersPtr requestCounters = Counters.GetScopeCounters(cloudId, scope, RTS_PING_TASK); + Yq::Private::PingTaskRequest& request = ev->Get()->Request; + const TString cloudId = ""; + const TString scope = request.scope(); + TRequestCountersPtr requestCounters = Counters.GetScopeCounters("" /*CloudId*/, scope, RTS_PING_TASK); requestCounters->InFly->Inc(); - requestCounters->RequestBytes->Add(request->GetByteSize()); - const TString queryId = request->QueryId; - const TString owner = request->Owner; - const TInstant deadline = request->Deadline; + requestCounters->RequestBytes->Add(ev->Get()->GetByteSize()); + const TString queryId = request.query_id().value(); + const TString owner = request.owner_id(); + const TInstant deadline = NProtoInterop::CastFromProto(request.deadline()); - CPS_LOG_T("PingTaskRequest: " << request->TenantName << " " << scope << " " << queryId + CPS_LOG_T("PingTaskRequest: " << request.tenant() << " " << scope << " " << queryId << " " << owner << " " << deadline << " " - << (request->Status ? YandexQuery::QueryMeta_ComputeStatus_Name(*request->Status) : "no status")); + << (request.status() ? YandexQuery::QueryMeta_ComputeStatus_Name(request.status()) : "no status")); NYql::TIssues issues = ValidatePingTask(scope, queryId, owner, deadline, Config.ResultSetsTtl); if (issues) { @@ -419,10 +415,10 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq return; } - std::shared_ptr<YandexQuery::QueryAction> response = std::make_shared<YandexQuery::QueryAction>(); + std::shared_ptr<Yq::Private::PingTaskResult> response = std::make_shared<Yq::Private::PingTaskResult>(); - if (request->Status) - Counters.GetFinalStatusCounters(cloudId, scope)->IncByStatus(*request->Status); + if (request.status()) + Counters.GetFinalStatusCounters(cloudId, scope)->IncByStatus(request.status()); auto pingTaskParams = DoesPingTaskUpdateQueriesTable(request) ? ConstructHardPingTask(request, response, YdbConnection->TablePathPrefix, Config.AutomaticQueriesTtl, Config.TaskLeaseTtl, Config.RetryPolicies) : ConstructSoftPingTask(request, response, YdbConnection->TablePathPrefix, Config.TaskLeaseTtl); @@ -432,8 +428,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery, readParams, prepareParams, requestCounters, debugInfo); - auto prepare = [response] { return std::make_tuple(*response); }; - auto success = SendResponseTuple<TEvControlPlaneStorage::TEvPingTaskResponse, std::tuple<YandexQuery::QueryAction>>( + auto prepare = [response] { return *response; }; + auto success = SendResponse<TEvControlPlaneStorage::TEvPingTaskResponse, Yq::Private::PingTaskResult>( "PingTaskRequest", NActors::TActivationContext::ActorSystem(), result, diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h index 58100255215..c0f82ed566b 100644 --- a/ydb/core/yq/libs/control_plane_storage/message_builders.h +++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h @@ -3,6 +3,7 @@ #include <util/datetime/base.h> #include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h> +#include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/public/api/protos/yq.pb.h> #include <ydb/core/yq/libs/control_plane_storage/events/events.h> @@ -1171,22 +1172,61 @@ public: std::unique_ptr<TEvControlPlaneStorage::TEvPingTaskRequest> Build() { - auto request = std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(TenantName, CloudId, Scope, QueryId, Owner, Deadline, ResultId); - request->Status = Status; - request->Issues = Issues; - request->TransientIssues = TransientIssues; - request->Statistics = Statistics; - request->ResultSetMetas = ResultSetMetas; - request->Ast = Ast; - request->Plan = Plan; - request->StartedAt = StartedAt; - request->FinishedAt = FinishedAt; - request->ResignQuery = ResignQuery; - request->StatusCode = StatusCode; - request->CreatedTopicConsumers = CreatedTopicConsumers; - request->DqGraphs = DqGraphs; - request->DqGraphIndex = DqGraphIndex; - return request; + Yq::Private::PingTaskRequest request; + request.set_owner_id(Owner); + request.mutable_query_id()->set_value(QueryId); + request.mutable_result_id()->set_value(ResultId); + if (Status) { + request.set_status(*Status); + } + request.set_status_code(StatusCode); + if (Issues) { + NYql::IssuesToMessage(*Issues, request.mutable_issues()); + } + if (TransientIssues) { + NYql::IssuesToMessage(*TransientIssues, request.mutable_transient_issues()); + } + if (Statistics) { + request.set_statistics(*Statistics); + } + if (ResultSetMetas) { + for (const auto& meta : *ResultSetMetas) { + *request.add_result_set_meta() = meta; + } + } + for (const auto& dqGraph : DqGraphs) { + request.add_dq_graph(dqGraph); + } + request.set_dq_graph_index(DqGraphIndex); + if (Ast) { + request.set_ast(*Ast); + } + if (Plan) { + request.set_plan(*Plan); + } + request.set_resign_query(ResignQuery); + for (const auto& consumer : CreatedTopicConsumers) { + auto& cons = *request.add_created_topic_consumers(); + cons.set_database_id(consumer.DatabaseId); + cons.set_database(consumer.Database); + cons.set_topic_path(consumer.TopicPath); + cons.set_consumer_name(consumer.ConsumerName); + cons.set_cluster_endpoint(consumer.ClusterEndpoint); + cons.set_use_ssl(consumer.UseSsl); + cons.set_token_name(consumer.TokenName); + cons.set_add_bearer_to_token(consumer.AddBearerToToken); + } + request.set_tenant(TenantName); + request.set_scope(Scope); + *request.mutable_deadline() = NProtoInterop::CastToProto(Deadline); + if (StartedAt) { + *request.mutable_started_at() = NProtoInterop::CastToProto(*StartedAt); + } + if (FinishedAt) { + *request.mutable_finished_at() = NProtoInterop::CastToProto(*FinishedAt); + } + + return std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request)); } }; diff --git a/ydb/core/yq/libs/control_plane_storage/util.cpp b/ydb/core/yq/libs/control_plane_storage/util.cpp index 8dd4257daf3..2ed5c6ee5e4 100644 --- a/ydb/core/yq/libs/control_plane_storage/util.cpp +++ b/ydb/core/yq/libs/control_plane_storage/util.cpp @@ -132,25 +132,23 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane return config; } -bool DoesPingTaskUpdateQueriesTable(const TEvControlPlaneStorage::TEvPingTaskRequest* request) { - if (!request) { - return false; - } - return request->Status || - request->Issues || - request->TransientIssues || - request->Statistics || - request->ResultSetMetas || - request->Ast || - request->Plan || - request->StartedAt || - request->FinishedAt || - request->ResignQuery || - !request->CreatedTopicConsumers.empty() || - !request->DqGraphs.empty() || - request->DqGraphIndex || - request->StateLoadMode || - request->StreamingDisposition; +bool DoesPingTaskUpdateQueriesTable(const Yq::Private::PingTaskRequest& request) { + return request.status() != YandexQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED + || !request.issues().empty() + || !request.transient_issues().empty() + || !request.statistics() + || !request.result_set_meta().empty() + || request.ast() + || request.plan() + || request.has_started_at() + || request.has_finished_at() + || request.resign_query() + || !request.created_topic_consumers().empty() + || !request.dq_graph().empty() + || request.dq_graph_index() + || request.state_load_mode() + || request.has_disposition() + ; } NYdb::TValue PackItemsToList(const TVector<NYdb::TValue>& items) { diff --git a/ydb/core/yq/libs/control_plane_storage/util.h b/ydb/core/yq/libs/control_plane_storage/util.h index 9cf5dfe6561..fa721cb2bd2 100644 --- a/ydb/core/yq/libs/control_plane_storage/util.h +++ b/ydb/core/yq/libs/control_plane_storage/util.h @@ -38,7 +38,7 @@ TDuration GetDuration(const TString& value, const TDuration& defaultValue); NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlaneStorageConfig config); -bool DoesPingTaskUpdateQueriesTable(const TEvControlPlaneStorage::TEvPingTaskRequest* request); +bool DoesPingTaskUpdateQueriesTable(const Yq::Private::PingTaskRequest& request); NYdb::TValue PackItemsToList(const TVector<NYdb::TValue>& items); diff --git a/ydb/core/yq/libs/private_client/loopback_service.cpp b/ydb/core/yq/libs/private_client/loopback_service.cpp index d5b49fe02f9..3f4126c4b42 100644 --- a/ydb/core/yq/libs/private_client/loopback_service.cpp +++ b/ydb/core/yq/libs/private_client/loopback_service.cpp @@ -43,8 +43,9 @@ private: hFunc(TEvInternalService::TEvWriteResultRequest, Handle) hFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Handle) - hFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, Handle) hFunc(NYq::TEvControlPlaneStorage::TEvGetTaskResponse, Handle) + hFunc(NYq::TEvControlPlaneStorage::TEvPingTaskResponse, Handle) + hFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, Handle) ); void Handle(TEvInternalService::TEvHealthCheckRequest::TPtr& ev) { @@ -87,13 +88,24 @@ private: } } - void Handle(TEvInternalService::TEvPingTaskRequest::TPtr& /*ev*/) { - /* + void Handle(TEvInternalService::TEvPingTaskRequest::TPtr& ev) { Cookie++; Senders[Cookie] = ev->Sender; auto request = ev->Get()->Request; Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvPingTaskRequest(std::move(request)), 0, Cookie); - */ + } + + void Handle(NYq::TEvControlPlaneStorage::TEvPingTaskResponse::TPtr& ev) { + auto it = Senders.find(ev->Cookie); + if (it != Senders.end()) { + if (ev->Get()->Issues.Size() == 0) { + Send(it->second, new TEvInternalService::TEvPingTaskResponse(ev->Get()->Record)); + } else { + auto issues = ev->Get()->Issues; + Send(it->second, new TEvInternalService::TEvPingTaskResponse(NYdb::EStatus::INTERNAL_ERROR, std::move(issues))); + } + Senders.erase(it); + } } void Handle(TEvInternalService::TEvWriteResultRequest::TPtr& ev) { |