diff options
author | uzhas <uzhas@ydb.tech> | 2022-07-19 11:56:40 +0300 |
---|---|---|
committer | uzhas <uzhas@ydb.tech> | 2022-07-19 11:56:40 +0300 |
commit | 188a2f4a6b4b415f13eaf72fa82fae030b81b8aa (patch) | |
tree | b569e41694cf1d049bf3e660744ca6ccf5104bad | |
parent | 5ede2b35f38901718b55de9f9517cf21405c3630 (diff) | |
download | ydb-188a2f4a6b4b415f13eaf72fa82fae030b81b8aa.tar.gz |
compress ast, plan, dq_graph
-rw-r--r-- | ydb/core/yq/libs/actors/pending_fetcher.cpp | 16 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/run_actor.cpp | 30 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/task_get.cpp | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/common/CMakeLists.txt | 2 | ||||
-rw-r--r-- | ydb/core/yq/libs/common/compression.cpp | 42 | ||||
-rw-r--r-- | ydb/core/yq/libs/common/compression.h | 27 | ||||
-rw-r--r-- | ydb/core/yq/libs/config/protos/common.proto | 2 | ||||
-rw-r--r-- | ydb/core/yq/libs/config/protos/control_plane_storage.proto | 2 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp | 20 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto | 8 | ||||
-rw-r--r-- | ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp | 34 | ||||
-rw-r--r-- | ydb/core/yq/libs/protos/yq_private.proto | 31 |
13 files changed, 192 insertions, 24 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index bf573255870..ffbac05fccc 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -42,6 +42,7 @@ #include <ydb/public/sdk/cpp/client/ydb_value/value.h> #include <ydb/public/sdk/cpp/client/ydb_result/result.h> +#include <ydb/core/yq/libs/common/compression.h> #include <ydb/core/yq/libs/common/entity_id.h> #include <ydb/core/yq/libs/events/events.h> @@ -50,7 +51,6 @@ #include <ydb/core/yq/libs/private_client/internal_service.h> #include <library/cpp/actors/core/log.h> -#include <library/cpp/protobuf/interop/cast.h> #include <ydb/library/security/util.h> @@ -297,7 +297,17 @@ private: queryCounters.InitUptimeCounter(); const auto createdAt = TInstant::Now(); - + TVector<TString> dqGraphs; + if (!task.dq_graph_compressed().empty()) { + dqGraphs.reserve(task.dq_graph_compressed().size()); + for (auto& g : task.dq_graph_compressed()) { + TCompressor compressor(g.method()); + dqGraphs.emplace_back(compressor.Decompress(g.data())); + } + } else { + // todo: remove after migration + dqGraphs = VectorFromProto(task.dq_graph()); + } TRunActorParams params( YqSharedResources, CredentialsProviderFactory, S3Gateway, FunctionRegistry, RandomProvider, @@ -320,7 +330,7 @@ private: task.status(), cloudId, VectorFromProto(task.result_set_meta()), - VectorFromProto(task.dq_graph()), + std::move(dqGraphs), task.dq_graph_index(), VectorFromProto(task.created_topic_consumers()), task.automatic(), diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index eb0f15a61ad..da62ccd1162 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -70,6 +70,7 @@ #include <ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h> #include <ydb/core/yq/libs/checkpointing_common/defs.h> #include <ydb/core/yq/libs/checkpoint_storage/storage_service.h> +#include <ydb/core/yq/libs/common/compression.h> #include <ydb/core/yq/libs/private_client/private_client.h> #define LOG_E(stream) \ @@ -273,6 +274,7 @@ public: , QueryCounters(queryCounters) , EnableCheckpointCoordinator(Params.QueryType == YandexQuery::QueryContent::STREAMING && Params.CheckpointCoordinatorConfig.GetEnabled()) , MaxTasksPerOperation(Params.CommonConfig.GetMaxTasksPerOperation() ? Params.CommonConfig.GetMaxTasksPerOperation() : 40) + , Compressor(Params.CommonConfig.GetQueryArtifactsCompressionMethod(), Params.CommonConfig.GetQueryArtifactsCompressionMinSize()) { QueryCounters.SetUptimePublicAndServiceCounter(0); } @@ -721,8 +723,19 @@ private: void UpdateAstAndPlan(const TString& plan, const TString& expr) { Yq::Private::PingTaskRequest request; - request.set_ast(expr); - request.set_plan(plan); + if (Compressor.IsEnabled()) { + auto [astCompressionMethod, astCompressed] = Compressor.Compress(expr); + request.mutable_ast_compressed()->set_method(astCompressionMethod); + request.mutable_ast_compressed()->set_data(astCompressed); + + auto [planCompressionMethod, planCompressed] = Compressor.Compress(plan); + request.mutable_plan_compressed()->set_method(planCompressionMethod); + request.mutable_plan_compressed()->set_data(planCompressed); + } else { + request.set_ast(expr); // todo: remove after migration + request.set_plan(plan); // todo: remove after migration + } + Send(Pinger, new TEvents::TEvForwardPingRequest(request)); } @@ -765,7 +778,15 @@ private: } for (const auto& graphParams : DqGraphParams) { - request.add_dq_graph(graphParams.SerializeAsString()); + const TString& serializedGraph = graphParams.SerializeAsString(); + if (Compressor.IsEnabled()) { + auto& dq_graph_compressed = *request.add_dq_graph_compressed(); + auto [method, data] = Compressor.Compress(serializedGraph); + dq_graph_compressed.set_method(method); + dq_graph_compressed.set_data(data); + } else { + request.add_dq_graph(serializedGraph); // todo: remove after migration + } } Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, SaveQueryInfoCookie); @@ -1679,7 +1700,8 @@ private: bool EnableCheckpointCoordinator = false; Yq::Private::PingTaskRequest QueryStateUpdateRequest; - const ui64 MaxTasksPerOperation = 100; + const ui64 MaxTasksPerOperation; + const TCompressor Compressor; // Consumers creation TVector<NYql::NPq::NProto::TDqPqTopicSource> TopicsForConsumersCreation; diff --git a/ydb/core/yq/libs/actors/task_get.cpp b/ydb/core/yq/libs/actors/task_get.cpp index ff5269b9a46..ce788677bd5 100644 --- a/ydb/core/yq/libs/actors/task_get.cpp +++ b/ydb/core/yq/libs/actors/task_get.cpp @@ -10,7 +10,6 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/log.h> -#include <library/cpp/protobuf/interop/cast.h> #include <ydb/core/yq/libs/common/entity_id.h> diff --git a/ydb/core/yq/libs/common/CMakeLists.txt b/ydb/core/yq/libs/common/CMakeLists.txt index 275e5dc237d..93490604959 100644 --- a/ydb/core/yq/libs/common/CMakeLists.txt +++ b/ydb/core/yq/libs/common/CMakeLists.txt @@ -14,6 +14,7 @@ target_compile_options(yq-libs-common PRIVATE target_link_libraries(yq-libs-common PUBLIC contrib-libs-cxxsupp yutil + library-cpp-blockcodecs libs-control_plane_storage-events yq-libs-events providers-common-structured_token @@ -21,6 +22,7 @@ target_link_libraries(yq-libs-common PUBLIC api-protos ) target_sources(yq-libs-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/common/compression.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/common/entity_id.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/common/rows_proto_splitter.cpp ) diff --git a/ydb/core/yq/libs/common/compression.cpp b/ydb/core/yq/libs/common/compression.cpp new file mode 100644 index 00000000000..c8ddfe8771f --- /dev/null +++ b/ydb/core/yq/libs/common/compression.cpp @@ -0,0 +1,42 @@ +#include "compression.h" +#include <library/cpp/blockcodecs/codecs.h> + +namespace NYq { + +namespace { + const NBlockCodecs::ICodec* ProvideCodec(const TString& compressionMethod) { + auto codec = NBlockCodecs::Codec(compressionMethod); + if (!codec) { + throw yexception() << "Unable to find codec for compression method " << compressionMethod; + } + return codec; + } +} + +TCompressor::TCompressor(const TString& compressionMethod, ui64 minCompressionSize) + : CompressionMethod(compressionMethod) + , MinCompressionSize(minCompressionSize) + , Codec(compressionMethod ? ProvideCodec(compressionMethod) : nullptr) +{ +} + +bool TCompressor::IsEnabled() const { + return Codec != nullptr; +} + +std::pair<TString, TString> TCompressor::Compress(const TString& data) const { + if (!IsEnabled() || data.size() < MinCompressionSize) { + return { "", data }; + } + return { CompressionMethod, Codec->Encode(data) }; +} + +TString TCompressor::Decompress(const TString& data) const { + if (!IsEnabled()) { + return data; + } + + return Codec->Decode(data); +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/common/compression.h b/ydb/core/yq/libs/common/compression.h new file mode 100644 index 00000000000..26e0aa17894 --- /dev/null +++ b/ydb/core/yq/libs/common/compression.h @@ -0,0 +1,27 @@ +#pragma once + +#include <util/generic/string.h> + +namespace NBlockCodecs { + struct ICodec; +} + +namespace NYq { + +class TCompressor { +public: + explicit TCompressor(const TString& compressionMethod, ui64 minCompressionSize = 0); + bool IsEnabled() const; + // return (compressionMethod, possiblyCompressedData) + // if compressionMethod is empty data is not compressed + std::pair<TString, TString> Compress(const TString& data) const; + TString Decompress(const TString& data) const; + +private: + const TString CompressionMethod; + const ui64 MinCompressionSize; + const NBlockCodecs::ICodec* Codec; +}; + + +} // namespace NYq diff --git a/ydb/core/yq/libs/config/protos/common.proto b/ydb/core/yq/libs/config/protos/common.proto index e86548cf3c8..6e3a3d4441b 100644 --- a/ydb/core/yq/libs/config/protos/common.proto +++ b/ydb/core/yq/libs/config/protos/common.proto @@ -21,4 +21,6 @@ message TCommonConfig { string ObjectStorageEndpoint = 6; string IdsPrefix = 7; uint64 MaxTasksPerOperation = 8; + string QueryArtifactsCompressionMethod = 9; + uint64 QueryArtifactsCompressionMinSize = 10; } diff --git a/ydb/core/yq/libs/config/protos/control_plane_storage.proto b/ydb/core/yq/libs/config/protos/control_plane_storage.proto index 6cf23202640..2eb2b50f784 100644 --- a/ydb/core/yq/libs/config/protos/control_plane_storage.proto +++ b/ydb/core/yq/libs/config/protos/control_plane_storage.proto @@ -21,7 +21,7 @@ message TQueryMapping { repeated string CommonTenantName = 3; } -// 1. StatusCode(s) are handled with defined policis, non-unique StatusCode(s) accross all polices is UB +// 1. StatusCode(s) are handled with defined policies, non-unique StatusCode(s) across all policies is UB // 2. RetryCount and RetryPeriodMs are used to calculate actual RetryRate, if it exceeds RetryCount, query is aborted // 3. BackoffPeriodMs is factor of RetryRate to delay query execution before next retry // 4. There are no default retry policy, all unhandled statuses are fatal diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp index b9e845331c1..6fc5800b1fe 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp @@ -375,6 +375,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ *newTask->mutable_dq_graph() = task.Internal.dq_graph(); newTask->set_dq_graph_index(task.Internal.dq_graph_index()); + *newTask->mutable_dq_graph_compressed() = task.Internal.dq_graph_compressed(); *newTask->mutable_result_set_meta() = task.Query.result_set_meta(); newTask->set_scope(task.Scope); diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp index c7f48b166aa..09f4d25bb46 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp @@ -183,6 +183,20 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam job.mutable_plan()->set_json(request.plan()); } + if (request.ast_compressed().data()) { + internal.mutable_ast_compressed()->set_method(request.ast_compressed().method()); + internal.mutable_ast_compressed()->set_data(request.ast_compressed().data()); + // todo: keep AST compressed in JobInternal + // job.mutable_ast()->set_data(request.ast()); + } + + if (request.plan_compressed().data()) { + internal.mutable_plan_compressed()->set_method(request.plan_compressed().method()); + internal.mutable_plan_compressed()->set_data(request.plan_compressed().data()); + // todo: keep plan compressed in JobInternal + // job.mutable_plan()->set_json(request.plan()); + } + if (request.has_started_at()) { *query.mutable_meta()->mutable_started_at() = request.started_at(); *job.mutable_query_meta()->mutable_started_at() = request.started_at(); @@ -221,7 +235,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam if (request.status() && IsFinishedStatus(request.status())) { internal.clear_created_topic_consumers(); - internal.clear_dq_graph(); + // internal.clear_dq_graph(); keep for debug internal.clear_dq_graph_index(); } @@ -243,6 +257,10 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam *internal.mutable_dq_graph() = request.dq_graph(); } + if (!request.dq_graph_compressed().empty()) { + *internal.mutable_dq_graph_compressed() = request.dq_graph_compressed(); + } + if (request.dq_graph_index()) { internal.set_dq_graph_index(request.dq_graph_index()); } diff --git a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto index 94631d3d2d4..0d17e1fc9a2 100644 --- a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto +++ b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto @@ -29,14 +29,20 @@ message QueryInternal { StateLoadMode state_load_mode = 10; string cloud_id = 11; repeated Yq.Private.TopicConsumer created_topic_consumers = 12; - repeated bytes dq_graph = 13; + repeated bytes dq_graph = 13; // deprecated: use dq_graph_compressed int32 dq_graph_index = 14; StreamingDisposition disposition = 15; uint64 result_limit = 16; google.protobuf.Duration execution_ttl = 17; + Yq.Private.CompressedData ast_compressed = 18; + Yq.Private.CompressedData plan_compressed = 19; + repeated Yq.Private.CompressedData dq_graph_compressed = 20; } message JobInternal { + Yq.Private.CompressedData ast_compressed = 1; + Yq.Private.CompressedData plan_compressed = 2; + repeated Yq.Private.CompressedData dq_graph_compressed = 3; } message ConnectionInternal { diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index c579176c984..97ef8b866e2 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -6,6 +6,7 @@ #include <util/generic/yexception.h> #include <util/string/join.h> +#include <ydb/core/yq/libs/common/compression.h> #include <ydb/core/yq/libs/common/entity_id.h> #include <ydb/core/yq/libs/control_plane_storage/events/events.h> #include <ydb/core/yq/libs/control_plane_storage/schema.h> @@ -20,6 +21,8 @@ namespace { +constexpr ui64 GRPC_MESSAGE_SIZE_LIMIT = 64000000; + YandexQuery::IamAuth::IdentityCase GetIamAuth(const YandexQuery::Connection& connection) { const auto& setting = connection.content().setting(); switch (setting.connection_case()) { @@ -539,7 +542,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeQue queryBuilder.AddString("query_id", queryId); queryBuilder.AddTimestamp("now", TInstant::Now()); queryBuilder.AddText( - "SELECT `" QUERY_COLUMN_NAME "` FROM `" QUERIES_TABLE_NAME "`\n" + "SELECT `" QUERY_COLUMN_NAME "`, `" INTERNAL_COLUMN_NAME "` FROM `" QUERIES_TABLE_NAME "`\n" "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id AND (`" EXPIRE_AT_COLUMN_NAME "` is NULL OR `" EXPIRE_AT_COLUMN_NAME "` > $now);" ); const auto query = queryBuilder.Build(); @@ -570,10 +573,35 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeQue ythrow TControlPlaneStorageException(TIssuesIds::ACCESS_DENIED) << "Query does not exist or permission denied. Please check the id of the query or your access rights"; } + YandexQuery::Internal::QueryInternal internal; + if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support"; + } + + // decompress plan + if (internal.plan_compressed().data()) { // todo: remove this if after migration + TCompressor compressor(internal.plan_compressed().method()); + result.mutable_query()->mutable_plan()->set_json(compressor.Decompress(internal.plan_compressed().data())); + } if (!permissions.Check(TPermissions::VIEW_AST)) { result.mutable_query()->clear_ast(); + } else { + // decompress AST + if (internal.ast_compressed().data()) { // todo: remove this if after migration + TCompressor compressor(internal.ast_compressed().method()); + result.mutable_query()->mutable_ast()->set_data(compressor.Decompress(internal.ast_compressed().data())); + } + if (result.query().ByteSizeLong() > GRPC_MESSAGE_SIZE_LIMIT) { + if (result.query().ast().data().size() > 1000) { + // modifing AST this way should definitely reduce query msg size + result.mutable_query()->mutable_ast()->set_data(TStringBuilder() << "Message is too big: " << result.query().ByteSizeLong() << " bytes, dropping AST of size " << result.query().ast().data().size() << " bytes"); + } + } } + if (result.query().ByteSizeLong() > GRPC_MESSAGE_SIZE_LIMIT) { + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Resulting query of size " << result.query().ByteSizeLong() << " bytes is too big"; + } return result; }; @@ -900,6 +928,10 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery query.mutable_meta()->set_started_by(user); query.mutable_meta()->clear_action(); + internal.clear_plan_compressed(); + internal.clear_ast_compressed(); + internal.clear_dq_graph_compressed(); + auto& jobMeta = *job.mutable_meta(); jobMeta.set_id(jobId); jobMeta.set_created_by(user); diff --git a/ydb/core/yq/libs/protos/yq_private.proto b/ydb/core/yq/libs/protos/yq_private.proto index b4d67c4a789..fdcfd1944bb 100644 --- a/ydb/core/yq/libs/protos/yq_private.proto +++ b/ydb/core/yq/libs/protos/yq_private.proto @@ -17,6 +17,11 @@ import "google/protobuf/duration.proto"; //////////////////////////////////////////////////////////// +message CompressedData { + string method = 1; + bytes data = 2; +} + message GetTaskRequest { string tenant = 1; string owner_id = 2; // guid, should be refreshed on node restart @@ -47,14 +52,13 @@ message TopicConsumer { message GetTaskResult { message Task { - // come back later in 10 sec ? SignedIdentity result_id = 1; SignedIdentity query_id = 2; SignedIdentity job_id = 3; uint64 generation = 4; bool streaming = 5; - repeated bytes dq_graph = 6; + repeated bytes dq_graph = 6; // deprecated: use dq_graph_compressed // text, connection and binding are empty if dq_graph is not empty string text = 7; repeated YandexQuery.Connection connection = 8; @@ -77,11 +81,11 @@ message GetTaskResult { google.protobuf.Timestamp deadline = 24; YandexQuery.StreamingDisposition disposition = 25; uint64 result_limit = 26; - YandexQuery.Limits limits = 27; - string rate_limiter = 28; // Kesus path // If empty, rate limiting is off. + string rate_limiter = 28; // Kesus path. If empty, rate limiting is off. google.protobuf.Duration execution_limit = 29; google.protobuf.Timestamp request_started_at = 30; + repeated CompressedData dq_graph_compressed = 31; } repeated Task tasks = 1; } @@ -102,28 +106,31 @@ message PingTaskRequest { SignedIdentity job_id = 3; SignedIdentity result_id = 4; YandexQuery.QueryMeta.ComputeStatus status = 5; - NYql.NDqProto.StatusIds.StatusCode status_code = 21; repeated Ydb.Issue.IssueMessage issues = 6; - repeated Ydb.Issue.IssueMessage transient_issues = 16; uint32 result_set_count = 7; string statistics = 8; repeated YandexQuery.ResultSetMeta result_set_meta = 9; string executer_info = 10; - repeated bytes dq_graph = 11; - int32 dq_graph_index = 20; - string ast = 12; - string plan = 13; + repeated bytes dq_graph = 11; // deprecated: use dq_graph_compressed + string ast = 12; // deprecated: use ast_compressed + string plan = 13; // deprecated: use plan_compressed bool resign_query = 14; + Ydb.Operations.OperationParams operation_params = 15; + repeated Ydb.Issue.IssueMessage transient_issues = 16; repeated TopicConsumer created_topic_consumers = 17; YandexQuery.StateLoadMode state_load_mode = 18; YandexQuery.StreamingDisposition disposition = 19; - Ydb.Operations.OperationParams operation_params = 15; + int32 dq_graph_index = 20; + NYql.NDqProto.StatusIds.StatusCode status_code = 21; RateLimiterResources rate_limiter_resources = 22; + repeated CompressedData dq_graph_compressed = 23; + CompressedData ast_compressed = 24; + CompressedData plan_compressed = 25; string scope = 100; - string tenant = 104; google.protobuf.Timestamp started_at = 101; google.protobuf.Timestamp finished_at = 102; google.protobuf.Timestamp deadline = 103; + string tenant = 104; } message PingTaskResult { |