aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2023-09-18 12:42:19 +0300
committerzverevgeny <zverevgeny@ydb.tech>2023-09-18 13:06:02 +0300
commit1a51d3f4e6a171eb78d2a4b41719a4a07541e3e3 (patch)
tree92211b10578e2c72ed815c3e122c89809681cf3e
parentb1528b323be234edf18604f475eaae81d733d66e (diff)
downloadydb-1a51d3f4e6a171eb78d2a4b41719a4a07541e3e3.tar.gz
YQL-16325 move termination flag to the state
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp13
1 files changed, 8 insertions, 5 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
index e45190369a9..244d23c37bc 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
@@ -199,6 +199,7 @@ public:
, PartitionKeyPacker(true, partitionKeyType)
, Parameters(parameters)
, Cache(cache)
+ , Terminating(false)
{}
bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override {
MKQL_ENSURE(not DelayedRow, "Internal logic error"); //we're finalizing previous partition
@@ -219,6 +220,9 @@ public:
return false;
}
bool ProcessEndOfData(TComputationContext& ctx) override {
+ if (Terminating)
+ return false;
+ Terminating = true;
if (PartitionHandler) {
return PartitionHandler->ProcessEndOfData(ctx);
}
@@ -247,6 +251,9 @@ public:
));
PartitionHandler->ProcessInputRow(std::move(temp), ctx);
}
+ if (Terminating) {
+ return NUdf::TUnboxedValue::MakeFinish();
+ }
return NUdf::TUnboxedValue{};
}
private:
@@ -258,6 +265,7 @@ private:
const TMatchRecognizeProcessorParameters& Parameters;
const TContainerCacheOnContext& Cache;
NUdf::TUnboxedValue DelayedRow;
+ bool Terminating;
};
class TStateForInterleavedPartitions
@@ -375,17 +383,12 @@ public:
);
}
auto state = static_cast<State*>(stateValue.AsBoxed().Get());
- bool terminating = false;
while (true) {
if (auto output = state->GetOutputIfReady(ctx); output) {
return output;
}
- if (terminating) {
- return NUdf::TUnboxedValue::MakeFinish();
- }
auto item = InputFlow->GetValue(ctx);
if (item.IsFinish()) {
- terminating = true;
state->ProcessEndOfData(ctx);
continue;
} else if (item.IsSpecial()) {