diff options
author | Pisarenko Grigoriy <grigoriypisar@ydb.tech> | 2025-05-24 14:30:50 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-24 12:30:50 +0300 |
commit | e727aaf2a0bedfa37ddb8205a1f5d922b501eab0 (patch) | |
tree | 4c35dadc3d6a51bf0fefcc597503b93bf37a849c | |
parent | dde26ab1e55a1a7ea739958e65e6c40733c06869 (diff) | |
download | ydb-e727aaf2a0bedfa37ddb8205a1f5d922b501eab0.tar.gz |
YQ FqRun supported runtime outputs (#18768)
-rw-r--r-- | ydb/tests/tools/fqrun/fqrun.cpp | 13 | ||||
-rw-r--r-- | ydb/tests/tools/fqrun/src/common.h | 5 | ||||
-rw-r--r-- | ydb/tests/tools/fqrun/src/fq_runner.cpp | 58 | ||||
-rw-r--r-- | ydb/tests/tools/fqrun/src/fq_setup.cpp | 1 | ||||
-rw-r--r-- | ydb/tests/tools/fqrun/src/fq_setup.h | 1 | ||||
-rw-r--r-- | ydb/tests/tools/fqrun/ya.make | 4 | ||||
-rw-r--r-- | ydb/tests/tools/kqprun/runlib/utils.cpp | 21 | ||||
-rw-r--r-- | ydb/tests/tools/kqprun/runlib/utils.h | 15 |
8 files changed, 96 insertions, 22 deletions
diff --git a/ydb/tests/tools/fqrun/fqrun.cpp b/ydb/tests/tools/fqrun/fqrun.cpp index 5391d497fae..6d232d93bfb 100644 --- a/ydb/tests/tools/fqrun/fqrun.cpp +++ b/ydb/tests/tools/fqrun/fqrun.cpp @@ -112,6 +112,7 @@ private: checker(Scopes.size(), "query scopes"); checker(runnerOptions.AstOutputs.size(), "ast output files"); checker(runnerOptions.PlanOutputs.size(), "plan output files"); + checker(runnerOptions.StatsOutputs.size(), "statistics output files"); } ui64 GetNumberOfQueries() const { @@ -399,15 +400,15 @@ protected: options.AddLongOption("ast-file", "File with query ast (use '-' to write in stdout)") .RequiredArgument("file") - .Handler1([this](const NLastGetopt::TOptsParser* option) { - RunnerOptions.AstOutputs.emplace_back(GetDefaultOutput(TString(option->CurValOrDef()))); - }); + .EmplaceTo(&RunnerOptions.AstOutputs); options.AddLongOption("plan-file", "File with query plan (use '-' to write in stdout)") .RequiredArgument("file") - .Handler1([this](const NLastGetopt::TOptsParser* option) { - RunnerOptions.PlanOutputs.emplace_back(GetDefaultOutput(TString(option->CurValOrDef()))); - }); + .EmplaceTo(&RunnerOptions.PlanOutputs); + + options.AddLongOption("stats-file", "File with query statistics") + .RequiredArgument("file") + .EmplaceTo(&RunnerOptions.StatsOutputs); options.AddLongOption("canonical-output", "Make ast and plan output suitable for canonization (replace volatile data such as endpoints with stable one)") .NoArgument() diff --git a/ydb/tests/tools/fqrun/src/common.h b/ydb/tests/tools/fqrun/src/common.h index 69536685721..b5c697887e9 100644 --- a/ydb/tests/tools/fqrun/src/common.h +++ b/ydb/tests/tools/fqrun/src/common.h @@ -60,8 +60,9 @@ struct TRunnerOptions { std::unordered_set<ui64> TraceOptIds; IOutputStream* ResultOutput = nullptr; - std::vector<IOutputStream*> AstOutputs; - std::vector<IOutputStream*> PlanOutputs; + std::vector<TString> AstOutputs; + std::vector<TString> PlanOutputs; + std::vector<TString> StatsOutputs; bool CanonicalOutput = false; NKikimrRun::EResultOutputFormat ResultOutputFormat = NKikimrRun::EResultOutputFormat::RowsJson; diff --git a/ydb/tests/tools/fqrun/src/fq_runner.cpp b/ydb/tests/tools/fqrun/src/fq_runner.cpp index 2276978a912..1462bb692be 100644 --- a/ydb/tests/tools/fqrun/src/fq_runner.cpp +++ b/ydb/tests/tools/fqrun/src/fq_runner.cpp @@ -164,6 +164,16 @@ private: TFqSetup::StopTraceOpt(); }; + auto printStats = [this, queryId, astPrinter = GetAstPrinter(queryId), planPrinter = GetPlanPrinter(queryId)](const TExecutionMeta& meta, bool allowEmpty = false) mutable { + if (astPrinter) { + astPrinter->Print(meta.Ast, allowEmpty); + } + if (planPrinter) { + planPrinter->Print(meta.Plan, allowEmpty); + } + PrintStatistics(queryId, meta.Statistics); + }; + while (true) { TExecutionMeta meta; const TRequestResult status = FqSetup.DescribeQuery(QueryId, CurrentOptions, meta); @@ -172,6 +182,7 @@ private: Cerr << CerrColors.Red() << "Query transient issues updated:" << CerrColors.Default() << Endl << meta.TransientIssues.ToString() << Endl; } ExecutionMeta = meta; + printStats(ExecutionMeta); if (IsFinalStatus(ExecutionMeta.Status)) { break; @@ -185,8 +196,7 @@ private: Sleep(Options.PingPeriod); } - PrintQueryAst(queryId, ExecutionMeta.Ast); - PrintQueryPlan(queryId, ExecutionMeta.Plan); + printStats(ExecutionMeta, true); if (VerboseLevel >= EVerbose::Info) { Cout << CoutColors.Cyan() << "Query finished. Duration: " << TInstant::Now() - StartTime << CoutColors.Default() << Endl; } @@ -209,8 +219,13 @@ private: } } - void PrintQueryAst(size_t queryId, TString ast) const { - if (const auto output = GetValue<IOutputStream*>(queryId, Options.AstOutputs, nullptr)) { + std::optional<TCachedPrinter> GetAstPrinter(size_t queryId) const { + const auto& astOutput = GetValue<TString>(queryId, Options.AstOutputs, {}); + if (!astOutput) { + return std::nullopt; + } + + return TCachedPrinter(astOutput, [this](TString ast, IOutputStream& output) { if (VerboseLevel >= EVerbose::Info) { Cout << CoutColors.Cyan() << "Writing query ast" << CoutColors.Default() << Endl; } @@ -218,13 +233,17 @@ private: ast = CanonizeEndpoints(ast, Options.FqSettings.AppConfig.GetFederatedQueryConfig().GetGateways()); ast = CanonizeAstLogicalId(ast); } - output->Write(ast); - output->Flush(); - } + output.Write(ast); + }); } - void PrintQueryPlan(size_t queryId, TString plan) const { - if (const auto output = GetValue<IOutputStream*>(queryId, Options.PlanOutputs, nullptr)) { + std::optional<TCachedPrinter> GetPlanPrinter(size_t queryId) const { + const auto& planOutput = GetValue<TString>(queryId, Options.PlanOutputs, {}); + if (!planOutput) { + return std::nullopt; + } + + return TCachedPrinter(planOutput, [this](TString plan, IOutputStream& output) { if (VerboseLevel >= EVerbose::Info) { Cout << CoutColors.Cyan() << "Writing query plan" << CoutColors.Default() << Endl; } @@ -232,16 +251,27 @@ private: return; } - NJson::TJsonValue planJson; - NJson::ReadJsonTree(plan, &planJson, true); plan = NJson::PrettifyJson(plan, false); - if (Options.CanonicalOutput) { plan = CanonizeEndpoints(plan, Options.FqSettings.AppConfig.GetFederatedQueryConfig().GetGateways()); } - output->Write(plan); - output->Flush(); + output.Write(plan); + if (!Options.CanonicalOutput) { + output.Write('\n'); + } + }); + } + + void PrintStatistics(size_t queryId, const TString& statistics) const { + if (!statistics) { + return; + } + + if (const auto& statsFile = GetValue<TString>(queryId, Options.StatsOutputs, {})) { + TFileOutput output(statsFile); + output.Write(NJson::PrettifyJson(statistics, false)); + output.Finish(); } } diff --git a/ydb/tests/tools/fqrun/src/fq_setup.cpp b/ydb/tests/tools/fqrun/src/fq_setup.cpp index c5cac69c058..4aa130a6795 100644 --- a/ydb/tests/tools/fqrun/src/fq_setup.cpp +++ b/ydb/tests/tools/fqrun/src/fq_setup.cpp @@ -354,6 +354,7 @@ TRequestResult TFqSetup::DescribeQuery(const TString& queryId, const TFqOptions& meta.Ast = result.ast().data(); meta.Plan = result.plan().json(); + meta.Statistics = result.statistics().json(); return GetStatus(response->Get()->Issues); } diff --git a/ydb/tests/tools/fqrun/src/fq_setup.h b/ydb/tests/tools/fqrun/src/fq_setup.h index 119d54587ee..cae1e42f4f6 100644 --- a/ydb/tests/tools/fqrun/src/fq_setup.h +++ b/ydb/tests/tools/fqrun/src/fq_setup.h @@ -9,6 +9,7 @@ namespace NFqRun { struct TExecutionMeta { TString Ast; TString Plan; + TString Statistics; FederatedQuery::QueryMeta::ComputeStatus Status; NYql::TIssues Issues; diff --git a/ydb/tests/tools/fqrun/ya.make b/ydb/tests/tools/fqrun/ya.make index a80046acebe..04fa632d5c6 100644 --- a/ydb/tests/tools/fqrun/ya.make +++ b/ydb/tests/tools/fqrun/ya.make @@ -23,7 +23,11 @@ PEERDIR( PEERDIR( yql/essentials/udfs/common/compress_base + yql/essentials/udfs/common/datetime2 + yql/essentials/udfs/common/digest yql/essentials/udfs/common/re2 + yql/essentials/udfs/common/string + yql/essentials/udfs/common/yson2 ) YQL_LAST_ABI_VERSION() diff --git a/ydb/tests/tools/kqprun/runlib/utils.cpp b/ydb/tests/tools/kqprun/runlib/utils.cpp index 81e649a2c84..2ef83f5da5c 100644 --- a/ydb/tests/tools/kqprun/runlib/utils.cpp +++ b/ydb/tests/tools/kqprun/runlib/utils.cpp @@ -182,6 +182,27 @@ TString TStatsPrinter::FormatNumber(i64 number) { return stream.str(); } +TCachedPrinter::TCachedPrinter(const TString& output, TPrinter printer) + : Output(output) + , Printer(printer) +{} + +void TCachedPrinter::Print(const TString& data, bool allowEmpty) { + if ((!data && !allowEmpty) || (PrintedData && data == *PrintedData)) { + return; + } + + IOutputStream* stream = &Cout; + if (Output != "-") { + FileOutput = std::make_unique<TFileOutput>(Output); + stream = &(*FileOutput); + } + + Printer(data, *stream); + stream->Flush(); + PrintedData = data; +} + TString LoadFile(const TString& file) { return TFileInput(file).ReadAll(); } diff --git a/ydb/tests/tools/kqprun/runlib/utils.h b/ydb/tests/tools/kqprun/runlib/utils.h index d87bfb2bb2a..5a28ac5239b 100644 --- a/ydb/tests/tools/kqprun/runlib/utils.h +++ b/ydb/tests/tools/kqprun/runlib/utils.h @@ -96,6 +96,21 @@ private: const std::unique_ptr<NFq::IPlanStatProcessor> StatProcessor; }; +class TCachedPrinter { +public: + using TPrinter = std::function<void(const TString& data, IOutputStream& output)>; + + TCachedPrinter(const TString& output, TPrinter printer); + + void Print(const TString& data, bool allowEmpty = false); + +private: + TString Output; + TPrinter Printer; + std::unique_ptr<TFileOutput> FileOutput; + std::optional<TString> PrintedData; +}; + TString LoadFile(const TString& file); NKikimrServices::EServiceKikimr GetLogService(const TString& serviceName); |