diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2023-09-26 12:33:14 +0300 |
---|---|---|
committer | zverevgeny <zverevgeny@ydb.tech> | 2023-09-26 13:37:24 +0300 |
commit | df1df29214d1b77e33a217256d8e11ae779ded4f (patch) | |
tree | ea1ccff506051e4beedd3396c9cb910154fd167f | |
parent | c083da78d46e31819d8e04e6624a25409a5f99d0 (diff) | |
download | ydb-df1df29214d1b77e33a217256d8e11ae779ded4f.tar.gz |
YQL-16443 replace dynamic polymorphism with static, use nfa for tables
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp | 94 |
1 files changed, 57 insertions, 37 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp index ed3c8327f8a..1a19d714d9b 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp @@ -21,15 +21,6 @@ using TOutputColumnOrder = TVector<std::pair<EOutputColumnSource, size_t>>; using namespace NYql::NMatchRecognize; -//Process input rows (one or many partitions) -struct IProcessMatchRecognize { - ///return true if it has output data ready - virtual bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) = 0; - virtual NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) = 0; - virtual bool ProcessEndOfData(TComputationContext& ctx) = 0; - virtual ~IProcessMatchRecognize(){} -}; - struct TMatchRecognizeProcessorParameters { IComputationExternalNode* InputDataArg; TRowPattern Pattern; @@ -44,14 +35,25 @@ struct TMatchRecognizeProcessorParameters { TOutputColumnOrder OutputColumnOrder; }; -class TBackTrackingMatchRecognize: public IProcessMatchRecognize { +class TBackTrackingMatchRecognize { using TPartitionList = TSimpleList; using TRange = TPartitionList::TRange; using TMatchedVars = TMatchedVars<TRange>; public: + //TODO(YQL-16486): create a tree for backtracking(replace var names with indexes) + struct TPatternConfiguration { + using TPtr = std::shared_ptr<TPatternConfiguration>; + static TPtr Create(const TRowPattern& pattern, const THashMap<TString, size_t>& varNameToIndex) { + Y_UNUSED(pattern); + Y_UNUSED(varNameToIndex); + return std::make_shared<TPatternConfiguration>(); + } + }; + TBackTrackingMatchRecognize( NUdf::TUnboxedValue&& partitionKey, const TMatchRecognizeProcessorParameters& parameters, + const TPatternConfiguration::TPtr pattern, const TContainerCacheOnContext& cache ) : PartitionKey(std::move(partitionKey)) @@ -60,14 +62,16 @@ public: , CurMatchedVars(parameters.Defines.size()) , MatchNumber(0) { + //TODO(YQL-16486) + Y_UNUSED(pattern); } - bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override { + bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) { Y_UNUSED(ctx); Rows.Append(std::move(row)); return false; } - NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override { + NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) { if (Matches.empty()) return NUdf::TUnboxedValue{}; Parameters.MatchedVarsArg->SetValue(ctx, ToValue(ctx.HolderFactory, std::move(Matches.front()))); @@ -93,7 +97,7 @@ public: } return result; } - bool ProcessEndOfData(TComputationContext& ctx) override { + bool ProcessEndOfData(TComputationContext& ctx) { //Assume, that data moved to IComputationExternalNode node, will not be modified or released //till the end of the current function auto rowsSize = Rows.Size(); @@ -126,11 +130,12 @@ private: ui64 MatchNumber; }; -class TStreamingMatchRecognize: public IProcessMatchRecognize { +class TStreamingMatchRecognize { using TPartitionList = TSparseList; using TRange = TPartitionList::TRange; using TMatchedVars = TMatchedVars<TRange>; public: + using TPatternConfiguration = TNfaTransitionGraph; TStreamingMatchRecognize( NUdf::TUnboxedValue&& partitionKey, const TMatchRecognizeProcessorParameters& parameters, @@ -144,13 +149,13 @@ public: { } - bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override{ + bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) { Parameters.InputDataArg->SetValue(ctx, ctx.HolderFactory.Create<TListValue<TSparseList>>(Rows)); Parameters.CurrentRowIndexArg->SetValue(ctx, NUdf::TUnboxedValuePod(Rows.Size())); Nfa.ProcessRow(Rows.Append(std::move(row)), ctx); return Nfa.HasMatched(); } - NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override { + NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) { auto match = Nfa.GetMatched(); if (!match.has_value()) return NUdf::TUnboxedValue{}; @@ -176,7 +181,7 @@ public: } return result; } - bool ProcessEndOfData(TComputationContext& ctx) override { + bool ProcessEndOfData(TComputationContext& ctx) { Y_UNUSED(ctx); return false; } @@ -189,10 +194,11 @@ private: ui64 MatchNumber = 0; }; +template <typename Algo> class TStateForNonInterleavedPartitions - : public TComputationValue<TStateForNonInterleavedPartitions> - , public IProcessMatchRecognize + : public TComputationValue<TStateForNonInterleavedPartitions<Algo>> { + using TRowPatternConfiguration = typename Algo::TPatternConfiguration; public: TStateForNonInterleavedPartitions( TMemoryUsageInfo* memInfo, @@ -207,10 +213,11 @@ public: , PartitionKey(partitionKey) , PartitionKeyPacker(true, partitionKeyType) , Parameters(parameters) + , RowPatternConfiguration(TRowPatternConfiguration::Create(parameters.Pattern, parameters.VarNamesLookup)) , Cache(cache) , Terminating(false) {} - bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override { + bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) { MKQL_ENSURE(not DelayedRow, "Internal logic error"); //we're finalizing previous partition InputRowArg->SetValue(ctx, NUdf::TUnboxedValue(row)); auto partitionKey = PartitionKey->GetValue(ctx); @@ -228,7 +235,7 @@ public: //be aware that the very first partition is created in the same manner as subsequent return false; } - bool ProcessEndOfData(TComputationContext& ctx) override { + bool ProcessEndOfData(TComputationContext& ctx) { if (Terminating) return false; Terminating = true; @@ -238,7 +245,7 @@ public: return false; } - NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override { + NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) { if (PartitionHandler) { auto result = PartitionHandler->GetOutputIfReady(ctx); if (result) { @@ -253,9 +260,10 @@ public: InputRowArg->SetValue(ctx, NUdf::TUnboxedValue(temp)); auto partitionKey = PartitionKey->GetValue(ctx); CurPartitionPackedKey = PartitionKeyPacker.Pack(partitionKey); - PartitionHandler.reset(new TBackTrackingMatchRecognize( + PartitionHandler.reset(new Algo( std::move(partitionKey), Parameters, + RowPatternConfiguration, Cache )); PartitionHandler->ProcessInputRow(std::move(temp), ctx); @@ -267,11 +275,12 @@ public: } private: TString CurPartitionPackedKey; - std::unique_ptr<IProcessMatchRecognize> PartitionHandler; + std::unique_ptr<Algo> PartitionHandler; IComputationExternalNode* InputRowArg; IComputationNode* PartitionKey; TValuePackerGeneric<false> PartitionKeyPacker; const TMatchRecognizeProcessorParameters& Parameters; + const typename TRowPatternConfiguration::TPtr RowPatternConfiguration; const TContainerCacheOnContext& Cache; NUdf::TUnboxedValue DelayedRow; bool Terminating; @@ -279,9 +288,8 @@ private: class TStateForInterleavedPartitions : public TComputationValue<TStateForInterleavedPartitions> - , public IProcessMatchRecognize { - using TPartitionMap = std::unordered_map<TString, std::unique_ptr<IProcessMatchRecognize>>; + using TPartitionMap = std::unordered_map<TString, std::unique_ptr<TStreamingMatchRecognize>>; public: TStateForInterleavedPartitions( TMemoryUsageInfo* memInfo, @@ -300,7 +308,7 @@ public: , Cache(cache) { } - bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override { + bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) { auto partition = GetPartitionHandler(row, ctx); if (partition->second->ProcessInputRow(std::move(row), ctx)) { HasReadyOutput.push(partition); @@ -308,7 +316,7 @@ public: return !HasReadyOutput.empty(); } - bool ProcessEndOfData(TComputationContext& ctx) override { + bool ProcessEndOfData(TComputationContext& ctx) { for (auto it = Partitions.begin(); it != Partitions.end(); ++it) { auto b = it->second->ProcessEndOfData(ctx); if (b) { @@ -319,7 +327,7 @@ public: return !HasReadyOutput.empty(); } - NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override { + NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) { while (!HasReadyOutput.empty()) { auto r = HasReadyOutput.top()->second->GetOutputIfReady(ctx); if (not r) { @@ -591,14 +599,26 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation , std::move(parameters) ); } else { - return new TMatchRecognizeWrapper<TStateForNonInterleavedPartitions>(ctx.Mutables - , GetValueRepresentation(inputFlow.GetStaticType()) - , LocateNode(ctx.NodeLocator, *inputFlow.GetNode()) - , static_cast<IComputationExternalNode*>(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode())) - , LocateNode(ctx.NodeLocator, *partitionKeySelector.GetNode()) - , partitionKeySelector.GetStaticType() - , std::move(parameters) - ); + const bool useNfaForTables = true; //TODO(YQL-16486) get this flag from an optimizer + if (useNfaForTables) { + return new TMatchRecognizeWrapper<TStateForNonInterleavedPartitions<TStreamingMatchRecognize>>(ctx.Mutables + , GetValueRepresentation(inputFlow.GetStaticType()) + , LocateNode(ctx.NodeLocator, *inputFlow.GetNode()) + , static_cast<IComputationExternalNode*>(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode())) + , LocateNode(ctx.NodeLocator, *partitionKeySelector.GetNode()) + , partitionKeySelector.GetStaticType() + , std::move(parameters) + ); + } else { + return new TMatchRecognizeWrapper<TStateForNonInterleavedPartitions<TBackTrackingMatchRecognize>>(ctx.Mutables + , GetValueRepresentation(inputFlow.GetStaticType()) + , LocateNode(ctx.NodeLocator, *inputFlow.GetNode()) + , static_cast<IComputationExternalNode*>(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode())) + , LocateNode(ctx.NodeLocator, *partitionKeySelector.GetNode()) + , partitionKeySelector.GetStaticType() + , std::move(parameters) + ); + } } } |