diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2023-09-05 16:38:38 +0300 |
---|---|---|
committer | zverevgeny <zverevgeny@ydb.tech> | 2023-09-05 18:02:12 +0300 |
commit | ad6ab7f24891543e0729e88860a31ae95d11c468 (patch) | |
tree | 4550307c1a288e194f1ac30f918434528dbe5462 | |
parent | f5907623ca4bf9c71ba0d162c40f76a44ca88ee8 (diff) | |
download | ydb-ad6ab7f24891543e0729e88860a31ae95d11c468.tar.gz |
YQL-16486 skeleton for back-tracking impl of MATCH_RECOGNIZE
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp | 115 |
1 files changed, 97 insertions, 18 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp index 74504eb06f..bc850ac9b6 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp @@ -23,13 +23,94 @@ struct IProcessMatchRecognize { ///return true if it has output data ready virtual bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) = 0; virtual NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) = 0; - virtual bool ProcessEndOfData() = 0; + virtual bool ProcessEndOfData(TComputationContext& ctx) = 0; virtual ~IProcessMatchRecognize(){} }; -class TStreamMatchRecognize: public IProcessMatchRecognize { +class TBackTrackingMatchRecognize: public IProcessMatchRecognize { public: - TStreamMatchRecognize( + TBackTrackingMatchRecognize( + NUdf::TUnboxedValue&& partitionKey, + IComputationExternalNode* matchedVarsArg, + TVector<IComputationNode*>& measures, + const TOutputColumnOrder& outputColumnOrder, + IComputationExternalNode* currentRowIndexArg, + const TVector<IComputationNode*>& defines, + const TContainerCacheOnContext& cache + ) + : PartitionKey(std::move(partitionKey)) + , MatchedVarsArg(matchedVarsArg) + , Measures(measures) + , OutputColumnOrder(outputColumnOrder) + , CurrentRowIndexArg(currentRowIndexArg) + , Defines(defines) + , Cache(cache) + , MatchedVars(Defines.size()) + , HasMatch(false) + { + } + + bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override { + Y_UNUSED(ctx); + Rows.push_back(std::move(row)); + return false; + } + NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override { + if (!HasMatch) + return NUdf::TUnboxedValue{}; + MatchedVarsArg->SetValue(ctx, ctx.HolderFactory.Create<TMatchedVarsValue>(MatchedVars)); + HasMatch = false; + NUdf::TUnboxedValue *itemsPtr = nullptr; + const auto result = Cache.NewArray(ctx, OutputColumnOrder.size(), itemsPtr); + for (auto const& c: OutputColumnOrder) { + switch(c.first) { + case EOutputColumnSource::Measure: + *itemsPtr++ = Measures[c.second]->GetValue(ctx); + break; + case EOutputColumnSource::PartitionKey: + *itemsPtr++ = PartitionKey.GetElement(c.second); + break; + } + } + return result; + } + bool ProcessEndOfData(TComputationContext& ctx) override { + for (size_t i = 0; i != Rows.size(); ++i) { + CurrentRowIndexArg->SetValue(ctx, NUdf::TUnboxedValuePod(static_cast<ui64>(i))); + for (size_t v = 0; v != Defines.size(); ++v) { + const auto &d = Defines[v]->GetValue(ctx); + if (d && d.GetOptionalValue().Get<bool>()) { + auto &var = MatchedVars[v]; + if (var.empty()) { + var.emplace_back(i, i); + } else if (var.back().second + 1 == i) { + ++var.back().second; + } else { + var.emplace_back(i, i); + } + } + } + } + //Assume match at the end of each partition; + HasMatch = true; + return HasMatch; + } +private: + const NUdf::TUnboxedValue PartitionKey; + IComputationExternalNode* const MatchedVarsArg; + const TVector<IComputationNode*>& Measures; + const TOutputColumnOrder& OutputColumnOrder; + IComputationExternalNode* const CurrentRowIndexArg; + const TVector<IComputationNode*>& Defines; + const TContainerCacheOnContext& Cache; + TMatchedVars MatchedVars; + bool HasMatch; + TVector<NUdf::TUnboxedValue> Rows; +}; + +class TStreamingMatchRecognize: public IProcessMatchRecognize { +public: + TStreamingMatchRecognize( NUdf::TUnboxedValue&& partitionKey, IComputationExternalNode* matchedVarsArg, TVector<IComputationNode*>& measures, @@ -74,8 +155,8 @@ public: } NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override { if (!HasMatch) - return NUdf::TUnboxedValue::Invalid(); - MatchedVarsArg->SetValue(ctx, NUdf::TUnboxedValuePod(new TMatchedVarsValue(&ctx.HolderFactory.GetMemInfo(), MatchedVars))); + return NUdf::TUnboxedValue{}; + MatchedVarsArg->SetValue(ctx, ctx.HolderFactory.Create<TMatchedVarsValue>(MatchedVars)); HasMatch = false; NUdf::TUnboxedValue *itemsPtr = nullptr; const auto result = Cache.NewArray(ctx, OutputColumnOrder.size(), itemsPtr); @@ -87,16 +168,14 @@ public: case EOutputColumnSource::PartitionKey: *itemsPtr++ = PartitionKey.GetElement(c.second); break; - default: - MKQL_ENSURE(false, "Internal logic error"); } } return result; } - bool ProcessEndOfData() override { - //Assume match at the the end of each partition - HasMatch = true; - return HasMatch; + bool ProcessEndOfData(TComputationContext& ctx) override { + Y_UNUSED(ctx); + MKQL_ENSURE(false, "Internal logic error. End of partition is not expected for a stream"); + return false; } private: const NUdf::TUnboxedValue PartitionKey; @@ -154,12 +233,12 @@ public: } auto& state = *static_cast<TState *>(stateValue.AsBoxed().Get()); while (true) { - if (auto output = state.GetOutputIfReady(ctx); !output.IsInvalid()) { + if (auto output = state.GetOutputIfReady(ctx); output) { return output; } auto item = InputFlow->GetValue(ctx); if (item.IsFinish()) { - state.ProcessEndOfData(); + state.ProcessEndOfData(ctx); continue; } else if (item.IsSpecial()) { return item; @@ -204,9 +283,9 @@ private: } } - void ProcessEndOfData() { + void ProcessEndOfData(TComputationContext& ctx) { for (auto it = Partitions.begin(); it != Partitions.end(); ++it) { - auto b = it->second->ProcessEndOfData(); + auto b = it->second->ProcessEndOfData(ctx); if (b) { HasReadyOutput.push(it); } @@ -217,7 +296,7 @@ private: NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) { while (!HasReadyOutput.empty()) { auto r = HasReadyOutput.top()->second->GetOutputIfReady(ctx); - if (r.IsInvalid()) { + if (not r) { //dried up HasReadyOutput.pop(); continue; @@ -225,7 +304,7 @@ private: return r; } } - return Terminating ? NUdf::TUnboxedValue::MakeFinish() : NUdf::TUnboxedValue::Invalid(); + return Terminating ? NUdf::TUnboxedValue(NUdf::TUnboxedValue::MakeFinish()) : NUdf::TUnboxedValue{}; } private: @@ -236,7 +315,7 @@ private: if (const auto it = Partitions.find(TString(packedKey)); it != Partitions.end()) { return it; } else { - return Partitions.emplace_hint(it, TString(packedKey), std::make_unique<TStreamMatchRecognize>( + return Partitions.emplace_hint(it, TString(packedKey), std::make_unique<TBackTrackingMatchRecognize>( std::move(partitionKey), MatchedVarsArg, Measures, |