aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPisarenko Grigoriy <grigoriypisar@ydb.tech>2025-05-24 14:30:50 +0500
committerGitHub <noreply@github.com>2025-05-24 12:30:50 +0300
commite727aaf2a0bedfa37ddb8205a1f5d922b501eab0 (patch)
tree4c35dadc3d6a51bf0fefcc597503b93bf37a849c
parentdde26ab1e55a1a7ea739958e65e6c40733c06869 (diff)
downloadydb-e727aaf2a0bedfa37ddb8205a1f5d922b501eab0.tar.gz
YQ FqRun supported runtime outputs (#18768)
-rw-r--r--ydb/tests/tools/fqrun/fqrun.cpp13
-rw-r--r--ydb/tests/tools/fqrun/src/common.h5
-rw-r--r--ydb/tests/tools/fqrun/src/fq_runner.cpp58
-rw-r--r--ydb/tests/tools/fqrun/src/fq_setup.cpp1
-rw-r--r--ydb/tests/tools/fqrun/src/fq_setup.h1
-rw-r--r--ydb/tests/tools/fqrun/ya.make4
-rw-r--r--ydb/tests/tools/kqprun/runlib/utils.cpp21
-rw-r--r--ydb/tests/tools/kqprun/runlib/utils.h15
8 files changed, 96 insertions, 22 deletions
diff --git a/ydb/tests/tools/fqrun/fqrun.cpp b/ydb/tests/tools/fqrun/fqrun.cpp
index 5391d497fae..6d232d93bfb 100644
--- a/ydb/tests/tools/fqrun/fqrun.cpp
+++ b/ydb/tests/tools/fqrun/fqrun.cpp
@@ -112,6 +112,7 @@ private:
checker(Scopes.size(), "query scopes");
checker(runnerOptions.AstOutputs.size(), "ast output files");
checker(runnerOptions.PlanOutputs.size(), "plan output files");
+ checker(runnerOptions.StatsOutputs.size(), "statistics output files");
}
ui64 GetNumberOfQueries() const {
@@ -399,15 +400,15 @@ protected:
options.AddLongOption("ast-file", "File with query ast (use '-' to write in stdout)")
.RequiredArgument("file")
- .Handler1([this](const NLastGetopt::TOptsParser* option) {
- RunnerOptions.AstOutputs.emplace_back(GetDefaultOutput(TString(option->CurValOrDef())));
- });
+ .EmplaceTo(&RunnerOptions.AstOutputs);
options.AddLongOption("plan-file", "File with query plan (use '-' to write in stdout)")
.RequiredArgument("file")
- .Handler1([this](const NLastGetopt::TOptsParser* option) {
- RunnerOptions.PlanOutputs.emplace_back(GetDefaultOutput(TString(option->CurValOrDef())));
- });
+ .EmplaceTo(&RunnerOptions.PlanOutputs);
+
+ options.AddLongOption("stats-file", "File with query statistics")
+ .RequiredArgument("file")
+ .EmplaceTo(&RunnerOptions.StatsOutputs);
options.AddLongOption("canonical-output", "Make ast and plan output suitable for canonization (replace volatile data such as endpoints with stable one)")
.NoArgument()
diff --git a/ydb/tests/tools/fqrun/src/common.h b/ydb/tests/tools/fqrun/src/common.h
index 69536685721..b5c697887e9 100644
--- a/ydb/tests/tools/fqrun/src/common.h
+++ b/ydb/tests/tools/fqrun/src/common.h
@@ -60,8 +60,9 @@ struct TRunnerOptions {
std::unordered_set<ui64> TraceOptIds;
IOutputStream* ResultOutput = nullptr;
- std::vector<IOutputStream*> AstOutputs;
- std::vector<IOutputStream*> PlanOutputs;
+ std::vector<TString> AstOutputs;
+ std::vector<TString> PlanOutputs;
+ std::vector<TString> StatsOutputs;
bool CanonicalOutput = false;
NKikimrRun::EResultOutputFormat ResultOutputFormat = NKikimrRun::EResultOutputFormat::RowsJson;
diff --git a/ydb/tests/tools/fqrun/src/fq_runner.cpp b/ydb/tests/tools/fqrun/src/fq_runner.cpp
index 2276978a912..1462bb692be 100644
--- a/ydb/tests/tools/fqrun/src/fq_runner.cpp
+++ b/ydb/tests/tools/fqrun/src/fq_runner.cpp
@@ -164,6 +164,16 @@ private:
TFqSetup::StopTraceOpt();
};
+ auto printStats = [this, queryId, astPrinter = GetAstPrinter(queryId), planPrinter = GetPlanPrinter(queryId)](const TExecutionMeta& meta, bool allowEmpty = false) mutable {
+ if (astPrinter) {
+ astPrinter->Print(meta.Ast, allowEmpty);
+ }
+ if (planPrinter) {
+ planPrinter->Print(meta.Plan, allowEmpty);
+ }
+ PrintStatistics(queryId, meta.Statistics);
+ };
+
while (true) {
TExecutionMeta meta;
const TRequestResult status = FqSetup.DescribeQuery(QueryId, CurrentOptions, meta);
@@ -172,6 +182,7 @@ private:
Cerr << CerrColors.Red() << "Query transient issues updated:" << CerrColors.Default() << Endl << meta.TransientIssues.ToString() << Endl;
}
ExecutionMeta = meta;
+ printStats(ExecutionMeta);
if (IsFinalStatus(ExecutionMeta.Status)) {
break;
@@ -185,8 +196,7 @@ private:
Sleep(Options.PingPeriod);
}
- PrintQueryAst(queryId, ExecutionMeta.Ast);
- PrintQueryPlan(queryId, ExecutionMeta.Plan);
+ printStats(ExecutionMeta, true);
if (VerboseLevel >= EVerbose::Info) {
Cout << CoutColors.Cyan() << "Query finished. Duration: " << TInstant::Now() - StartTime << CoutColors.Default() << Endl;
}
@@ -209,8 +219,13 @@ private:
}
}
- void PrintQueryAst(size_t queryId, TString ast) const {
- if (const auto output = GetValue<IOutputStream*>(queryId, Options.AstOutputs, nullptr)) {
+ std::optional<TCachedPrinter> GetAstPrinter(size_t queryId) const {
+ const auto& astOutput = GetValue<TString>(queryId, Options.AstOutputs, {});
+ if (!astOutput) {
+ return std::nullopt;
+ }
+
+ return TCachedPrinter(astOutput, [this](TString ast, IOutputStream& output) {
if (VerboseLevel >= EVerbose::Info) {
Cout << CoutColors.Cyan() << "Writing query ast" << CoutColors.Default() << Endl;
}
@@ -218,13 +233,17 @@ private:
ast = CanonizeEndpoints(ast, Options.FqSettings.AppConfig.GetFederatedQueryConfig().GetGateways());
ast = CanonizeAstLogicalId(ast);
}
- output->Write(ast);
- output->Flush();
- }
+ output.Write(ast);
+ });
}
- void PrintQueryPlan(size_t queryId, TString plan) const {
- if (const auto output = GetValue<IOutputStream*>(queryId, Options.PlanOutputs, nullptr)) {
+ std::optional<TCachedPrinter> GetPlanPrinter(size_t queryId) const {
+ const auto& planOutput = GetValue<TString>(queryId, Options.PlanOutputs, {});
+ if (!planOutput) {
+ return std::nullopt;
+ }
+
+ return TCachedPrinter(planOutput, [this](TString plan, IOutputStream& output) {
if (VerboseLevel >= EVerbose::Info) {
Cout << CoutColors.Cyan() << "Writing query plan" << CoutColors.Default() << Endl;
}
@@ -232,16 +251,27 @@ private:
return;
}
- NJson::TJsonValue planJson;
- NJson::ReadJsonTree(plan, &planJson, true);
plan = NJson::PrettifyJson(plan, false);
-
if (Options.CanonicalOutput) {
plan = CanonizeEndpoints(plan, Options.FqSettings.AppConfig.GetFederatedQueryConfig().GetGateways());
}
- output->Write(plan);
- output->Flush();
+ output.Write(plan);
+ if (!Options.CanonicalOutput) {
+ output.Write('\n');
+ }
+ });
+ }
+
+ void PrintStatistics(size_t queryId, const TString& statistics) const {
+ if (!statistics) {
+ return;
+ }
+
+ if (const auto& statsFile = GetValue<TString>(queryId, Options.StatsOutputs, {})) {
+ TFileOutput output(statsFile);
+ output.Write(NJson::PrettifyJson(statistics, false));
+ output.Finish();
}
}
diff --git a/ydb/tests/tools/fqrun/src/fq_setup.cpp b/ydb/tests/tools/fqrun/src/fq_setup.cpp
index c5cac69c058..4aa130a6795 100644
--- a/ydb/tests/tools/fqrun/src/fq_setup.cpp
+++ b/ydb/tests/tools/fqrun/src/fq_setup.cpp
@@ -354,6 +354,7 @@ TRequestResult TFqSetup::DescribeQuery(const TString& queryId, const TFqOptions&
meta.Ast = result.ast().data();
meta.Plan = result.plan().json();
+ meta.Statistics = result.statistics().json();
return GetStatus(response->Get()->Issues);
}
diff --git a/ydb/tests/tools/fqrun/src/fq_setup.h b/ydb/tests/tools/fqrun/src/fq_setup.h
index 119d54587ee..cae1e42f4f6 100644
--- a/ydb/tests/tools/fqrun/src/fq_setup.h
+++ b/ydb/tests/tools/fqrun/src/fq_setup.h
@@ -9,6 +9,7 @@ namespace NFqRun {
struct TExecutionMeta {
TString Ast;
TString Plan;
+ TString Statistics;
FederatedQuery::QueryMeta::ComputeStatus Status;
NYql::TIssues Issues;
diff --git a/ydb/tests/tools/fqrun/ya.make b/ydb/tests/tools/fqrun/ya.make
index a80046acebe..04fa632d5c6 100644
--- a/ydb/tests/tools/fqrun/ya.make
+++ b/ydb/tests/tools/fqrun/ya.make
@@ -23,7 +23,11 @@ PEERDIR(
PEERDIR(
yql/essentials/udfs/common/compress_base
+ yql/essentials/udfs/common/datetime2
+ yql/essentials/udfs/common/digest
yql/essentials/udfs/common/re2
+ yql/essentials/udfs/common/string
+ yql/essentials/udfs/common/yson2
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/tests/tools/kqprun/runlib/utils.cpp b/ydb/tests/tools/kqprun/runlib/utils.cpp
index 81e649a2c84..2ef83f5da5c 100644
--- a/ydb/tests/tools/kqprun/runlib/utils.cpp
+++ b/ydb/tests/tools/kqprun/runlib/utils.cpp
@@ -182,6 +182,27 @@ TString TStatsPrinter::FormatNumber(i64 number) {
return stream.str();
}
+TCachedPrinter::TCachedPrinter(const TString& output, TPrinter printer)
+ : Output(output)
+ , Printer(printer)
+{}
+
+void TCachedPrinter::Print(const TString& data, bool allowEmpty) {
+ if ((!data && !allowEmpty) || (PrintedData && data == *PrintedData)) {
+ return;
+ }
+
+ IOutputStream* stream = &Cout;
+ if (Output != "-") {
+ FileOutput = std::make_unique<TFileOutput>(Output);
+ stream = &(*FileOutput);
+ }
+
+ Printer(data, *stream);
+ stream->Flush();
+ PrintedData = data;
+}
+
TString LoadFile(const TString& file) {
return TFileInput(file).ReadAll();
}
diff --git a/ydb/tests/tools/kqprun/runlib/utils.h b/ydb/tests/tools/kqprun/runlib/utils.h
index d87bfb2bb2a..5a28ac5239b 100644
--- a/ydb/tests/tools/kqprun/runlib/utils.h
+++ b/ydb/tests/tools/kqprun/runlib/utils.h
@@ -96,6 +96,21 @@ private:
const std::unique_ptr<NFq::IPlanStatProcessor> StatProcessor;
};
+class TCachedPrinter {
+public:
+ using TPrinter = std::function<void(const TString& data, IOutputStream& output)>;
+
+ TCachedPrinter(const TString& output, TPrinter printer);
+
+ void Print(const TString& data, bool allowEmpty = false);
+
+private:
+ TString Output;
+ TPrinter Printer;
+ std::unique_ptr<TFileOutput> FileOutput;
+ std::optional<TString> PrintedData;
+};
+
TString LoadFile(const TString& file);
NKikimrServices::EServiceKikimr GetLogService(const TString& serviceName);