diff options
author | hcpp <hcpp@ydb.tech> | 2023-09-21 20:13:25 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-09-21 20:38:28 +0300 |
commit | d6df73488ac0421cf6bad11c1548dedf33776fdc (patch) | |
tree | 312646e73b694b18ad417303989b6b427f9195fd | |
parent | d678e4fe555252698db9c8a9202a549758f30f06 (diff) | |
download | ydb-d6df73488ac0421cf6bad11c1548dedf33776fdc.tar.gz |
ast compression has been fixed
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp | 24 |
1 files changed, 20 insertions, 4 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp index e9e34f53862..83c4208a3c3 100644 --- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp @@ -1,5 +1,6 @@ #include "base_compute_actor.h" +#include <ydb/core/fq/libs/common/compression.h> #include <ydb/core/fq/libs/common/util.h> #include <ydb/core/fq/libs/compute/common/metrics.h> #include <ydb/core/fq/libs/compute/common/retry_actor.h> @@ -76,6 +77,7 @@ public: , OperationId(operationId) , Counters(GetStepCountersSubgroup()) , BackoffTimer(20, 1000) + , Compressor(params.Config.GetCommon().GetQueryArtifactsCompressionMethod(), params.Config.GetCommon().GetQueryArtifactsCompressionMinSize()) {} static constexpr char ActorName[] = "FQ_STATUS_TRACKER"; @@ -165,8 +167,7 @@ public: Fq::Private::PingTaskRequest pingTaskRequest; NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues()); pingTaskRequest.set_status(::FederatedQuery::QueryMeta::FAILING); - pingTaskRequest.set_ast(QueryStats.query_ast()); - pingTaskRequest.set_plan(QueryStats.query_plan()); + PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast()); Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); } @@ -177,8 +178,7 @@ public: Fq::Private::PingTaskRequest pingTaskRequest; NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues()); pingTaskRequest.set_status(::FederatedQuery::QueryMeta::COMPLETING); - pingTaskRequest.set_ast(QueryStats.query_ast()); - pingTaskRequest.set_plan(QueryStats.query_plan()); + PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast()); try { pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan())); } catch(const NJson::TJsonException& ex) { @@ -187,6 +187,21 @@ public: Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); } + void PrepareAstAndPlan(Fq::Private::PingTaskRequest& request, const TString& plan, const TString& expr) const { + if (Compressor.IsEnabled()) { + auto [astCompressionMethod, astCompressed] = Compressor.Compress(expr); + request.mutable_ast_compressed()->set_method(astCompressionMethod); + request.mutable_ast_compressed()->set_data(astCompressed); + + auto [planCompressionMethod, planCompressed] = Compressor.Compress(plan); + request.mutable_plan_compressed()->set_method(planCompressionMethod); + request.mutable_plan_compressed()->set_data(planCompressed); + } else { + request.set_ast(expr); + request.set_plan(plan); + } + } + private: TRunActorParams Params; TActorId Parent; @@ -200,6 +215,7 @@ private: NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified; Ydb::TableStats::QueryStats QueryStats; NKikimr::TBackoffTimer BackoffTimer; + const TCompressor Compressor; }; std::unique_ptr<NActors::IActor> CreateStatusTrackerActor(const TRunActorParams& params, |