diff options
author | Filitov Mikhail <filitovme@gmail.com> | 2025-03-31 11:53:03 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-31 11:53:03 +0200 |
commit | c47a3afd15200cc3a6e0ecbf3b82d08fbdd9d230 (patch) | |
tree | b5421f667de8c7b0e62c00ea71867af007f6b649 | |
parent | 8a2e0242d48ad89ed77da5f5528c5018dd5babaf (diff) | |
download | ydb-c47a3afd15200cc3a6e0ecbf3b82d08fbdd9d230.tar.gz |
Change logger in comp nodes (#16368)
-rw-r--r-- | ydb/core/kqp/ut/runtime/kqp_scan_logging_ut.cpp | 102 | ||||
-rw-r--r-- | ydb/core/kqp/ut/runtime/kqp_scan_spilling_ut.cpp (renamed from ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp) | 0 | ||||
-rw-r--r-- | ydb/core/kqp/ut/runtime/ya.make (renamed from ydb/core/kqp/ut/spilling/ya.make) | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/ya.make | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.cpp | 18 |
5 files changed, 114 insertions, 9 deletions
diff --git a/ydb/core/kqp/ut/runtime/kqp_scan_logging_ut.cpp b/ydb/core/kqp/ut/runtime/kqp_scan_logging_ut.cpp new file mode 100644 index 00000000000..f9478e2a336 --- /dev/null +++ b/ydb/core/kqp/ut/runtime/kqp_scan_logging_ut.cpp @@ -0,0 +1,102 @@ +#include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/core/kqp/counters/kqp_counters.h> + +#include <util/system/fs.h> + +namespace NKikimr { +namespace NKqp { + +using namespace NYdb; +using namespace NYdb::NTable; + +namespace { + +TKikimrSettings AppSettings(TStringStream& logStream) { + NKikimrConfig::TAppConfig appCfg; + + TKikimrSettings serverSettings; + serverSettings.SetAppConfig(appCfg); + serverSettings.LogStream = &logStream; + + return serverSettings; +} + +void FillTableWithData(NQuery::TQueryClient& db, ui64 numRows=300) { + for (ui32 i = 0; i < numRows; ++i) { + auto result = db.ExecuteQuery(Sprintf(R"( + --!syntax_v1 + REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s") + )", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } +} + +void RunTestForQuery(const std::string& query, const std::string& expectedLog) { + TStringStream logsStream; + + Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl; + TKikimrRunner kikimr(AppSettings(logsStream)); + + kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_TASKS_RUNNER, NActors::NLog::PRI_DEBUG); + + auto db = kikimr.GetQueryClient(); + + FillTableWithData(db); + + auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain); + auto planres = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync(); + Cerr << planres.GetIssues().ToString() << Endl; + UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString()); + + Cerr << planres.GetStats()->GetAst() << Endl; + + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + TString output = FormatResultSetYson(result.GetResultSet(0)); + Cout << output << Endl; + + bool hasExpectedLog = false; + TString line; + while (logsStream.ReadLine(line)) { + if (line.Contains(expectedLog)) { + hasExpectedLog = true; + break; + } + } + // TODO: Uncomment after: https://github.com/ydb-platform/ydb/issues/15597 + Y_UNUSED(hasExpectedLog); + // UNIT_ASSERT(hasExpectedLog); +} + +} // anonymous namespace + +Y_UNIT_TEST_SUITE(KqpScanLogs) { + +Y_UNIT_TEST(WideCombine) { + auto query = R"( + --!syntax_v1 + select count(t.Key) from `/Root/KeyValue` as t group by t.Value + )"; + + RunTestForQuery(query, "[WideCombine]"); +} + +Y_UNIT_TEST(GraceJoin) { + auto query = R"( + --!syntax_v1 + PRAGMA ydb.CostBasedOptimizationLevel='0'; + PRAGMA ydb.HashJoinMode='graceandself'; + select t1.Key, t1.Value, t2.Key, t2.Value + from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value + order by t1.Value + )"; + + RunTestForQuery(query, "[GraceJoin]"); +} + + +} // suite + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp b/ydb/core/kqp/ut/runtime/kqp_scan_spilling_ut.cpp index 1f255ce8b1c..1f255ce8b1c 100644 --- a/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp +++ b/ydb/core/kqp/ut/runtime/kqp_scan_spilling_ut.cpp diff --git a/ydb/core/kqp/ut/spilling/ya.make b/ydb/core/kqp/ut/runtime/ya.make index d5642cc5756..5f15bda6690 100644 --- a/ydb/core/kqp/ut/spilling/ya.make +++ b/ydb/core/kqp/ut/runtime/ya.make @@ -6,6 +6,7 @@ SIZE(MEDIUM) SRCS( kqp_scan_spilling_ut.cpp + kqp_scan_logging_ut.cpp ) PEERDIR( diff --git a/ydb/core/kqp/ut/ya.make b/ydb/core/kqp/ut/ya.make index e8b3dbe256f..38757b3dca1 100644 --- a/ydb/core/kqp/ut/ya.make +++ b/ydb/core/kqp/ut/ya.make @@ -16,7 +16,7 @@ RECURSE_FOR_TESTS( scan scheme service - spilling + runtime sysview tx view diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 2d96335db69..afeeb211e19 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -259,6 +259,14 @@ public: AllocatedHolder->SelfTypeEnv = std::make_unique<TTypeEnvironment>(alloc); } + NUdf::TLogProviderFunc logProviderFunc = nullptr; + if (LogFunc) { + logProviderFunc = [log=LogFunc](const NUdf::TStringRef& component, NUdf::ELogLevel level, const NUdf::TStringRef& message) { + log(TStringBuilder() << "[" << component << "][" << level << "]: " << message << "\n"); + }; + } + + ComputationLogProvider = NUdf::MakeLogProvider(std::move(logProviderFunc), NUdf::ELogLevel::Debug); } ~TDqTaskRunner() { @@ -318,7 +326,7 @@ public: TComputationPatternOpts opts(alloc.Ref(), typeEnv, taskRunnerFactory, Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, optLLVM, EGraphPerProcess::Multi, - AllocatedHolder->ProgramParsed.StatsRegistry.Get(), CollectFull() ? &CountersProvider : nullptr); + AllocatedHolder->ProgramParsed.StatsRegistry.Get(), CollectFull() ? &CountersProvider : nullptr, nullptr, ComputationLogProvider.Get()); if (!SecureParamsProvider) { SecureParamsProvider = MakeSimpleSecureParamsProvider(Settings.SecureParams); @@ -716,13 +724,6 @@ public: } auto prepareTime = TInstant::Now() - startTime; - if (LogFunc) { - TLogFunc logger = [taskId = TaskId, log = LogFunc](const TString& message) { - log(TStringBuilder() << "Run task: " << taskId << ", " << message); - }; - LogFunc = logger; - - } LOG(TStringBuilder() << "Prepare task: " << TaskId << ", takes " << prepareTime.MicroSeconds() << " us"); if (Stats) { @@ -985,6 +986,7 @@ private: private: std::shared_ptr<ISpillerFactory> SpillerFactory; TIntrusivePtr<TSpillingTaskCounters> SpillingTaskCounters; + NUdf::TUniquePtr<NUdf::ILogProvider> ComputationLogProvider; ui64 TaskId = 0; TDqTaskRunnerContext Context; |