aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2023-09-05 16:38:38 +0300
committerzverevgeny <zverevgeny@ydb.tech>2023-09-05 18:02:12 +0300
commitad6ab7f24891543e0729e88860a31ae95d11c468 (patch)
tree4550307c1a288e194f1ac30f918434528dbe5462
parentf5907623ca4bf9c71ba0d162c40f76a44ca88ee8 (diff)
downloadydb-ad6ab7f24891543e0729e88860a31ae95d11c468.tar.gz
YQL-16486 skeleton for back-tracking impl of MATCH_RECOGNIZE
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp115
1 files changed, 97 insertions, 18 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
index 74504eb06f..bc850ac9b6 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
@@ -23,13 +23,94 @@ struct IProcessMatchRecognize {
///return true if it has output data ready
virtual bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) = 0;
virtual NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) = 0;
- virtual bool ProcessEndOfData() = 0;
+ virtual bool ProcessEndOfData(TComputationContext& ctx) = 0;
virtual ~IProcessMatchRecognize(){}
};
-class TStreamMatchRecognize: public IProcessMatchRecognize {
+class TBackTrackingMatchRecognize: public IProcessMatchRecognize {
public:
- TStreamMatchRecognize(
+ TBackTrackingMatchRecognize(
+ NUdf::TUnboxedValue&& partitionKey,
+ IComputationExternalNode* matchedVarsArg,
+ TVector<IComputationNode*>& measures,
+ const TOutputColumnOrder& outputColumnOrder,
+ IComputationExternalNode* currentRowIndexArg,
+ const TVector<IComputationNode*>& defines,
+ const TContainerCacheOnContext& cache
+ )
+ : PartitionKey(std::move(partitionKey))
+ , MatchedVarsArg(matchedVarsArg)
+ , Measures(measures)
+ , OutputColumnOrder(outputColumnOrder)
+ , CurrentRowIndexArg(currentRowIndexArg)
+ , Defines(defines)
+ , Cache(cache)
+ , MatchedVars(Defines.size())
+ , HasMatch(false)
+ {
+ }
+
+ bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override {
+ Y_UNUSED(ctx);
+ Rows.push_back(std::move(row));
+ return false;
+ }
+ NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override {
+ if (!HasMatch)
+ return NUdf::TUnboxedValue{};
+ MatchedVarsArg->SetValue(ctx, ctx.HolderFactory.Create<TMatchedVarsValue>(MatchedVars));
+ HasMatch = false;
+ NUdf::TUnboxedValue *itemsPtr = nullptr;
+ const auto result = Cache.NewArray(ctx, OutputColumnOrder.size(), itemsPtr);
+ for (auto const& c: OutputColumnOrder) {
+ switch(c.first) {
+ case EOutputColumnSource::Measure:
+ *itemsPtr++ = Measures[c.second]->GetValue(ctx);
+ break;
+ case EOutputColumnSource::PartitionKey:
+ *itemsPtr++ = PartitionKey.GetElement(c.second);
+ break;
+ }
+ }
+ return result;
+ }
+ bool ProcessEndOfData(TComputationContext& ctx) override {
+ for (size_t i = 0; i != Rows.size(); ++i) {
+ CurrentRowIndexArg->SetValue(ctx, NUdf::TUnboxedValuePod(static_cast<ui64>(i)));
+ for (size_t v = 0; v != Defines.size(); ++v) {
+ const auto &d = Defines[v]->GetValue(ctx);
+ if (d && d.GetOptionalValue().Get<bool>()) {
+ auto &var = MatchedVars[v];
+ if (var.empty()) {
+ var.emplace_back(i, i);
+ } else if (var.back().second + 1 == i) {
+ ++var.back().second;
+ } else {
+ var.emplace_back(i, i);
+ }
+ }
+ }
+ }
+ //Assume match at the end of each partition;
+ HasMatch = true;
+ return HasMatch;
+ }
+private:
+ const NUdf::TUnboxedValue PartitionKey;
+ IComputationExternalNode* const MatchedVarsArg;
+ const TVector<IComputationNode*>& Measures;
+ const TOutputColumnOrder& OutputColumnOrder;
+ IComputationExternalNode* const CurrentRowIndexArg;
+ const TVector<IComputationNode*>& Defines;
+ const TContainerCacheOnContext& Cache;
+ TMatchedVars MatchedVars;
+ bool HasMatch;
+ TVector<NUdf::TUnboxedValue> Rows;
+};
+
+class TStreamingMatchRecognize: public IProcessMatchRecognize {
+public:
+ TStreamingMatchRecognize(
NUdf::TUnboxedValue&& partitionKey,
IComputationExternalNode* matchedVarsArg,
TVector<IComputationNode*>& measures,
@@ -74,8 +155,8 @@ public:
}
NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override {
if (!HasMatch)
- return NUdf::TUnboxedValue::Invalid();
- MatchedVarsArg->SetValue(ctx, NUdf::TUnboxedValuePod(new TMatchedVarsValue(&ctx.HolderFactory.GetMemInfo(), MatchedVars)));
+ return NUdf::TUnboxedValue{};
+ MatchedVarsArg->SetValue(ctx, ctx.HolderFactory.Create<TMatchedVarsValue>(MatchedVars));
HasMatch = false;
NUdf::TUnboxedValue *itemsPtr = nullptr;
const auto result = Cache.NewArray(ctx, OutputColumnOrder.size(), itemsPtr);
@@ -87,16 +168,14 @@ public:
case EOutputColumnSource::PartitionKey:
*itemsPtr++ = PartitionKey.GetElement(c.second);
break;
- default:
- MKQL_ENSURE(false, "Internal logic error");
}
}
return result;
}
- bool ProcessEndOfData() override {
- //Assume match at the the end of each partition
- HasMatch = true;
- return HasMatch;
+ bool ProcessEndOfData(TComputationContext& ctx) override {
+ Y_UNUSED(ctx);
+ MKQL_ENSURE(false, "Internal logic error. End of partition is not expected for a stream");
+ return false;
}
private:
const NUdf::TUnboxedValue PartitionKey;
@@ -154,12 +233,12 @@ public:
}
auto& state = *static_cast<TState *>(stateValue.AsBoxed().Get());
while (true) {
- if (auto output = state.GetOutputIfReady(ctx); !output.IsInvalid()) {
+ if (auto output = state.GetOutputIfReady(ctx); output) {
return output;
}
auto item = InputFlow->GetValue(ctx);
if (item.IsFinish()) {
- state.ProcessEndOfData();
+ state.ProcessEndOfData(ctx);
continue;
} else if (item.IsSpecial()) {
return item;
@@ -204,9 +283,9 @@ private:
}
}
- void ProcessEndOfData() {
+ void ProcessEndOfData(TComputationContext& ctx) {
for (auto it = Partitions.begin(); it != Partitions.end(); ++it) {
- auto b = it->second->ProcessEndOfData();
+ auto b = it->second->ProcessEndOfData(ctx);
if (b) {
HasReadyOutput.push(it);
}
@@ -217,7 +296,7 @@ private:
NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) {
while (!HasReadyOutput.empty()) {
auto r = HasReadyOutput.top()->second->GetOutputIfReady(ctx);
- if (r.IsInvalid()) {
+ if (not r) {
//dried up
HasReadyOutput.pop();
continue;
@@ -225,7 +304,7 @@ private:
return r;
}
}
- return Terminating ? NUdf::TUnboxedValue::MakeFinish() : NUdf::TUnboxedValue::Invalid();
+ return Terminating ? NUdf::TUnboxedValue(NUdf::TUnboxedValue::MakeFinish()) : NUdf::TUnboxedValue{};
}
private:
@@ -236,7 +315,7 @@ private:
if (const auto it = Partitions.find(TString(packedKey)); it != Partitions.end()) {
return it;
} else {
- return Partitions.emplace_hint(it, TString(packedKey), std::make_unique<TStreamMatchRecognize>(
+ return Partitions.emplace_hint(it, TString(packedKey), std::make_unique<TBackTrackingMatchRecognize>(
std::move(partitionKey),
MatchedVarsArg,
Measures,