aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormfilitov <mfilitov@yandex-team.com>2025-04-15 12:12:01 +0300
committermfilitov <mfilitov@yandex-team.com>2025-04-15 12:31:45 +0300
commitfc2466491ab054c948d4c46b3ca6e27cce32f197 (patch)
tree6e45f155f363dd6ae9d4eae81a1af4a639442669
parent0450496cd882e005d0364fb7ea99d2f9b94e77b1 (diff)
downloadydb-fc2466491ab054c948d4c46b3ca6e27cce32f197.tar.gz
Use udf_log instead of log in comp nodes
Changing logger in comp nodes as described in this task: <https://github.com/ydb-platform/ydb/issues/15597> Tests added here: <https://github.com/lll-phill-lll/ydb/blob/main/ydb/core/kqp/ut/runtime/kqp_scan_logging_ut.cpp#L76> Will be uncommented after this pr. upd. This PR was merged and then reverted due to incompatible logger settings in `tasks_runner`. The issue is fixed in this PR: <https://github.com/ydb-platform/ydb/pull/16837> commit_hash:de998351a6e6f12a684770d56c6e2ac413c15c7a
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp57
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp32
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h17
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp69
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp20
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp30
-rw-r--r--yql/essentials/public/udf/udf_log.h14
7 files changed, 167 insertions, 72 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp
index ac5e5108e3c..bb319984526 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp
@@ -80,9 +80,12 @@ struct TGraceJoinPacker {
ui64 DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum;
std::vector<GraceJoin::TColTypeInterface> ColumnInterfaces;
bool IsAny; // Flag to support any join attribute
+ const NUdf::TLoggerPtr Logger; // Logger instance
+ const NUdf::TLogComponentId LogComponent; // Id of current component for logging. GracejJoin here
inline void Pack() ; // Packs new tuple from TupleHolder and TuplePtrs to TupleIntVals, TupleStrSizes, TupleStrings
inline void UnPack(); // Unpacks packed values from TupleIntVals, TupleStrSizes, TupleStrings into TupleHolder and TuplePtrs
- TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory, bool isAny);
+ TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns,
+ const THolderFactory& holderFactory, bool isAny, NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent);
};
@@ -430,10 +433,13 @@ void TGraceJoinPacker::UnPack() {
}
-TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory, bool isAny) :
+TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns,
+ const THolderFactory& holderFactory, bool isAny, NUdf::TLoggerPtr logger = nullptr, NUdf::TLogComponentId logComponent = 0) :
ColumnTypes(columnTypes)
, HolderFactory(holderFactory)
- , IsAny(isAny) {
+ , IsAny(isAny)
+ , Logger(logger)
+ , LogComponent(logComponent) {
ui64 nColumns = ColumnTypes.size();
ui64 nKeyColumns = keyColumns.size();
@@ -550,8 +556,9 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
}
TablePtr = std::make_unique<GraceJoin::TTable>(
+ Logger, LogComponent,
PackedKeyIntColumnsNum, KeyStrColumnsNum, PackedDataIntColumnsNum,
- DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, NullsBitmapSize, cti_p, IsAny );
+ DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, NullsBitmapSize, cti_p, IsAny);
}
@@ -569,7 +576,7 @@ public:
EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns,
const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames,
const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, TComputationContext& ctx,
- const bool isSelfJoin, bool isSpillingAllowed)
+ const bool isSelfJoin, bool isSpillingAllowed, NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent)
: TBase(memInfo)
, FlowLeft(flowLeft)
, FlowRight(flowRight)
@@ -578,9 +585,9 @@ public:
, RightKeyColumns(rightKeyColumns)
, LeftRenames(leftRenames)
, RightRenames(rightRenames)
- , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Left || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::RightSemi || joinKind == EJoinKind::RightOnly)))
- , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Right || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly)))
- , JoinedTablePtr(std::make_unique<GraceJoin::TTable>())
+ , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Left || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::RightSemi || joinKind == EJoinKind::RightOnly), logger, logComponent))
+ , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Right || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly), logger, logComponent))
+ , JoinedTablePtr(std::make_unique<GraceJoin::TTable>(logger, logComponent))
, JoinCompleted(std::make_unique<bool>(false))
, PartialJoinCompleted(std::make_unique<bool>(false))
, HaveMoreLeftRows(std::make_unique<bool>(true))
@@ -588,8 +595,10 @@ public:
, IsSelfJoin_(isSelfJoin)
, SelfJoinSameKeys_(isSelfJoin && (leftKeyColumns == rightKeyColumns))
, IsSpillingAllowed(isSpillingAllowed)
+ , Logger(logger)
+ , LogComponent(logComponent)
{
- YQL_LOG(GRACEJOIN_DEBUG) << (const void *)&*JoinedTablePtr << "# AnyJoinSettings=" << (int)anyJoinSettings << " JoinKind=" << (int)joinKind;
+ UDF_LOG(Logger, LogComponent, GRACEJOIN_DEBUG, TStringBuilder() << (const void *)&*JoinedTablePtr << "# AnyJoinSettings=" << (int)anyJoinSettings << " JoinKind=" << (int)joinKind);
if (IsSelfJoin_) {
LeftPacker->BatchSize = std::numeric_limits<ui64>::max();
RightPacker->BatchSize = std::numeric_limits<ui64>::max();
@@ -646,12 +655,14 @@ private:
LogMemoryUsage();
switch(mode) {
case EOperatingMode::InMemory: {
- YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to InMemory";
+ UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder()
+ << (const void *)&*JoinedTablePtr << "# switching Memory mode to InMemory");
MKQL_ENSURE(false, "Internal logic error");
break;
}
case EOperatingMode::Spilling: {
- YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling";
+ UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder()
+ << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling");
MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
auto spiller = ctx.SpillerFactory->CreateSpiller();
RightPacker->TablePtr->InitializeBucketSpillers(spiller);
@@ -659,7 +670,8 @@ private:
break;
}
case EOperatingMode::ProcessSpilled: {
- YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to ProcessSpilled";
+ UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder()
+ << (const void *)&*JoinedTablePtr << "# switching Memory mode to ProcessSpilled");
SpilledBucketsJoinOrder.reserve(GraceJoin::NumberOfBuckets);
for (ui32 i = 0; i < GraceJoin::NumberOfBuckets; ++i) SpilledBucketsJoinOrder.push_back(i);
@@ -796,6 +808,11 @@ private:
}
void LogMemoryUsage() const {
+ const auto memoryUsageLogLevel = NUdf::ELogLevel::Info;
+ if (!Logger->IsActive(LogComponent, memoryUsageLogLevel)) {
+ return;
+ }
+
const auto used = TlsAllocState->GetUsed();
const auto limit = TlsAllocState->GetLimit();
TStringBuilder logmsg;
@@ -805,7 +822,7 @@ private:
}
logmsg << (used/1_MB) << "MB/" << (limit/1_MB) << "MB";
- YQL_LOG(INFO) << logmsg;
+ UDF_LOG(Logger, LogComponent, memoryUsageLogLevel, logmsg);
}
EFetchResult DoCalculateInMemory(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
@@ -857,11 +874,10 @@ private:
(!*HaveMoreRightRows && (!*HaveMoreLeftRows || LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize )) ||
(!*HaveMoreLeftRows && RightPacker->TuplesBatchPacked >= RightPacker->BatchSize))) {
- YQL_LOG(GRACEJOIN_TRACE)
+ UDF_LOG(Logger, LogComponent, GRACEJOIN_TRACE, TStringBuilder()
<< (const void *)&*JoinedTablePtr << '#'
<< " HaveLeft " << *HaveMoreLeftRows << " LeftPacked " << LeftPacker->TuplesBatchPacked << " LeftBatch " << LeftPacker->BatchSize
- << " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize
- ;
+ << " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize);
auto& leftTable = *LeftPacker->TablePtr;
auto& rightTable = SelfJoinSameKeys_ ? *LeftPacker->TablePtr : *RightPacker->TablePtr;
@@ -1048,6 +1064,9 @@ private:
NYql::NUdf::TCounter CounterOutputRows_;
ui32 SpilledBucketsJoinOrderCurrentIndex = 0;
std::vector<ui32> SpilledBucketsJoinOrder;
+
+ const NUdf::TLoggerPtr Logger;
+ const NUdf::TLogComponentId LogComponent;
};
class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper> {
@@ -1167,10 +1186,14 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
}
void MakeSpillingSupportState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
+ NYql::NUdf::TLoggerPtr logger = ctx.MakeLogger();
+ NYql::NUdf::TLogComponentId logComponent = logger->RegisterComponent("GraceJoin");
+ UDF_LOG(logger, logComponent, NUdf::ELogLevel::Debug, TStringBuilder() << "State initialized");
+
state = ctx.HolderFactory.Create<TGraceJoinSpillingSupportState>(
FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns,
LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes,
- ctx, IsSelfJoin_, IsSpillingAllowed);
+ ctx, IsSelfJoin_, IsSpillingAllowed, logger, logComponent);
}
IComputationWideFlowNode *const FlowLeft;
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
index 5163d779423..5ea63283143 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
@@ -368,6 +368,7 @@ bool TTable::TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind /
}
if (wasException || TlsAllocState->IsMemoryYellowZoneEnabled()) {
+ UDF_LOG(Logger_, LogComponent_, NUdf::ELogLevel::Debug, TStringBuilder() << "Preallocation failed. WasException: " << wasException);
for (ui64 i = 0; i < bucket; ++i) {
auto& b1 = t1.TableBuckets[i];
b1.JoinSlots.resize(0);
@@ -673,7 +674,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
BloomHits_ += bloomHits;
BloomLookups_ += bloomLookups;
- YQL_LOG(GRACEJOIN_TRACE)
+ UDF_LOG(Logger_, LogComponent_, GRACEJOIN_TRACE, TStringBuilder()
<< (const void *)this << '#'
<< bucket
<< " Table1 " << JoinTable1->TableBucketsStats[bucket].TuplesNum
@@ -685,7 +686,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
<< " joinKind " << (int)JoinKind
<< " swapTables " << swapTables
<< " initHashTable " << initHashTable
- ;
+ );
}
HasMoreLeftTuples_ = hasMoreLeftTuples;
@@ -1062,10 +1063,12 @@ void TTable::PrepareBucket(ui64 bucket) {
}
// Creates new table with key columns and data columns
-TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
+TTable::TTable( NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent,
+ ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
ui64 numberOfDataIntColumns, ui64 numberOfDataStringColumns,
ui64 numberOfKeyIColumns, ui64 numberOfDataIColumns,
- ui64 nullsBitmapSize, TColTypeInterface * colInterfaces, bool isAny ) :
+ ui64 nullsBitmapSize, TColTypeInterface * colInterfaces,
+ bool isAny) :
NumberOfKeyIntColumns(numberOfKeyIntColumns),
NumberOfKeyStringColumns(numberOfKeyStringColumns),
@@ -1075,7 +1078,9 @@ TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
NumberOfDataIColumns(numberOfDataIColumns),
ColInterfaces(colInterfaces),
NullsBitmapSize_(nullsBitmapSize),
- IsAny_(isAny) {
+ IsAny_(isAny),
+ Logger_(logger),
+ LogComponent_(logComponent) {
NumberOfKeyColumns = NumberOfKeyIntColumns + NumberOfKeyStringColumns + NumberOfKeyIColumns;
NumberOfDataColumns = NumberOfDataIntColumns + NumberOfDataStringColumns + NumberOfDataIColumns;
@@ -1114,16 +1119,21 @@ TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
}
TTable::~TTable() {
- YQL_LOG_IF(GRACEJOIN_DEBUG, InitHashTableCount_)
+ UDF_LOG_IF(InitHashTableCount_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder()
<< (const void *)this << '#' << "InitHashTableCount " << InitHashTableCount_
<< " BloomLookups " << BloomLookups_ << " BloomHits " << BloomHits_ << " BloomFalsePositives " << BloomFalsePositives_
<< " HashLookups " << HashLookups_ << " HashChainTraversal " << HashO1Iterations_/(double)HashLookups_ << " HashSlotOperations " << HashSlotIterations_/(double)HashLookups_
<< " Table1 " << JoinTable1Total_ << " Table2 " << JoinTable2Total_ << " TuplesFound " << TuplesFound_
- ;
- YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->AnyFiltered_) << (const void *)this << '#' << "L AnyFiltered " << JoinTable1->AnyFiltered_;
- YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->BloomLookups_) << (const void *)this << '#' << "L BloomLookups " << JoinTable1->BloomLookups_ << " BloomHits " << JoinTable1->BloomHits_;
- YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->AnyFiltered_) << (const void *)this << '#' << "R AnyFiltered " << JoinTable2->AnyFiltered_;
- YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->BloomLookups_) << (const void *)this << '#' << "R BloomLookups " << JoinTable2->BloomLookups_ << " BloomHits " << JoinTable2->BloomHits_;
+ );
+
+ UDF_LOG_IF(JoinTable1 && JoinTable1->AnyFiltered_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder()
+ << (const void *)this << '#' << "L AnyFiltered " << JoinTable1->AnyFiltered_);
+ UDF_LOG_IF(JoinTable1 && JoinTable1->BloomLookups_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder()
+ << (const void *)this << '#' << "L BloomLookups " << JoinTable1->BloomLookups_ << " BloomHits " << JoinTable1->BloomHits_);
+ UDF_LOG_IF(JoinTable2 && JoinTable2->AnyFiltered_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder()
+ << (const void *)this << '#' << "R AnyFiltered " << JoinTable2->AnyFiltered_);
+ UDF_LOG_IF(JoinTable2 && JoinTable2->BloomLookups_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder()
+ << (const void *)this << '#' << "R BloomLookups " << JoinTable2->BloomLookups_ << " BloomHits " << JoinTable2->BloomHits_);
};
TTableBucketSpiller::TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit)
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
index b918ce9b909..6c9256bc036 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
+++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
@@ -11,8 +11,8 @@ namespace NMiniKQL {
namespace GraceJoin {
class TTableBucketSpiller;
-#define GRACEJOIN_DEBUG DEBUG
-#define GRACEJOIN_TRACE TRACE
+#define GRACEJOIN_DEBUG NUdf::ELogLevel::Debug
+#define GRACEJOIN_TRACE NUdf::ELogLevel::Trace
const ui64 BitsForNumberOfBuckets = 6; // 2^6 = 64
const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1;
@@ -348,6 +348,9 @@ class TTable {
ui64 TuplesFound_ = 0; // Total number of matching keys found during join
+ const NUdf::TLoggerPtr Logger_ = nullptr; // Logger instance
+ const NUdf::TLogComponentId LogComponent_ = 0; // Unique component id. GraceJoin here.
+
public:
@@ -417,10 +420,12 @@ public:
void Clear();
// Creates new table with key columns and data columns
- TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
- ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,
- ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
- ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false);
+ TTable(NUdf::TLoggerPtr logger = nullptr, NUdf::TLogComponentId logComponent = 0,
+ ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
+ ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,
+ ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
+ ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr,
+ bool isAny = false);
enum class EAddTupleResult { Added, Unmatched, AnyMatch };
// Adds new tuple to the table. intColumns, stringColumns - data of columns,
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
index 8a3052faa1a..68b22cd7364 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -248,8 +248,19 @@ private:
return KeyWidth + StateWidth;
}
public:
- TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = true)
- : TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), Hash(hash), Equal(equal) {
+ TState(
+ TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal,
+ NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent, bool allowOutOfMemory = true
+ )
+ : TBase(memInfo)
+ , KeyWidth(keyWidth)
+ , StateWidth(stateWidth)
+ , AllowOutOfMemory(allowOutOfMemory)
+ , Hash(hash)
+ , Equal(equal)
+ , Logger(logger)
+ , LogComponent(logComponent)
+ {
CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod());
CurrentPosition = 0;
Tongue = CurrentPage->data();
@@ -295,7 +306,7 @@ public:
try {
States->CheckGrow();
} catch (TMemoryLimitExceededException) {
- YQL_LOG(INFO) << "State failed to grow";
+ UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder() << "State failed to grow");
if (IsOutOfMemory || !AllowOutOfMemory) {
throw;
} else {
@@ -373,6 +384,8 @@ private:
std::unique_ptr<TStates> States;
const THashFunc Hash;
const TEqualsFunc Equal;
+ const NUdf::TLoggerPtr Logger;
+ const NUdf::TLogComponentId LogComponent;
};
class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
@@ -421,10 +434,11 @@ public:
TSpillingSupportState(
TMemoryUsageInfo* memInfo,
const TMultiType* usedInputItemType, const TMultiType* keyAndStateType, ui32 keyWidth, size_t itemNodesSize,
- const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx
+ const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx,
+ NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent
)
: TBase(memInfo)
- , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal, allowSpilling && ctx.SpillerFactory)
+ , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal, logger, logComponent, allowSpilling && ctx.SpillerFactory)
, UsedInputItemType(usedInputItemType)
, KeyAndStateType(keyAndStateType)
, KeyWidth(keyWidth)
@@ -436,6 +450,8 @@ public:
, Equal(equal)
, AllowSpilling(allowSpilling)
, Ctx(ctx)
+ , Logger(logger)
+ , LogComponent(logComponent)
{
BufferForUsedInputItems.reserve(usedInputItemType->GetElementsCount());
Tongue = InMemoryProcessingState.Tongue;
@@ -726,6 +742,10 @@ private:
}
void LogMemoryUsage() const {
+ const auto memoryUsageLogLevel = NUdf::ELogLevel::Info;
+ if (!Logger->IsActive(LogComponent, memoryUsageLogLevel)) {
+ return;
+ }
const auto used = TlsAllocState->GetUsed();
const auto limit = TlsAllocState->GetLimit();
TStringBuilder logmsg;
@@ -735,7 +755,7 @@ private:
}
logmsg << (used/1_MB) << "MB/" << (limit/1_MB) << "MB";
- YQL_LOG(INFO) << logmsg;
+ UDF_LOG(Logger, LogComponent, memoryUsageLogLevel, logmsg);
}
void SpillMoreStateFromBucket(TSpilledBucket& bucket) {
@@ -858,31 +878,32 @@ private:
void SwitchMode(EOperatingMode mode) {
switch(mode) {
case EOperatingMode::InMemory: {
- YQL_LOG(INFO) << "switching Memory mode to InMemory";
+ UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, "switching Memory mode to InMemory");
MKQL_ENSURE(false, "Internal logic error");
break;
}
case EOperatingMode::SplittingState: {
- YQL_LOG(INFO) << "switching Memory mode to SplittingState";
+ UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, "switching Memory mode to SplittingState");
MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
SpilledBuckets.resize(SpilledBucketCount);
auto spiller = Ctx.SpillerFactory->CreateSpiller();
for (auto &b: SpilledBuckets) {
b.SpilledState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, KeyAndStateType, 5_MB);
b.SpilledData = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, UsedInputItemType, 5_MB);
- b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal, false);
+ b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth,
+ KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal, Logger, LogComponent, false);
}
break;
}
case EOperatingMode::Spilling: {
- YQL_LOG(INFO) << "switching Memory mode to Spilling";
+ UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, "switching Memory mode to Spilling");
MKQL_ENSURE(EOperatingMode::SplittingState == Mode || EOperatingMode::InMemory == Mode, "Internal logic error");
Tongue = ViewForKeyAndState.data();
break;
}
case EOperatingMode::ProcessSpilled: {
- YQL_LOG(INFO) << "switching Memory mode to ProcessSpilled";
+ UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, "switching Memory mode to ProcessSpilled");
MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error");
MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error");
MKQL_ENSURE(BufferForUsedInputItems.empty(), "Internal logic error");
@@ -940,6 +961,9 @@ private:
TComputationContext& Ctx;
NYql::NUdf::TCounter CounterOutputRows_;
+
+ const NUdf::TLoggerPtr Logger;
+ const NUdf::TLogComponentId LogComponent;
};
#ifndef MKQL_DISABLE_CODEGEN
@@ -1402,12 +1426,18 @@ public:
#endif
private:
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
+ NYql::NUdf::TLoggerPtr logger = ctx.MakeLogger();
+ NYql::NUdf::TLogComponentId logComponent = logger->RegisterComponent("WideCombine");
+ UDF_LOG(logger, logComponent, NUdf::ELogLevel::Debug, TStringBuilder() << "State initialized");
+
#ifdef MKQL_DISABLE_CODEGEN
- state = ctx.HolderFactory.Create<TState>(Nodes.KeyNodes.size(), Nodes.StateNodes.size(), TMyValueHasher(KeyTypes), TMyValueEqual(KeyTypes));
+ state = ctx.HolderFactory.Create<TState>(Nodes.KeyNodes.size(), Nodes.StateNodes.size(),
+ TMyValueHasher(KeyTypes), TMyValueEqual(KeyTypes), logger, logComponent);
#else
state = ctx.HolderFactory.Create<TState>(Nodes.KeyNodes.size(), Nodes.StateNodes.size(),
ctx.ExecuteLLVM && Hash ? THashFunc(std::ptr_fun(Hash)) : THashFunc(TMyValueHasher(KeyTypes)),
- ctx.ExecuteLLVM && Equals ? TEqualsFunc(std::ptr_fun(Equals)) : TEqualsFunc(TMyValueEqual(KeyTypes))
+ ctx.ExecuteLLVM && Equals ? TEqualsFunc(std::ptr_fun(Equals)) : TEqualsFunc(TMyValueEqual(KeyTypes)),
+ logger, logComponent
);
#endif
if (ctx.CountersProvider) {
@@ -1832,6 +1862,10 @@ public:
#endif
private:
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
+ NYql::NUdf::TLoggerPtr logger = ctx.MakeLogger();
+ NYql::NUdf::TLogComponentId logComponent = logger->RegisterComponent("WideLastCombine");
+ UDF_LOG(logger, logComponent, NUdf::ELogLevel::Debug, TStringBuilder() << "State initialized");
+
state = ctx.HolderFactory.Create<TSpillingSupportState>(UsedInputItemType, KeyAndStateType,
Nodes.KeyNodes.size(),
Nodes.ItemNodes.size(),
@@ -1839,11 +1873,13 @@ private:
TMyValueHasher(KeyTypes),
TMyValueEqual(KeyTypes),
#else
- ctx.ExecuteLLVM && Hash ? THashFunc(std::ptr_fun(Hash)) : THashFunc(TMyValueHasher(KeyTypes)),
- ctx.ExecuteLLVM && Equals ? TEqualsFunc(std::ptr_fun(Equals)) : TEqualsFunc(TMyValueEqual(KeyTypes)),
+ ctx.ExecuteLLVM && Hash ? THashFunc(std::ptr_fun(Hash)) : THashFunc(TMyValueHasher(KeyTypes)),
+ ctx.ExecuteLLVM && Equals ? TEqualsFunc(std::ptr_fun(Equals)) : TEqualsFunc(TMyValueEqual(KeyTypes)),
#endif
AllowSpilling,
- ctx
+ ctx,
+ logger,
+ logComponent
);
}
@@ -1996,7 +2032,6 @@ IComputationNode* WrapWideCombiner(TCallable& callable, const TComputationNodeFa
}
IComputationNode* WrapWideLastCombiner(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- YQL_LOG(INFO) << "Found non-serializable type, spilling is disabled";
return WrapWideCombinerT<true>(callable, ctx, false);
}
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp
index def9de2b960..c814cf09a0d 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp
@@ -651,7 +651,7 @@ private:
public:
TSpillingSupportState(TMemoryUsageInfo* memInfo, const bool* directons, size_t keyWidth, const TCompareFunc& compare,
- const std::vector<ui32>& indexes, TMultiType* tupleMultiType, const TComputationContext& ctx)
+ const std::vector<ui32>& indexes, TMultiType* tupleMultiType, const TComputationContext& ctx, NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent)
: TBase(memInfo)
, Indexes(indexes)
, Directions(directons, directons + keyWidth)
@@ -659,6 +659,8 @@ public:
, Fields(Indexes.size(), nullptr)
, TupleMultiType(tupleMultiType)
, Ctx(ctx)
+ , Logger(logger)
+ , LogComponent(logComponent)
{
if (Ctx.SpillerFactory) {
Spiller = Ctx.SpillerFactory->CreateSpiller();
@@ -678,7 +680,8 @@ public:
ResetFields();
auto nextMode = (IsReadFromChannelFinished() ? EOperatingMode::ProcessSpilled : EOperatingMode::InMemory);
- YQL_LOG(INFO) << (nextMode == EOperatingMode::ProcessSpilled ? "Switching to ProcessSpilled" : "Switching to Memory mode");
+ UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder() <<
+ (nextMode == EOperatingMode::ProcessSpilled ? "Switching to ProcessSpilled" : "Switching to Memory mode"));
SwitchMode(nextMode);
return IsReadyToContinue();
@@ -712,8 +715,8 @@ public:
const auto used = TlsAllocState->GetUsed();
const auto limit = TlsAllocState->GetLimit();
- YQL_LOG(INFO) << "Yellow zone reached " << (used*100/limit) << "%=" << used << "/" << limit;
- YQL_LOG(INFO) << "Switching Memory mode to Spilling";
+ UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder() << "Yellow zone reached " << (used*100/limit) << "%=" << used << "/" << limit);
+ UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, "Switching Memory mode to Spilling");
SwitchMode(EOperatingMode::Spilling);
}
@@ -860,6 +863,8 @@ private:
std::vector<TSpilledUnboxedValuesIterator> SpilledUnboxedValuesIterators;
ISpiller::TPtr Spiller = nullptr;
bool IsHeapBuilt = false;
+ const NYql::NUdf::TLoggerPtr Logger;
+ const NYql::NUdf::TLogComponentId LogComponent;
};
class TWideSortWrapper: public TStatefulWideFlowCodegeneratorNode<TWideSortWrapper>
@@ -1101,10 +1106,13 @@ public:
#endif
private:
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state, const bool* directions) const {
+ NYql::NUdf::TLoggerPtr logger = ctx.MakeLogger();
+ NYql::NUdf::TLogComponentId logComponent = logger->RegisterComponent("WideSort");
+ UDF_LOG(logger, logComponent, NUdf::ELogLevel::Debug, TStringBuilder() << "State initialized");
#ifdef MKQL_DISABLE_CODEGEN
- state = ctx.HolderFactory.Create<TSpillingSupportState>(directions, Directions.size(), TMyValueCompare(Keys), Indexes, TupleMultiType, ctx);
+ state = ctx.HolderFactory.Create<TSpillingSupportState>(directions, Directions.size(), TMyValueCompare(Keys), Indexes, TupleMultiType, ctx, logger, logComponent);
#else
- state = ctx.HolderFactory.Create<TSpillingSupportState>(directions, Directions.size(), ctx.ExecuteLLVM && Compare ? TCompareFunc(Compare) : TCompareFunc(TMyValueCompare(Keys)), Indexes, TupleMultiType, ctx);
+ state = ctx.HolderFactory.Create<TSpillingSupportState>(directions, Directions.size(), ctx.ExecuteLLVM && Compare ? TCompareFunc(Compare) : TCompareFunc(TMyValueCompare(Keys)), Indexes, TupleMultiType, ctx, logger, logComponent);
#endif
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
index 3bedebe1c87..293518a991c 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
@@ -205,9 +205,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
ui32 bigStrSize[2] = {151, 151};
- GraceJoin::TTable bigTable(1,1,1,1);
- GraceJoin::TTable smallTable(1,1,1,1);
- GraceJoin::TTable joinTable(1,1,1,1);
+ GraceJoin::TTable bigTable(nullptr,0,1,1,1,1);
+ GraceJoin::TTable smallTable(nullptr,0,1,1,1,1);
+ GraceJoin::TTable joinTable(nullptr,0,1,1,1,1);
const ui64 TupleSize = 1024;
@@ -269,9 +269,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
NMemInfo::TMemInfo mi = NMemInfo::GetMemInfo();
CTEST << "Mem usage before tables tuples added (MB): " << mi.RSS / (1024 * 1024) << Endl;
- GraceJoin::TTable bigTable(1,1,1,1);
- GraceJoin::TTable smallTable(1,1,1,1);
- GraceJoin::TTable joinTable(1,1,1,1);
+ GraceJoin::TTable bigTable(nullptr,0,1,1,1,1);
+ GraceJoin::TTable smallTable(nullptr,0,1,1,1,1);
+ GraceJoin::TTable joinTable(nullptr,0,1,1,1,1);
std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
@@ -442,9 +442,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
NMemInfo::TMemInfo mi = NMemInfo::GetMemInfo();
CTEST << "Mem usage before tables tuples added (MB): " << mi.RSS / (1024 * 1024) << Endl;
- GraceJoin::TTable bigTable(1,1,1,1);
- GraceJoin::TTable smallTable(1,1,1,1);
- GraceJoin::TTable joinTable(1,1,1,1);
+ GraceJoin::TTable bigTable(nullptr,0,1,1,1,1);
+ GraceJoin::TTable smallTable(nullptr,0,1,1,1,1);
+ GraceJoin::TTable joinTable(nullptr,0,1,1,1,1);
std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
@@ -625,9 +625,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinAnyTest) {
- GraceJoin::TTable bigTable (1,1,1,1,0,0,1, nullptr, true);
- GraceJoin::TTable smallTable(1,1,1,1,0,0,1, nullptr, true);
- GraceJoin::TTable joinTable (1,1,1,1,0,0,1, nullptr, true);
+ GraceJoin::TTable bigTable (nullptr, 0, 1,1,1,1,0,0,1, nullptr, true);
+ GraceJoin::TTable smallTable(nullptr, 0, 1,1,1,1,0,0,1, nullptr, true);
+ GraceJoin::TTable joinTable (nullptr, 0, 1,1,1,1,0,0,1, nullptr, true);
std::mt19937_64 rng;
std::uniform_int_distribution<ui64> dist(0, 10000 - 1);
@@ -763,9 +763,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceSelfJoinTest) {
- GraceJoin::TTable bigTable (1,1,1,1,0,0,1, nullptr, false);
- GraceJoin::TTable smallTable(1,1,1,1,0,0,1, nullptr, false);
- GraceJoin::TTable joinTable (1,1,1,1,0,0,1, nullptr, false);
+ GraceJoin::TTable bigTable (nullptr, 0, 1,1,1,1,0,0,1, nullptr, false);
+ GraceJoin::TTable smallTable(nullptr, 0, 1,1,1,1,0,0,1, nullptr, false);
+ GraceJoin::TTable joinTable (nullptr, 0, 1,1,1,1,0,0,1, nullptr, false);
std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
diff --git a/yql/essentials/public/udf/udf_log.h b/yql/essentials/public/udf/udf_log.h
index 2e39ad1e040..b09edef148f 100644
--- a/yql/essentials/public/udf/udf_log.h
+++ b/yql/essentials/public/udf/udf_log.h
@@ -11,6 +11,20 @@
namespace NYql {
namespace NUdf {
+#define UDF_LOG(logger, component, level, msg) \
+do { \
+ if ((logger) && (logger)->IsActive(component, level)) { \
+ (logger)->Log(component, level, msg); \
+ } \
+} while (0)
+
+#define UDF_LOG_IF(condition, logger, component, level, msg) \
+do { \
+ if ((logger) && (condition) && (logger)->IsActive(component, level)) { \
+ (logger)->Log(component, level, msg); \
+ } \
+} while (0)
+
#define UDF_LOG_LEVEL(XX) \
XX(Fatal, 0) \
XX(Error, 1) \