aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilitov Mikhail <filitovme@gmail.com>2025-03-31 11:53:03 +0200
committerGitHub <noreply@github.com>2025-03-31 11:53:03 +0200
commitc47a3afd15200cc3a6e0ecbf3b82d08fbdd9d230 (patch)
treeb5421f667de8c7b0e62c00ea71867af007f6b649
parent8a2e0242d48ad89ed77da5f5528c5018dd5babaf (diff)
downloadydb-c47a3afd15200cc3a6e0ecbf3b82d08fbdd9d230.tar.gz
Change logger in comp nodes (#16368)
-rw-r--r--ydb/core/kqp/ut/runtime/kqp_scan_logging_ut.cpp102
-rw-r--r--ydb/core/kqp/ut/runtime/kqp_scan_spilling_ut.cpp (renamed from ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp)0
-rw-r--r--ydb/core/kqp/ut/runtime/ya.make (renamed from ydb/core/kqp/ut/spilling/ya.make)1
-rw-r--r--ydb/core/kqp/ut/ya.make2
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp18
5 files changed, 114 insertions, 9 deletions
diff --git a/ydb/core/kqp/ut/runtime/kqp_scan_logging_ut.cpp b/ydb/core/kqp/ut/runtime/kqp_scan_logging_ut.cpp
new file mode 100644
index 00000000000..f9478e2a336
--- /dev/null
+++ b/ydb/core/kqp/ut/runtime/kqp_scan_logging_ut.cpp
@@ -0,0 +1,102 @@
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/core/kqp/counters/kqp_counters.h>
+
+#include <util/system/fs.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+namespace {
+
+TKikimrSettings AppSettings(TStringStream& logStream) {
+ NKikimrConfig::TAppConfig appCfg;
+
+ TKikimrSettings serverSettings;
+ serverSettings.SetAppConfig(appCfg);
+ serverSettings.LogStream = &logStream;
+
+ return serverSettings;
+}
+
+void FillTableWithData(NQuery::TQueryClient& db, ui64 numRows=300) {
+ for (ui32 i = 0; i < numRows; ++i) {
+ auto result = db.ExecuteQuery(Sprintf(R"(
+ --!syntax_v1
+ REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
+ )", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+}
+
+void RunTestForQuery(const std::string& query, const std::string& expectedLog) {
+ TStringStream logsStream;
+
+ Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
+ TKikimrRunner kikimr(AppSettings(logsStream));
+
+ kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_TASKS_RUNNER, NActors::NLog::PRI_DEBUG);
+
+ auto db = kikimr.GetQueryClient();
+
+ FillTableWithData(db);
+
+ auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
+ auto planres = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
+ Cerr << planres.GetIssues().ToString() << Endl;
+ UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());
+
+ Cerr << planres.GetStats()->GetAst() << Endl;
+
+ auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ TString output = FormatResultSetYson(result.GetResultSet(0));
+ Cout << output << Endl;
+
+ bool hasExpectedLog = false;
+ TString line;
+ while (logsStream.ReadLine(line)) {
+ if (line.Contains(expectedLog)) {
+ hasExpectedLog = true;
+ break;
+ }
+ }
+ // TODO: Uncomment after: https://github.com/ydb-platform/ydb/issues/15597
+ Y_UNUSED(hasExpectedLog);
+ // UNIT_ASSERT(hasExpectedLog);
+}
+
+} // anonymous namespace
+
+Y_UNIT_TEST_SUITE(KqpScanLogs) {
+
+Y_UNIT_TEST(WideCombine) {
+ auto query = R"(
+ --!syntax_v1
+ select count(t.Key) from `/Root/KeyValue` as t group by t.Value
+ )";
+
+ RunTestForQuery(query, "[WideCombine]");
+}
+
+Y_UNIT_TEST(GraceJoin) {
+ auto query = R"(
+ --!syntax_v1
+ PRAGMA ydb.CostBasedOptimizationLevel='0';
+ PRAGMA ydb.HashJoinMode='graceandself';
+ select t1.Key, t1.Value, t2.Key, t2.Value
+ from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
+ order by t1.Value
+ )";
+
+ RunTestForQuery(query, "[GraceJoin]");
+}
+
+
+} // suite
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp b/ydb/core/kqp/ut/runtime/kqp_scan_spilling_ut.cpp
index 1f255ce8b1c..1f255ce8b1c 100644
--- a/ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp
+++ b/ydb/core/kqp/ut/runtime/kqp_scan_spilling_ut.cpp
diff --git a/ydb/core/kqp/ut/spilling/ya.make b/ydb/core/kqp/ut/runtime/ya.make
index d5642cc5756..5f15bda6690 100644
--- a/ydb/core/kqp/ut/spilling/ya.make
+++ b/ydb/core/kqp/ut/runtime/ya.make
@@ -6,6 +6,7 @@ SIZE(MEDIUM)
SRCS(
kqp_scan_spilling_ut.cpp
+ kqp_scan_logging_ut.cpp
)
PEERDIR(
diff --git a/ydb/core/kqp/ut/ya.make b/ydb/core/kqp/ut/ya.make
index e8b3dbe256f..38757b3dca1 100644
--- a/ydb/core/kqp/ut/ya.make
+++ b/ydb/core/kqp/ut/ya.make
@@ -16,7 +16,7 @@ RECURSE_FOR_TESTS(
scan
scheme
service
- spilling
+ runtime
sysview
tx
view
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 2d96335db69..afeeb211e19 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -259,6 +259,14 @@ public:
AllocatedHolder->SelfTypeEnv = std::make_unique<TTypeEnvironment>(alloc);
}
+ NUdf::TLogProviderFunc logProviderFunc = nullptr;
+ if (LogFunc) {
+ logProviderFunc = [log=LogFunc](const NUdf::TStringRef& component, NUdf::ELogLevel level, const NUdf::TStringRef& message) {
+ log(TStringBuilder() << "[" << component << "][" << level << "]: " << message << "\n");
+ };
+ }
+
+ ComputationLogProvider = NUdf::MakeLogProvider(std::move(logProviderFunc), NUdf::ELogLevel::Debug);
}
~TDqTaskRunner() {
@@ -318,7 +326,7 @@ public:
TComputationPatternOpts opts(alloc.Ref(), typeEnv, taskRunnerFactory,
Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, optLLVM, EGraphPerProcess::Multi,
- AllocatedHolder->ProgramParsed.StatsRegistry.Get(), CollectFull() ? &CountersProvider : nullptr);
+ AllocatedHolder->ProgramParsed.StatsRegistry.Get(), CollectFull() ? &CountersProvider : nullptr, nullptr, ComputationLogProvider.Get());
if (!SecureParamsProvider) {
SecureParamsProvider = MakeSimpleSecureParamsProvider(Settings.SecureParams);
@@ -716,13 +724,6 @@ public:
}
auto prepareTime = TInstant::Now() - startTime;
- if (LogFunc) {
- TLogFunc logger = [taskId = TaskId, log = LogFunc](const TString& message) {
- log(TStringBuilder() << "Run task: " << taskId << ", " << message);
- };
- LogFunc = logger;
-
- }
LOG(TStringBuilder() << "Prepare task: " << TaskId << ", takes " << prepareTime.MicroSeconds() << " us");
if (Stats) {
@@ -985,6 +986,7 @@ private:
private:
std::shared_ptr<ISpillerFactory> SpillerFactory;
TIntrusivePtr<TSpillingTaskCounters> SpillingTaskCounters;
+ NUdf::TUniquePtr<NUdf::ILogProvider> ComputationLogProvider;
ui64 TaskId = 0;
TDqTaskRunnerContext Context;