diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2023-09-18 12:42:19 +0300 |
---|---|---|
committer | zverevgeny <zverevgeny@ydb.tech> | 2023-09-18 13:06:02 +0300 |
commit | 1a51d3f4e6a171eb78d2a4b41719a4a07541e3e3 (patch) | |
tree | 92211b10578e2c72ed815c3e122c89809681cf3e | |
parent | b1528b323be234edf18604f475eaae81d733d66e (diff) | |
download | ydb-1a51d3f4e6a171eb78d2a4b41719a4a07541e3e3.tar.gz |
YQL-16325 move termination flag to the state
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp | 13 |
1 files changed, 8 insertions, 5 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp index e45190369a9..244d23c37bc 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp @@ -199,6 +199,7 @@ public: , PartitionKeyPacker(true, partitionKeyType) , Parameters(parameters) , Cache(cache) + , Terminating(false) {} bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override { MKQL_ENSURE(not DelayedRow, "Internal logic error"); //we're finalizing previous partition @@ -219,6 +220,9 @@ public: return false; } bool ProcessEndOfData(TComputationContext& ctx) override { + if (Terminating) + return false; + Terminating = true; if (PartitionHandler) { return PartitionHandler->ProcessEndOfData(ctx); } @@ -247,6 +251,9 @@ public: )); PartitionHandler->ProcessInputRow(std::move(temp), ctx); } + if (Terminating) { + return NUdf::TUnboxedValue::MakeFinish(); + } return NUdf::TUnboxedValue{}; } private: @@ -258,6 +265,7 @@ private: const TMatchRecognizeProcessorParameters& Parameters; const TContainerCacheOnContext& Cache; NUdf::TUnboxedValue DelayedRow; + bool Terminating; }; class TStateForInterleavedPartitions @@ -375,17 +383,12 @@ public: ); } auto state = static_cast<State*>(stateValue.AsBoxed().Get()); - bool terminating = false; while (true) { if (auto output = state->GetOutputIfReady(ctx); output) { return output; } - if (terminating) { - return NUdf::TUnboxedValue::MakeFinish(); - } auto item = InputFlow->GetValue(ctx); if (item.IsFinish()) { - terminating = true; state->ProcessEndOfData(ctx); continue; } else if (item.IsSpecial()) { |