diff options
author | mfilitov <mfilitov@yandex-team.com> | 2025-04-15 12:12:01 +0300 |
---|---|---|
committer | mfilitov <mfilitov@yandex-team.com> | 2025-04-15 12:31:45 +0300 |
commit | fc2466491ab054c948d4c46b3ca6e27cce32f197 (patch) | |
tree | 6e45f155f363dd6ae9d4eae81a1af4a639442669 | |
parent | 0450496cd882e005d0364fb7ea99d2f9b94e77b1 (diff) | |
download | ydb-fc2466491ab054c948d4c46b3ca6e27cce32f197.tar.gz |
Use udf_log instead of log in comp nodes
Changing logger in comp nodes as described in this task: <https://github.com/ydb-platform/ydb/issues/15597>
Tests added here: <https://github.com/lll-phill-lll/ydb/blob/main/ydb/core/kqp/ut/runtime/kqp_scan_logging_ut.cpp#L76>
Will be uncommented after this pr.
upd. This PR was merged and then reverted due to incompatible logger settings in `tasks_runner`. The issue is fixed in this PR: <https://github.com/ydb-platform/ydb/pull/16837>
commit_hash:de998351a6e6f12a684770d56c6e2ac413c15c7a
7 files changed, 167 insertions, 72 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp index ac5e5108e3c..bb319984526 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp @@ -80,9 +80,12 @@ struct TGraceJoinPacker { ui64 DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum; std::vector<GraceJoin::TColTypeInterface> ColumnInterfaces; bool IsAny; // Flag to support any join attribute + const NUdf::TLoggerPtr Logger; // Logger instance + const NUdf::TLogComponentId LogComponent; // Id of current component for logging. GracejJoin here inline void Pack() ; // Packs new tuple from TupleHolder and TuplePtrs to TupleIntVals, TupleStrSizes, TupleStrings inline void UnPack(); // Unpacks packed values from TupleIntVals, TupleStrSizes, TupleStrings into TupleHolder and TuplePtrs - TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory, bool isAny); + TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, + const THolderFactory& holderFactory, bool isAny, NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent); }; @@ -430,10 +433,13 @@ void TGraceJoinPacker::UnPack() { } -TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory, bool isAny) : +TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, + const THolderFactory& holderFactory, bool isAny, NUdf::TLoggerPtr logger = nullptr, NUdf::TLogComponentId logComponent = 0) : ColumnTypes(columnTypes) , HolderFactory(holderFactory) - , IsAny(isAny) { + , IsAny(isAny) + , Logger(logger) + , LogComponent(logComponent) { ui64 nColumns = ColumnTypes.size(); ui64 nKeyColumns = keyColumns.size(); @@ -550,8 +556,9 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con } TablePtr = std::make_unique<GraceJoin::TTable>( + Logger, LogComponent, PackedKeyIntColumnsNum, KeyStrColumnsNum, PackedDataIntColumnsNum, - DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, NullsBitmapSize, cti_p, IsAny ); + DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, NullsBitmapSize, cti_p, IsAny); } @@ -569,7 +576,7 @@ public: EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns, const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames, const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, TComputationContext& ctx, - const bool isSelfJoin, bool isSpillingAllowed) + const bool isSelfJoin, bool isSpillingAllowed, NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent) : TBase(memInfo) , FlowLeft(flowLeft) , FlowRight(flowRight) @@ -578,9 +585,9 @@ public: , RightKeyColumns(rightKeyColumns) , LeftRenames(leftRenames) , RightRenames(rightRenames) - , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Left || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::RightSemi || joinKind == EJoinKind::RightOnly))) - , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Right || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly))) - , JoinedTablePtr(std::make_unique<GraceJoin::TTable>()) + , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Left || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::RightSemi || joinKind == EJoinKind::RightOnly), logger, logComponent)) + , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, ctx.HolderFactory, (anyJoinSettings == EAnyJoinSettings::Right || anyJoinSettings == EAnyJoinSettings::Both || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly), logger, logComponent)) + , JoinedTablePtr(std::make_unique<GraceJoin::TTable>(logger, logComponent)) , JoinCompleted(std::make_unique<bool>(false)) , PartialJoinCompleted(std::make_unique<bool>(false)) , HaveMoreLeftRows(std::make_unique<bool>(true)) @@ -588,8 +595,10 @@ public: , IsSelfJoin_(isSelfJoin) , SelfJoinSameKeys_(isSelfJoin && (leftKeyColumns == rightKeyColumns)) , IsSpillingAllowed(isSpillingAllowed) + , Logger(logger) + , LogComponent(logComponent) { - YQL_LOG(GRACEJOIN_DEBUG) << (const void *)&*JoinedTablePtr << "# AnyJoinSettings=" << (int)anyJoinSettings << " JoinKind=" << (int)joinKind; + UDF_LOG(Logger, LogComponent, GRACEJOIN_DEBUG, TStringBuilder() << (const void *)&*JoinedTablePtr << "# AnyJoinSettings=" << (int)anyJoinSettings << " JoinKind=" << (int)joinKind); if (IsSelfJoin_) { LeftPacker->BatchSize = std::numeric_limits<ui64>::max(); RightPacker->BatchSize = std::numeric_limits<ui64>::max(); @@ -646,12 +655,14 @@ private: LogMemoryUsage(); switch(mode) { case EOperatingMode::InMemory: { - YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to InMemory"; + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder() + << (const void *)&*JoinedTablePtr << "# switching Memory mode to InMemory"); MKQL_ENSURE(false, "Internal logic error"); break; } case EOperatingMode::Spilling: { - YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling"; + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder() + << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling"); MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error"); auto spiller = ctx.SpillerFactory->CreateSpiller(); RightPacker->TablePtr->InitializeBucketSpillers(spiller); @@ -659,7 +670,8 @@ private: break; } case EOperatingMode::ProcessSpilled: { - YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to ProcessSpilled"; + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder() + << (const void *)&*JoinedTablePtr << "# switching Memory mode to ProcessSpilled"); SpilledBucketsJoinOrder.reserve(GraceJoin::NumberOfBuckets); for (ui32 i = 0; i < GraceJoin::NumberOfBuckets; ++i) SpilledBucketsJoinOrder.push_back(i); @@ -796,6 +808,11 @@ private: } void LogMemoryUsage() const { + const auto memoryUsageLogLevel = NUdf::ELogLevel::Info; + if (!Logger->IsActive(LogComponent, memoryUsageLogLevel)) { + return; + } + const auto used = TlsAllocState->GetUsed(); const auto limit = TlsAllocState->GetLimit(); TStringBuilder logmsg; @@ -805,7 +822,7 @@ private: } logmsg << (used/1_MB) << "MB/" << (limit/1_MB) << "MB"; - YQL_LOG(INFO) << logmsg; + UDF_LOG(Logger, LogComponent, memoryUsageLogLevel, logmsg); } EFetchResult DoCalculateInMemory(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { @@ -857,11 +874,10 @@ private: (!*HaveMoreRightRows && (!*HaveMoreLeftRows || LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize )) || (!*HaveMoreLeftRows && RightPacker->TuplesBatchPacked >= RightPacker->BatchSize))) { - YQL_LOG(GRACEJOIN_TRACE) + UDF_LOG(Logger, LogComponent, GRACEJOIN_TRACE, TStringBuilder() << (const void *)&*JoinedTablePtr << '#' << " HaveLeft " << *HaveMoreLeftRows << " LeftPacked " << LeftPacker->TuplesBatchPacked << " LeftBatch " << LeftPacker->BatchSize - << " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize - ; + << " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize); auto& leftTable = *LeftPacker->TablePtr; auto& rightTable = SelfJoinSameKeys_ ? *LeftPacker->TablePtr : *RightPacker->TablePtr; @@ -1048,6 +1064,9 @@ private: NYql::NUdf::TCounter CounterOutputRows_; ui32 SpilledBucketsJoinOrderCurrentIndex = 0; std::vector<ui32> SpilledBucketsJoinOrder; + + const NUdf::TLoggerPtr Logger; + const NUdf::TLogComponentId LogComponent; }; class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper> { @@ -1167,10 +1186,14 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr } void MakeSpillingSupportState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { + NYql::NUdf::TLoggerPtr logger = ctx.MakeLogger(); + NYql::NUdf::TLogComponentId logComponent = logger->RegisterComponent("GraceJoin"); + UDF_LOG(logger, logComponent, NUdf::ELogLevel::Debug, TStringBuilder() << "State initialized"); + state = ctx.HolderFactory.Create<TGraceJoinSpillingSupportState>( FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns, LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes, - ctx, IsSelfJoin_, IsSpillingAllowed); + ctx, IsSelfJoin_, IsSpillingAllowed, logger, logComponent); } IComputationWideFlowNode *const FlowLeft; diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp index 5163d779423..5ea63283143 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -368,6 +368,7 @@ bool TTable::TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind / } if (wasException || TlsAllocState->IsMemoryYellowZoneEnabled()) { + UDF_LOG(Logger_, LogComponent_, NUdf::ELogLevel::Debug, TStringBuilder() << "Preallocation failed. WasException: " << wasException); for (ui64 i = 0; i < bucket; ++i) { auto& b1 = t1.TableBuckets[i]; b1.JoinSlots.resize(0); @@ -673,7 +674,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef BloomHits_ += bloomHits; BloomLookups_ += bloomLookups; - YQL_LOG(GRACEJOIN_TRACE) + UDF_LOG(Logger_, LogComponent_, GRACEJOIN_TRACE, TStringBuilder() << (const void *)this << '#' << bucket << " Table1 " << JoinTable1->TableBucketsStats[bucket].TuplesNum @@ -685,7 +686,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef << " joinKind " << (int)JoinKind << " swapTables " << swapTables << " initHashTable " << initHashTable - ; + ); } HasMoreLeftTuples_ = hasMoreLeftTuples; @@ -1062,10 +1063,12 @@ void TTable::PrepareBucket(ui64 bucket) { } // Creates new table with key columns and data columns -TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns, +TTable::TTable( NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent, + ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns, ui64 numberOfDataIntColumns, ui64 numberOfDataStringColumns, ui64 numberOfKeyIColumns, ui64 numberOfDataIColumns, - ui64 nullsBitmapSize, TColTypeInterface * colInterfaces, bool isAny ) : + ui64 nullsBitmapSize, TColTypeInterface * colInterfaces, + bool isAny) : NumberOfKeyIntColumns(numberOfKeyIntColumns), NumberOfKeyStringColumns(numberOfKeyStringColumns), @@ -1075,7 +1078,9 @@ TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns, NumberOfDataIColumns(numberOfDataIColumns), ColInterfaces(colInterfaces), NullsBitmapSize_(nullsBitmapSize), - IsAny_(isAny) { + IsAny_(isAny), + Logger_(logger), + LogComponent_(logComponent) { NumberOfKeyColumns = NumberOfKeyIntColumns + NumberOfKeyStringColumns + NumberOfKeyIColumns; NumberOfDataColumns = NumberOfDataIntColumns + NumberOfDataStringColumns + NumberOfDataIColumns; @@ -1114,16 +1119,21 @@ TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns, } TTable::~TTable() { - YQL_LOG_IF(GRACEJOIN_DEBUG, InitHashTableCount_) + UDF_LOG_IF(InitHashTableCount_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder() << (const void *)this << '#' << "InitHashTableCount " << InitHashTableCount_ << " BloomLookups " << BloomLookups_ << " BloomHits " << BloomHits_ << " BloomFalsePositives " << BloomFalsePositives_ << " HashLookups " << HashLookups_ << " HashChainTraversal " << HashO1Iterations_/(double)HashLookups_ << " HashSlotOperations " << HashSlotIterations_/(double)HashLookups_ << " Table1 " << JoinTable1Total_ << " Table2 " << JoinTable2Total_ << " TuplesFound " << TuplesFound_ - ; - YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->AnyFiltered_) << (const void *)this << '#' << "L AnyFiltered " << JoinTable1->AnyFiltered_; - YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable1 && JoinTable1->BloomLookups_) << (const void *)this << '#' << "L BloomLookups " << JoinTable1->BloomLookups_ << " BloomHits " << JoinTable1->BloomHits_; - YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->AnyFiltered_) << (const void *)this << '#' << "R AnyFiltered " << JoinTable2->AnyFiltered_; - YQL_LOG_IF(GRACEJOIN_DEBUG, JoinTable2 && JoinTable2->BloomLookups_) << (const void *)this << '#' << "R BloomLookups " << JoinTable2->BloomLookups_ << " BloomHits " << JoinTable2->BloomHits_; + ); + + UDF_LOG_IF(JoinTable1 && JoinTable1->AnyFiltered_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder() + << (const void *)this << '#' << "L AnyFiltered " << JoinTable1->AnyFiltered_); + UDF_LOG_IF(JoinTable1 && JoinTable1->BloomLookups_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder() + << (const void *)this << '#' << "L BloomLookups " << JoinTable1->BloomLookups_ << " BloomHits " << JoinTable1->BloomHits_); + UDF_LOG_IF(JoinTable2 && JoinTable2->AnyFiltered_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder() + << (const void *)this << '#' << "R AnyFiltered " << JoinTable2->AnyFiltered_); + UDF_LOG_IF(JoinTable2 && JoinTable2->BloomLookups_, Logger_, LogComponent_, GRACEJOIN_DEBUG, TStringBuilder() + << (const void *)this << '#' << "R BloomLookups " << JoinTable2->BloomLookups_ << " BloomHits " << JoinTable2->BloomHits_); }; TTableBucketSpiller::TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit) diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h index b918ce9b909..6c9256bc036 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h @@ -11,8 +11,8 @@ namespace NMiniKQL { namespace GraceJoin { class TTableBucketSpiller; -#define GRACEJOIN_DEBUG DEBUG -#define GRACEJOIN_TRACE TRACE +#define GRACEJOIN_DEBUG NUdf::ELogLevel::Debug +#define GRACEJOIN_TRACE NUdf::ELogLevel::Trace const ui64 BitsForNumberOfBuckets = 6; // 2^6 = 64 const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1; @@ -348,6 +348,9 @@ class TTable { ui64 TuplesFound_ = 0; // Total number of matching keys found during join + const NUdf::TLoggerPtr Logger_ = nullptr; // Logger instance + const NUdf::TLogComponentId LogComponent_ = 0; // Unique component id. GraceJoin here. + public: @@ -417,10 +420,12 @@ public: void Clear(); // Creates new table with key columns and data columns - TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0, - ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0, - ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0, - ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false); + TTable(NUdf::TLoggerPtr logger = nullptr, NUdf::TLogComponentId logComponent = 0, + ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0, + ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0, + ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0, + ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, + bool isAny = false); enum class EAddTupleResult { Added, Unmatched, AnyMatch }; // Adds new tuple to the table. intColumns, stringColumns - data of columns, diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp index 8a3052faa1a..68b22cd7364 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp @@ -248,8 +248,19 @@ private: return KeyWidth + StateWidth; } public: - TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = true) - : TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), Hash(hash), Equal(equal) { + TState( + TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, + NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent, bool allowOutOfMemory = true + ) + : TBase(memInfo) + , KeyWidth(keyWidth) + , StateWidth(stateWidth) + , AllowOutOfMemory(allowOutOfMemory) + , Hash(hash) + , Equal(equal) + , Logger(logger) + , LogComponent(logComponent) + { CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod()); CurrentPosition = 0; Tongue = CurrentPage->data(); @@ -295,7 +306,7 @@ public: try { States->CheckGrow(); } catch (TMemoryLimitExceededException) { - YQL_LOG(INFO) << "State failed to grow"; + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder() << "State failed to grow"); if (IsOutOfMemory || !AllowOutOfMemory) { throw; } else { @@ -373,6 +384,8 @@ private: std::unique_ptr<TStates> States; const THashFunc Hash; const TEqualsFunc Equal; + const NUdf::TLoggerPtr Logger; + const NUdf::TLogComponentId LogComponent; }; class TSpillingSupportState : public TComputationValue<TSpillingSupportState> { @@ -421,10 +434,11 @@ public: TSpillingSupportState( TMemoryUsageInfo* memInfo, const TMultiType* usedInputItemType, const TMultiType* keyAndStateType, ui32 keyWidth, size_t itemNodesSize, - const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx + const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx, + NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent ) : TBase(memInfo) - , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal, allowSpilling && ctx.SpillerFactory) + , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal, logger, logComponent, allowSpilling && ctx.SpillerFactory) , UsedInputItemType(usedInputItemType) , KeyAndStateType(keyAndStateType) , KeyWidth(keyWidth) @@ -436,6 +450,8 @@ public: , Equal(equal) , AllowSpilling(allowSpilling) , Ctx(ctx) + , Logger(logger) + , LogComponent(logComponent) { BufferForUsedInputItems.reserve(usedInputItemType->GetElementsCount()); Tongue = InMemoryProcessingState.Tongue; @@ -726,6 +742,10 @@ private: } void LogMemoryUsage() const { + const auto memoryUsageLogLevel = NUdf::ELogLevel::Info; + if (!Logger->IsActive(LogComponent, memoryUsageLogLevel)) { + return; + } const auto used = TlsAllocState->GetUsed(); const auto limit = TlsAllocState->GetLimit(); TStringBuilder logmsg; @@ -735,7 +755,7 @@ private: } logmsg << (used/1_MB) << "MB/" << (limit/1_MB) << "MB"; - YQL_LOG(INFO) << logmsg; + UDF_LOG(Logger, LogComponent, memoryUsageLogLevel, logmsg); } void SpillMoreStateFromBucket(TSpilledBucket& bucket) { @@ -858,31 +878,32 @@ private: void SwitchMode(EOperatingMode mode) { switch(mode) { case EOperatingMode::InMemory: { - YQL_LOG(INFO) << "switching Memory mode to InMemory"; + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, "switching Memory mode to InMemory"); MKQL_ENSURE(false, "Internal logic error"); break; } case EOperatingMode::SplittingState: { - YQL_LOG(INFO) << "switching Memory mode to SplittingState"; + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, "switching Memory mode to SplittingState"); MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error"); SpilledBuckets.resize(SpilledBucketCount); auto spiller = Ctx.SpillerFactory->CreateSpiller(); for (auto &b: SpilledBuckets) { b.SpilledState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, KeyAndStateType, 5_MB); b.SpilledData = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, UsedInputItemType, 5_MB); - b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal, false); + b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, + KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal, Logger, LogComponent, false); } break; } case EOperatingMode::Spilling: { - YQL_LOG(INFO) << "switching Memory mode to Spilling"; + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, "switching Memory mode to Spilling"); MKQL_ENSURE(EOperatingMode::SplittingState == Mode || EOperatingMode::InMemory == Mode, "Internal logic error"); Tongue = ViewForKeyAndState.data(); break; } case EOperatingMode::ProcessSpilled: { - YQL_LOG(INFO) << "switching Memory mode to ProcessSpilled"; + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, "switching Memory mode to ProcessSpilled"); MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error"); MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error"); MKQL_ENSURE(BufferForUsedInputItems.empty(), "Internal logic error"); @@ -940,6 +961,9 @@ private: TComputationContext& Ctx; NYql::NUdf::TCounter CounterOutputRows_; + + const NUdf::TLoggerPtr Logger; + const NUdf::TLogComponentId LogComponent; }; #ifndef MKQL_DISABLE_CODEGEN @@ -1402,12 +1426,18 @@ public: #endif private: void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { + NYql::NUdf::TLoggerPtr logger = ctx.MakeLogger(); + NYql::NUdf::TLogComponentId logComponent = logger->RegisterComponent("WideCombine"); + UDF_LOG(logger, logComponent, NUdf::ELogLevel::Debug, TStringBuilder() << "State initialized"); + #ifdef MKQL_DISABLE_CODEGEN - state = ctx.HolderFactory.Create<TState>(Nodes.KeyNodes.size(), Nodes.StateNodes.size(), TMyValueHasher(KeyTypes), TMyValueEqual(KeyTypes)); + state = ctx.HolderFactory.Create<TState>(Nodes.KeyNodes.size(), Nodes.StateNodes.size(), + TMyValueHasher(KeyTypes), TMyValueEqual(KeyTypes), logger, logComponent); #else state = ctx.HolderFactory.Create<TState>(Nodes.KeyNodes.size(), Nodes.StateNodes.size(), ctx.ExecuteLLVM && Hash ? THashFunc(std::ptr_fun(Hash)) : THashFunc(TMyValueHasher(KeyTypes)), - ctx.ExecuteLLVM && Equals ? TEqualsFunc(std::ptr_fun(Equals)) : TEqualsFunc(TMyValueEqual(KeyTypes)) + ctx.ExecuteLLVM && Equals ? TEqualsFunc(std::ptr_fun(Equals)) : TEqualsFunc(TMyValueEqual(KeyTypes)), + logger, logComponent ); #endif if (ctx.CountersProvider) { @@ -1832,6 +1862,10 @@ public: #endif private: void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { + NYql::NUdf::TLoggerPtr logger = ctx.MakeLogger(); + NYql::NUdf::TLogComponentId logComponent = logger->RegisterComponent("WideLastCombine"); + UDF_LOG(logger, logComponent, NUdf::ELogLevel::Debug, TStringBuilder() << "State initialized"); + state = ctx.HolderFactory.Create<TSpillingSupportState>(UsedInputItemType, KeyAndStateType, Nodes.KeyNodes.size(), Nodes.ItemNodes.size(), @@ -1839,11 +1873,13 @@ private: TMyValueHasher(KeyTypes), TMyValueEqual(KeyTypes), #else - ctx.ExecuteLLVM && Hash ? THashFunc(std::ptr_fun(Hash)) : THashFunc(TMyValueHasher(KeyTypes)), - ctx.ExecuteLLVM && Equals ? TEqualsFunc(std::ptr_fun(Equals)) : TEqualsFunc(TMyValueEqual(KeyTypes)), + ctx.ExecuteLLVM && Hash ? THashFunc(std::ptr_fun(Hash)) : THashFunc(TMyValueHasher(KeyTypes)), + ctx.ExecuteLLVM && Equals ? TEqualsFunc(std::ptr_fun(Equals)) : TEqualsFunc(TMyValueEqual(KeyTypes)), #endif AllowSpilling, - ctx + ctx, + logger, + logComponent ); } @@ -1996,7 +2032,6 @@ IComputationNode* WrapWideCombiner(TCallable& callable, const TComputationNodeFa } IComputationNode* WrapWideLastCombiner(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - YQL_LOG(INFO) << "Found non-serializable type, spilling is disabled"; return WrapWideCombinerT<true>(callable, ctx, false); } diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp index def9de2b960..c814cf09a0d 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp @@ -651,7 +651,7 @@ private: public: TSpillingSupportState(TMemoryUsageInfo* memInfo, const bool* directons, size_t keyWidth, const TCompareFunc& compare, - const std::vector<ui32>& indexes, TMultiType* tupleMultiType, const TComputationContext& ctx) + const std::vector<ui32>& indexes, TMultiType* tupleMultiType, const TComputationContext& ctx, NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent) : TBase(memInfo) , Indexes(indexes) , Directions(directons, directons + keyWidth) @@ -659,6 +659,8 @@ public: , Fields(Indexes.size(), nullptr) , TupleMultiType(tupleMultiType) , Ctx(ctx) + , Logger(logger) + , LogComponent(logComponent) { if (Ctx.SpillerFactory) { Spiller = Ctx.SpillerFactory->CreateSpiller(); @@ -678,7 +680,8 @@ public: ResetFields(); auto nextMode = (IsReadFromChannelFinished() ? EOperatingMode::ProcessSpilled : EOperatingMode::InMemory); - YQL_LOG(INFO) << (nextMode == EOperatingMode::ProcessSpilled ? "Switching to ProcessSpilled" : "Switching to Memory mode"); + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder() << + (nextMode == EOperatingMode::ProcessSpilled ? "Switching to ProcessSpilled" : "Switching to Memory mode")); SwitchMode(nextMode); return IsReadyToContinue(); @@ -712,8 +715,8 @@ public: const auto used = TlsAllocState->GetUsed(); const auto limit = TlsAllocState->GetLimit(); - YQL_LOG(INFO) << "Yellow zone reached " << (used*100/limit) << "%=" << used << "/" << limit; - YQL_LOG(INFO) << "Switching Memory mode to Spilling"; + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, TStringBuilder() << "Yellow zone reached " << (used*100/limit) << "%=" << used << "/" << limit); + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, "Switching Memory mode to Spilling"); SwitchMode(EOperatingMode::Spilling); } @@ -860,6 +863,8 @@ private: std::vector<TSpilledUnboxedValuesIterator> SpilledUnboxedValuesIterators; ISpiller::TPtr Spiller = nullptr; bool IsHeapBuilt = false; + const NYql::NUdf::TLoggerPtr Logger; + const NYql::NUdf::TLogComponentId LogComponent; }; class TWideSortWrapper: public TStatefulWideFlowCodegeneratorNode<TWideSortWrapper> @@ -1101,10 +1106,13 @@ public: #endif private: void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state, const bool* directions) const { + NYql::NUdf::TLoggerPtr logger = ctx.MakeLogger(); + NYql::NUdf::TLogComponentId logComponent = logger->RegisterComponent("WideSort"); + UDF_LOG(logger, logComponent, NUdf::ELogLevel::Debug, TStringBuilder() << "State initialized"); #ifdef MKQL_DISABLE_CODEGEN - state = ctx.HolderFactory.Create<TSpillingSupportState>(directions, Directions.size(), TMyValueCompare(Keys), Indexes, TupleMultiType, ctx); + state = ctx.HolderFactory.Create<TSpillingSupportState>(directions, Directions.size(), TMyValueCompare(Keys), Indexes, TupleMultiType, ctx, logger, logComponent); #else - state = ctx.HolderFactory.Create<TSpillingSupportState>(directions, Directions.size(), ctx.ExecuteLLVM && Compare ? TCompareFunc(Compare) : TCompareFunc(TMyValueCompare(Keys)), Indexes, TupleMultiType, ctx); + state = ctx.HolderFactory.Create<TSpillingSupportState>(directions, Directions.size(), ctx.ExecuteLLVM && Compare ? TCompareFunc(Compare) : TCompareFunc(TMyValueCompare(Keys)), Indexes, TupleMultiType, ctx, logger, logComponent); #endif } diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp index 3bedebe1c87..293518a991c 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp @@ -205,9 +205,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { ui32 bigStrSize[2] = {151, 151}; - GraceJoin::TTable bigTable(1,1,1,1); - GraceJoin::TTable smallTable(1,1,1,1); - GraceJoin::TTable joinTable(1,1,1,1); + GraceJoin::TTable bigTable(nullptr,0,1,1,1,1); + GraceJoin::TTable smallTable(nullptr,0,1,1,1,1); + GraceJoin::TTable joinTable(nullptr,0,1,1,1,1); const ui64 TupleSize = 1024; @@ -269,9 +269,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { NMemInfo::TMemInfo mi = NMemInfo::GetMemInfo(); CTEST << "Mem usage before tables tuples added (MB): " << mi.RSS / (1024 * 1024) << Endl; - GraceJoin::TTable bigTable(1,1,1,1); - GraceJoin::TTable smallTable(1,1,1,1); - GraceJoin::TTable joinTable(1,1,1,1); + GraceJoin::TTable bigTable(nullptr,0,1,1,1,1); + GraceJoin::TTable smallTable(nullptr,0,1,1,1,1); + GraceJoin::TTable joinTable(nullptr,0,1,1,1,1); std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now(); @@ -442,9 +442,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { NMemInfo::TMemInfo mi = NMemInfo::GetMemInfo(); CTEST << "Mem usage before tables tuples added (MB): " << mi.RSS / (1024 * 1024) << Endl; - GraceJoin::TTable bigTable(1,1,1,1); - GraceJoin::TTable smallTable(1,1,1,1); - GraceJoin::TTable joinTable(1,1,1,1); + GraceJoin::TTable bigTable(nullptr,0,1,1,1,1); + GraceJoin::TTable smallTable(nullptr,0,1,1,1,1); + GraceJoin::TTable joinTable(nullptr,0,1,1,1,1); std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now(); @@ -625,9 +625,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinAnyTest) { - GraceJoin::TTable bigTable (1,1,1,1,0,0,1, nullptr, true); - GraceJoin::TTable smallTable(1,1,1,1,0,0,1, nullptr, true); - GraceJoin::TTable joinTable (1,1,1,1,0,0,1, nullptr, true); + GraceJoin::TTable bigTable (nullptr, 0, 1,1,1,1,0,0,1, nullptr, true); + GraceJoin::TTable smallTable(nullptr, 0, 1,1,1,1,0,0,1, nullptr, true); + GraceJoin::TTable joinTable (nullptr, 0, 1,1,1,1,0,0,1, nullptr, true); std::mt19937_64 rng; std::uniform_int_distribution<ui64> dist(0, 10000 - 1); @@ -763,9 +763,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceSelfJoinTest) { - GraceJoin::TTable bigTable (1,1,1,1,0,0,1, nullptr, false); - GraceJoin::TTable smallTable(1,1,1,1,0,0,1, nullptr, false); - GraceJoin::TTable joinTable (1,1,1,1,0,0,1, nullptr, false); + GraceJoin::TTable bigTable (nullptr, 0, 1,1,1,1,0,0,1, nullptr, false); + GraceJoin::TTable smallTable(nullptr, 0, 1,1,1,1,0,0,1, nullptr, false); + GraceJoin::TTable joinTable (nullptr, 0, 1,1,1,1,0,0,1, nullptr, false); std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now(); diff --git a/yql/essentials/public/udf/udf_log.h b/yql/essentials/public/udf/udf_log.h index 2e39ad1e040..b09edef148f 100644 --- a/yql/essentials/public/udf/udf_log.h +++ b/yql/essentials/public/udf/udf_log.h @@ -11,6 +11,20 @@ namespace NYql { namespace NUdf { +#define UDF_LOG(logger, component, level, msg) \ +do { \ + if ((logger) && (logger)->IsActive(component, level)) { \ + (logger)->Log(component, level, msg); \ + } \ +} while (0) + +#define UDF_LOG_IF(condition, logger, component, level, msg) \ +do { \ + if ((logger) && (condition) && (logger)->IsActive(component, level)) { \ + (logger)->Log(component, level, msg); \ + } \ +} while (0) + #define UDF_LOG_LEVEL(XX) \ XX(Fatal, 0) \ XX(Error, 1) \ |