aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2023-09-26 12:33:14 +0300
committerzverevgeny <zverevgeny@ydb.tech>2023-09-26 13:37:24 +0300
commitdf1df29214d1b77e33a217256d8e11ae779ded4f (patch)
treeea1ccff506051e4beedd3396c9cb910154fd167f
parentc083da78d46e31819d8e04e6624a25409a5f99d0 (diff)
downloadydb-df1df29214d1b77e33a217256d8e11ae779ded4f.tar.gz
YQL-16443 replace dynamic polymorphism with static, use nfa for tables
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp94
1 files changed, 57 insertions, 37 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
index ed3c8327f8a..1a19d714d9b 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
@@ -21,15 +21,6 @@ using TOutputColumnOrder = TVector<std::pair<EOutputColumnSource, size_t>>;
using namespace NYql::NMatchRecognize;
-//Process input rows (one or many partitions)
-struct IProcessMatchRecognize {
- ///return true if it has output data ready
- virtual bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) = 0;
- virtual NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) = 0;
- virtual bool ProcessEndOfData(TComputationContext& ctx) = 0;
- virtual ~IProcessMatchRecognize(){}
-};
-
struct TMatchRecognizeProcessorParameters {
IComputationExternalNode* InputDataArg;
TRowPattern Pattern;
@@ -44,14 +35,25 @@ struct TMatchRecognizeProcessorParameters {
TOutputColumnOrder OutputColumnOrder;
};
-class TBackTrackingMatchRecognize: public IProcessMatchRecognize {
+class TBackTrackingMatchRecognize {
using TPartitionList = TSimpleList;
using TRange = TPartitionList::TRange;
using TMatchedVars = TMatchedVars<TRange>;
public:
+ //TODO(YQL-16486): create a tree for backtracking(replace var names with indexes)
+ struct TPatternConfiguration {
+ using TPtr = std::shared_ptr<TPatternConfiguration>;
+ static TPtr Create(const TRowPattern& pattern, const THashMap<TString, size_t>& varNameToIndex) {
+ Y_UNUSED(pattern);
+ Y_UNUSED(varNameToIndex);
+ return std::make_shared<TPatternConfiguration>();
+ }
+ };
+
TBackTrackingMatchRecognize(
NUdf::TUnboxedValue&& partitionKey,
const TMatchRecognizeProcessorParameters& parameters,
+ const TPatternConfiguration::TPtr pattern,
const TContainerCacheOnContext& cache
)
: PartitionKey(std::move(partitionKey))
@@ -60,14 +62,16 @@ public:
, CurMatchedVars(parameters.Defines.size())
, MatchNumber(0)
{
+ //TODO(YQL-16486)
+ Y_UNUSED(pattern);
}
- bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override {
+ bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) {
Y_UNUSED(ctx);
Rows.Append(std::move(row));
return false;
}
- NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override {
+ NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) {
if (Matches.empty())
return NUdf::TUnboxedValue{};
Parameters.MatchedVarsArg->SetValue(ctx, ToValue(ctx.HolderFactory, std::move(Matches.front())));
@@ -93,7 +97,7 @@ public:
}
return result;
}
- bool ProcessEndOfData(TComputationContext& ctx) override {
+ bool ProcessEndOfData(TComputationContext& ctx) {
//Assume, that data moved to IComputationExternalNode node, will not be modified or released
//till the end of the current function
auto rowsSize = Rows.Size();
@@ -126,11 +130,12 @@ private:
ui64 MatchNumber;
};
-class TStreamingMatchRecognize: public IProcessMatchRecognize {
+class TStreamingMatchRecognize {
using TPartitionList = TSparseList;
using TRange = TPartitionList::TRange;
using TMatchedVars = TMatchedVars<TRange>;
public:
+ using TPatternConfiguration = TNfaTransitionGraph;
TStreamingMatchRecognize(
NUdf::TUnboxedValue&& partitionKey,
const TMatchRecognizeProcessorParameters& parameters,
@@ -144,13 +149,13 @@ public:
{
}
- bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override{
+ bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) {
Parameters.InputDataArg->SetValue(ctx, ctx.HolderFactory.Create<TListValue<TSparseList>>(Rows));
Parameters.CurrentRowIndexArg->SetValue(ctx, NUdf::TUnboxedValuePod(Rows.Size()));
Nfa.ProcessRow(Rows.Append(std::move(row)), ctx);
return Nfa.HasMatched();
}
- NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override {
+ NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) {
auto match = Nfa.GetMatched();
if (!match.has_value())
return NUdf::TUnboxedValue{};
@@ -176,7 +181,7 @@ public:
}
return result;
}
- bool ProcessEndOfData(TComputationContext& ctx) override {
+ bool ProcessEndOfData(TComputationContext& ctx) {
Y_UNUSED(ctx);
return false;
}
@@ -189,10 +194,11 @@ private:
ui64 MatchNumber = 0;
};
+template <typename Algo>
class TStateForNonInterleavedPartitions
- : public TComputationValue<TStateForNonInterleavedPartitions>
- , public IProcessMatchRecognize
+ : public TComputationValue<TStateForNonInterleavedPartitions<Algo>>
{
+ using TRowPatternConfiguration = typename Algo::TPatternConfiguration;
public:
TStateForNonInterleavedPartitions(
TMemoryUsageInfo* memInfo,
@@ -207,10 +213,11 @@ public:
, PartitionKey(partitionKey)
, PartitionKeyPacker(true, partitionKeyType)
, Parameters(parameters)
+ , RowPatternConfiguration(TRowPatternConfiguration::Create(parameters.Pattern, parameters.VarNamesLookup))
, Cache(cache)
, Terminating(false)
{}
- bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override {
+ bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) {
MKQL_ENSURE(not DelayedRow, "Internal logic error"); //we're finalizing previous partition
InputRowArg->SetValue(ctx, NUdf::TUnboxedValue(row));
auto partitionKey = PartitionKey->GetValue(ctx);
@@ -228,7 +235,7 @@ public:
//be aware that the very first partition is created in the same manner as subsequent
return false;
}
- bool ProcessEndOfData(TComputationContext& ctx) override {
+ bool ProcessEndOfData(TComputationContext& ctx) {
if (Terminating)
return false;
Terminating = true;
@@ -238,7 +245,7 @@ public:
return false;
}
- NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override {
+ NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) {
if (PartitionHandler) {
auto result = PartitionHandler->GetOutputIfReady(ctx);
if (result) {
@@ -253,9 +260,10 @@ public:
InputRowArg->SetValue(ctx, NUdf::TUnboxedValue(temp));
auto partitionKey = PartitionKey->GetValue(ctx);
CurPartitionPackedKey = PartitionKeyPacker.Pack(partitionKey);
- PartitionHandler.reset(new TBackTrackingMatchRecognize(
+ PartitionHandler.reset(new Algo(
std::move(partitionKey),
Parameters,
+ RowPatternConfiguration,
Cache
));
PartitionHandler->ProcessInputRow(std::move(temp), ctx);
@@ -267,11 +275,12 @@ public:
}
private:
TString CurPartitionPackedKey;
- std::unique_ptr<IProcessMatchRecognize> PartitionHandler;
+ std::unique_ptr<Algo> PartitionHandler;
IComputationExternalNode* InputRowArg;
IComputationNode* PartitionKey;
TValuePackerGeneric<false> PartitionKeyPacker;
const TMatchRecognizeProcessorParameters& Parameters;
+ const typename TRowPatternConfiguration::TPtr RowPatternConfiguration;
const TContainerCacheOnContext& Cache;
NUdf::TUnboxedValue DelayedRow;
bool Terminating;
@@ -279,9 +288,8 @@ private:
class TStateForInterleavedPartitions
: public TComputationValue<TStateForInterleavedPartitions>
- , public IProcessMatchRecognize
{
- using TPartitionMap = std::unordered_map<TString, std::unique_ptr<IProcessMatchRecognize>>;
+ using TPartitionMap = std::unordered_map<TString, std::unique_ptr<TStreamingMatchRecognize>>;
public:
TStateForInterleavedPartitions(
TMemoryUsageInfo* memInfo,
@@ -300,7 +308,7 @@ public:
, Cache(cache)
{
}
- bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override {
+ bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) {
auto partition = GetPartitionHandler(row, ctx);
if (partition->second->ProcessInputRow(std::move(row), ctx)) {
HasReadyOutput.push(partition);
@@ -308,7 +316,7 @@ public:
return !HasReadyOutput.empty();
}
- bool ProcessEndOfData(TComputationContext& ctx) override {
+ bool ProcessEndOfData(TComputationContext& ctx) {
for (auto it = Partitions.begin(); it != Partitions.end(); ++it) {
auto b = it->second->ProcessEndOfData(ctx);
if (b) {
@@ -319,7 +327,7 @@ public:
return !HasReadyOutput.empty();
}
- NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override {
+ NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) {
while (!HasReadyOutput.empty()) {
auto r = HasReadyOutput.top()->second->GetOutputIfReady(ctx);
if (not r) {
@@ -591,14 +599,26 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation
, std::move(parameters)
);
} else {
- return new TMatchRecognizeWrapper<TStateForNonInterleavedPartitions>(ctx.Mutables
- , GetValueRepresentation(inputFlow.GetStaticType())
- , LocateNode(ctx.NodeLocator, *inputFlow.GetNode())
- , static_cast<IComputationExternalNode*>(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode()))
- , LocateNode(ctx.NodeLocator, *partitionKeySelector.GetNode())
- , partitionKeySelector.GetStaticType()
- , std::move(parameters)
- );
+ const bool useNfaForTables = true; //TODO(YQL-16486) get this flag from an optimizer
+ if (useNfaForTables) {
+ return new TMatchRecognizeWrapper<TStateForNonInterleavedPartitions<TStreamingMatchRecognize>>(ctx.Mutables
+ , GetValueRepresentation(inputFlow.GetStaticType())
+ , LocateNode(ctx.NodeLocator, *inputFlow.GetNode())
+ , static_cast<IComputationExternalNode*>(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode()))
+ , LocateNode(ctx.NodeLocator, *partitionKeySelector.GetNode())
+ , partitionKeySelector.GetStaticType()
+ , std::move(parameters)
+ );
+ } else {
+ return new TMatchRecognizeWrapper<TStateForNonInterleavedPartitions<TBackTrackingMatchRecognize>>(ctx.Mutables
+ , GetValueRepresentation(inputFlow.GetStaticType())
+ , LocateNode(ctx.NodeLocator, *inputFlow.GetNode())
+ , static_cast<IComputationExternalNode*>(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode()))
+ , LocateNode(ctx.NodeLocator, *partitionKeySelector.GetNode())
+ , partitionKeySelector.GetStaticType()
+ , std::move(parameters)
+ );
+ }
}
}