aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2023-09-20 20:42:13 +0300
committerzverevgeny <zverevgeny@ydb.tech>2023-09-20 21:03:47 +0300
commit75a840cd0de3c92a4c7b502f3d5961a0cf2e91e6 (patch)
tree3e393d36b0857fddac40594bb1a17ddd9998ec2f
parenteefa8d614336d128f415247429bacafbf2e9c403 (diff)
downloadydb-75a840cd0de3c92a4c7b502f3d5961a0cf2e91e6.tar.gz
YQL-16443 common tests for streaming mode
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp5
1 files changed, 2 insertions, 3 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
index dd82f813028..9c5a74d668d 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
@@ -142,7 +142,7 @@ public:
}
bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override{
- Y_UNUSED(row);
+ Parameters.InputDataArg->SetValue(ctx, ctx.HolderFactory.Create<TListValue<TSparseList>>(Rows));
Parameters.CurrentRowIndexArg->SetValue(ctx, NUdf::TUnboxedValuePod(Rows.Size()));
auto r = Rows.Append(std::move(row));
for (size_t i = 0; i != Parameters.Defines.size(); ++i) {
@@ -174,7 +174,6 @@ public:
}
bool ProcessEndOfData(TComputationContext& ctx) override {
Y_UNUSED(ctx);
- MKQL_ENSURE(false, "Internal logic error. End of partition is not expected for a stream");
return false;
}
private:
@@ -573,7 +572,7 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation
, GetOutputColumnOrder(partitionColumnIndexes, measureColumnIndexes)
};
if (AS_VALUE(TDataLiteral, streamingMode)->AsValue().Get<bool>()) {
- return new TMatchRecognizeWrapper<TStateForNonInterleavedPartitions>(ctx.Mutables
+ return new TMatchRecognizeWrapper<TStateForInterleavedPartitions>(ctx.Mutables
, GetValueRepresentation(inputFlow.GetStaticType())
, LocateNode(ctx.NodeLocator, *inputFlow.GetNode())
, static_cast<IComputationExternalNode*>(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode()))