aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-09-21 20:13:25 +0300
committerhcpp <hcpp@ydb.tech>2023-09-21 20:38:28 +0300
commitd6df73488ac0421cf6bad11c1548dedf33776fdc (patch)
tree312646e73b694b18ad417303989b6b427f9195fd
parentd678e4fe555252698db9c8a9202a549758f30f06 (diff)
downloadydb-d6df73488ac0421cf6bad11c1548dedf33776fdc.tar.gz
ast compression has been fixed
-rw-r--r--ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp24
1 files changed, 20 insertions, 4 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
index e9e34f53862..83c4208a3c3 100644
--- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
@@ -1,5 +1,6 @@
#include "base_compute_actor.h"
+#include <ydb/core/fq/libs/common/compression.h>
#include <ydb/core/fq/libs/common/util.h>
#include <ydb/core/fq/libs/compute/common/metrics.h>
#include <ydb/core/fq/libs/compute/common/retry_actor.h>
@@ -76,6 +77,7 @@ public:
, OperationId(operationId)
, Counters(GetStepCountersSubgroup())
, BackoffTimer(20, 1000)
+ , Compressor(params.Config.GetCommon().GetQueryArtifactsCompressionMethod(), params.Config.GetCommon().GetQueryArtifactsCompressionMinSize())
{}
static constexpr char ActorName[] = "FQ_STATUS_TRACKER";
@@ -165,8 +167,7 @@ public:
Fq::Private::PingTaskRequest pingTaskRequest;
NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
pingTaskRequest.set_status(::FederatedQuery::QueryMeta::FAILING);
- pingTaskRequest.set_ast(QueryStats.query_ast());
- pingTaskRequest.set_plan(QueryStats.query_plan());
+ PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
}
@@ -177,8 +178,7 @@ public:
Fq::Private::PingTaskRequest pingTaskRequest;
NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
pingTaskRequest.set_status(::FederatedQuery::QueryMeta::COMPLETING);
- pingTaskRequest.set_ast(QueryStats.query_ast());
- pingTaskRequest.set_plan(QueryStats.query_plan());
+ PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
try {
pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan()));
} catch(const NJson::TJsonException& ex) {
@@ -187,6 +187,21 @@ public:
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
}
+ void PrepareAstAndPlan(Fq::Private::PingTaskRequest& request, const TString& plan, const TString& expr) const {
+ if (Compressor.IsEnabled()) {
+ auto [astCompressionMethod, astCompressed] = Compressor.Compress(expr);
+ request.mutable_ast_compressed()->set_method(astCompressionMethod);
+ request.mutable_ast_compressed()->set_data(astCompressed);
+
+ auto [planCompressionMethod, planCompressed] = Compressor.Compress(plan);
+ request.mutable_plan_compressed()->set_method(planCompressionMethod);
+ request.mutable_plan_compressed()->set_data(planCompressed);
+ } else {
+ request.set_ast(expr);
+ request.set_plan(plan);
+ }
+ }
+
private:
TRunActorParams Params;
TActorId Parent;
@@ -200,6 +215,7 @@ private:
NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
Ydb::TableStats::QueryStats QueryStats;
NKikimr::TBackoffTimer BackoffTimer;
+ const TCompressor Compressor;
};
std::unique_ptr<NActors::IActor> CreateStatusTrackerActor(const TRunActorParams& params,