aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2023-09-14 21:41:58 +0300
committerzverevgeny <zverevgeny@ydb.tech>2023-09-14 22:09:05 +0300
commit179f98b85dbb00b946e8bc3d40dde90e5b9ba8df (patch)
tree8b636ba815d6cf7028fcda80cdf5340aebac573e
parent66ab4dcacadc42a49d252c99d3f7666f13a5abe1 (diff)
downloadydb-179f98b85dbb00b946e8bc3d40dde90e5b9ba8df.tar.gz
YQL-16443 TimeOrderRecover for MATCH_RECOGNIZE on streams
YQL-16443 TimeOrderRecover
-rw-r--r--ydb/library/yql/core/sql_types/time_order_recover.h9
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp1
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.cpp87
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.h1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp191
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.h16
-rw-r--r--ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ya.make.inc1
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp36
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h8
-rw-r--r--ydb/library/yql/minikql/mkql_runtime_version.h2
-rw-r--r--ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp17
20 files changed, 378 insertions, 1 deletions
diff --git a/ydb/library/yql/core/sql_types/time_order_recover.h b/ydb/library/yql/core/sql_types/time_order_recover.h
new file mode 100644
index 0000000000..fd6ef0b61a
--- /dev/null
+++ b/ydb/library/yql/core/sql_types/time_order_recover.h
@@ -0,0 +1,9 @@
+#pragma once
+#include <string_view>
+
+namespace NYql::NTimeOrderRecover {
+
+using namespace std::string_view_literals;
+constexpr auto OUT_OF_ORDER_MARKER = "_yql_OutOfOrder"sv;
+
+}//namespace NYql::NMatchRecognize
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 522ffa9f19..52527f04fc 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -12120,6 +12120,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["MatchRecognizePattern"] = &MatchRecognizePatternWrapper;
Functions["MatchRecognizeDefines"] = &MatchRecognizeDefinesWrapper;
ExtFunctions["MatchRecognizeCore"] = &MatchRecognizeCoreWrapper;
+ Functions["TimeOrderRecover"] = &TimeOrderRecoverWrapper;
Functions["FromPg"] = &FromPgWrapper;
Functions["ToPg"] = &ToPgWrapper;
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp
index 51a5fdfbd3..361837ca21 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp
@@ -2,6 +2,7 @@
#include "type_ann_impl.h"
#include "type_ann_list.h"
+#include <ydb/library/yql/core/sql_types/time_order_recover.h>
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
@@ -7345,5 +7346,91 @@ namespace {
return true;
}
+
+ IGraphTransformer::TStatus TimeOrderRecoverWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ const auto& source = input->ChildRef(0);
+ auto& timeExtractor = input->ChildRef(1);
+ const auto& delay = input->ChildRef(2);
+ const auto& ahead = input->ChildRef(3);
+ const auto& rowLimit = input->ChildRef(4);
+
+ if (!EnsureFlowType(*source, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ const auto& inputRowType = GetSeqItemType(source->GetTypeAnn());
+
+ if (!EnsureStructType(source->Pos(), *inputRowType, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ //add artificial boolean column to taint out of order rows in the result table
+ auto outputRowColumns = inputRowType->Cast<TStructExprType>()->GetItems();
+ outputRowColumns.push_back(ctx.Expr.MakeType<TItemExprType>(
+ NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER,
+ ctx.Expr.MakeType<TDataExprType>(EDataSlot::Bool)
+ ));
+ auto outputRowType = ctx.Expr.MakeType<TStructExprType>(outputRowColumns);
+ if (!outputRowType->Validate(input->Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ auto status = ConvertToLambda(timeExtractor, ctx.Expr, 1, 1);
+ if (status.Level != IGraphTransformer::TStatus::Ok) {
+ return status;
+ }
+ { //timeExtractor
+ if (!UpdateLambdaAllArgumentsTypes(timeExtractor, {inputRowType}, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ if (!timeExtractor->GetTypeAnn()) {
+ return IGraphTransformer::TStatus::Repeat;
+ }
+ bool isOptional;
+ const TDataExprType* type;
+ if (!EnsureDataOrOptionalOfData(*timeExtractor, isOptional, type, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ if (type->GetSlot() != EDataSlot::Timestamp) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(timeExtractor->Pos()), TStringBuilder() << "Expected Timestamp, but got: " << type->GetSlot()));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ { //delay
+ bool isOptional;
+ const TDataExprType* type;
+ if (!EnsureDataOrOptionalOfData(*delay, isOptional, type, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ if (type->GetSlot() != EDataSlot::Interval) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(delay->Pos()), TStringBuilder() << "Expected Interval, but got: " << type->GetSlot()));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ { //ahead
+ bool isOptional;
+ const TDataExprType* type;
+ if (!EnsureDataOrOptionalOfData(*ahead, isOptional, type, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ if (type->GetSlot() != EDataSlot::Interval) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(ahead->Pos()), TStringBuilder() << "Expected Interval, but got: " << type->GetSlot()));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ { //rowLimit
+ bool isOptional;
+ const TDataExprType* type;
+ if (!EnsureDataOrOptionalOfData(*rowLimit, isOptional, type, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ if (type->GetSlot() != EDataSlot::Uint32) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(rowLimit->Pos()), TStringBuilder() << "Expected Uint32, but got: " << type->GetSlot()));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputRowType));
+ return IGraphTransformer::TStatus::Ok;
+ }
} // namespace NTypeAnnImpl
}
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.h b/ydb/library/yql/core/type_ann/type_ann_list.h
index 50ac73b554..180626eba6 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.h
+++ b/ydb/library/yql/core/type_ann/type_ann_list.h
@@ -133,5 +133,6 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus IterableWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus SqueezeToListWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus EmptyFromWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+ IGraphTransformer::TStatus TimeOrderRecoverWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
} // namespace NTypeAnnImpl
} // namespace NYql
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt
index 780ea80c0a..7094655211 100644
--- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt
@@ -137,6 +137,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt
index f279be85fb..deab4e4e4c 100644
--- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt
@@ -138,6 +138,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt
index f279be85fb..deab4e4e4c 100644
--- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt
@@ -138,6 +138,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt
index 780ea80c0a..7094655211 100644
--- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt
@@ -137,6 +137,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
index edad0bed49..7ffbfe8f6f 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
@@ -90,6 +90,7 @@
#include "mkql_squeeze_to_list.h"
#include "mkql_switch.h"
#include "mkql_take.h"
+#include "mkql_time_order_recover.h"
#include "mkql_timezone.h"
#include "mkql_tobytes.h"
#include "mkql_todict.h"
@@ -345,6 +346,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"NextValue", &WrapNextValue},
{"Nop", &WrapNop},
{"MatchRecognizeCore", &WrapMatchRecognizeCore},
+ {"TimeOrderRecover", WrapComputationBuilder(TimeOrderRecover)}
};
};
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
new file mode 100644
index 0000000000..e6af05d8f9
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
@@ -0,0 +1,191 @@
+#include "mkql_time_order_recover.h"
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <queue>
+
+namespace NKikimr::NMiniKQL {
+
+namespace {
+
+class TState: public TComputationValue<TState> {
+public:
+ using TTimestamp = i64; //use signed integers to simplify arithmetics
+ using TTimeinterval = i64;
+
+ TState(TMemoryUsageInfo* memInfo, TTimeinterval delay, TTimeinterval ahead, ui32 rowLimit)
+ : TComputationValue<TState>(memInfo)
+ , Heap(Greater)
+ , Delay(delay)
+ , Ahead(ahead)
+ , RowLimit(rowLimit + 1)
+ , Latest(0)
+ , Terminating(false)
+ {}
+ NUdf::TUnboxedValue GetOutputIfReady() {
+ if (Terminating && Heap.empty()) {
+ return NUdf::TUnboxedValue::MakeFinish();
+ }
+ if (Heap.empty()) {
+ return NUdf::TUnboxedValue{};
+ }
+ TTimestamp oldest = Heap.top().first;
+ if (oldest < Latest + Delay || Heap.size() == RowLimit || Terminating) {
+ auto result = std::move(Heap.top().second);
+ Heap.pop();
+ return result;
+ }
+ return NUdf::TUnboxedValue{};
+ }
+ ///return input row in case it cannot process it correctly
+ NUdf::TUnboxedValue ProcessRow(TTimestamp t, NUdf::TUnboxedValue&& row) {
+ MKQL_ENSURE(!row.IsSpecial(), "Internal logic error");
+ MKQL_ENSURE(Heap.size() < RowLimit, "Internal logic error");
+ if (Heap.empty()) {
+ Latest = t;
+ }
+ if (Latest + Delay < t && t < Latest + Ahead) {
+ Heap.emplace(t, std::move(row));
+ } else {
+ return row;
+ }
+ Latest = std::max(Latest, t);
+ return NUdf::TUnboxedValue{};
+ }
+ void Finish() {
+ Terminating = true;
+ }
+private:
+ using TEntry = std::pair<TTimestamp, NUdf::TUnboxedValue>;
+ static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) {
+ return lhs.first > rhs.first;
+ };
+ using THeap = std::priority_queue<
+ TEntry,
+ std::vector<TEntry, TMKQLAllocator<TEntry>>,
+ decltype(Greater)>;
+ THeap Heap;
+ const TTimeinterval Delay;
+ const TTimeinterval Ahead;
+ const ui32 RowLimit;
+ TTimestamp Latest;
+ bool Terminating; //not applicable for streams, but useful for debug and testing
+};
+
+class TTimeOrderRecover : public TStatefulFlowComputationNode<TTimeOrderRecover> {
+ using TBaseComputation = TStatefulFlowComputationNode<TTimeOrderRecover>;
+public:
+ TTimeOrderRecover(
+ TComputationMutables& mutables,
+ EValueRepresentation kind,
+ IComputationNode* inputFlow,
+ IComputationExternalNode* inputRowArg,
+ IComputationNode* rowTime,
+ ui32 inputRowColumnCount,
+ ui32 outOfOrderColumnIndex,
+ IComputationNode* delay,
+ IComputationNode* ahead,
+ IComputationNode* rowLimit
+ )
+ : TBaseComputation(mutables, inputFlow, kind)
+ , InputFlow(inputFlow)
+ , InputRowArg(inputRowArg)
+ , RowTime(rowTime)
+ , InputRowColumnCount(inputRowColumnCount)
+ , OutOfOrderColumnIndex(outOfOrderColumnIndex)
+ , Delay(delay)
+ , Ahead(ahead)
+ , RowLimit(rowLimit)
+ , Cache(mutables)
+ {}
+
+ NUdf::TUnboxedValue DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx) const {
+ if (stateValue.IsInvalid()) {
+ stateValue = ctx.HolderFactory.Create<TState>(
+ Delay->GetValue(ctx).Get<i64>(),
+ Ahead->GetValue(ctx).Get<i64>(),
+ RowLimit->GetValue(ctx).Get<ui32>()
+ );
+ }
+ auto& state = *static_cast<TState *>(stateValue.AsBoxed().Get());
+ while (true) {
+ if (auto out = state.GetOutputIfReady()) {
+ return AddColumn(std::move(out), false, ctx);
+ }
+ auto item = InputFlow->GetValue(ctx);
+ if (item.IsSpecial()) {
+ if (item.IsFinish()) {
+ state.Finish();
+ } else {
+ return item;
+ }
+ } else {
+ InputRowArg->SetValue(ctx, NUdf::TUnboxedValue{item});
+ const auto t = RowTime->GetValue(ctx).Get<ui64>();
+ if (auto row = state.ProcessRow(static_cast<TState::TTimestamp>(t), std::move(item))) {
+ return AddColumn(std::move(row), true, ctx);
+ }
+ }
+ }
+ }
+private:
+ void RegisterDependencies() const final {
+ if (const auto flow = FlowDependsOn(InputFlow)) {
+ Own(flow, InputRowArg);
+ DependsOn(flow, RowTime);
+ }
+ }
+
+ NUdf::TUnboxedValue AddColumn(NUdf::TUnboxedValue&& row, bool outOfOrder, TComputationContext& ctx) const {
+ if (row.IsSpecial()) {
+ return row;
+ }
+ NUdf::TUnboxedValue* itemsPtr = nullptr;
+ auto result = Cache.NewArray(ctx, InputRowColumnCount + 1, itemsPtr);
+ ui32 inputColumnIndex = 0;
+ for (ui32 i = 0; i != InputRowColumnCount + 1; ++i) {
+ if (OutOfOrderColumnIndex == i) {
+ *itemsPtr++ = NUdf::TUnboxedValuePod{outOfOrder};
+ } else {
+ *itemsPtr++ = std::move(row.GetElements()[inputColumnIndex++]);
+ }
+ }
+ return result;
+ }
+
+ IComputationNode* const InputFlow;
+ IComputationExternalNode* const InputRowArg;
+ IComputationNode* const RowTime;
+ const ui32 InputRowColumnCount;
+ const ui32 OutOfOrderColumnIndex;
+ const IComputationNode* Delay;
+ const IComputationNode* Ahead;
+ const IComputationNode* RowLimit;
+ const TContainerCacheOnContext Cache;
+};
+
+} //namespace
+
+IComputationNode* TimeOrderRecover(const TComputationNodeFactoryContext& ctx,
+ TRuntimeNode inputFlow,
+ TRuntimeNode inputRowArg,
+ TRuntimeNode rowTime,
+ TRuntimeNode inputRowColumnCount,
+ TRuntimeNode outOfOrderColumnIndex,
+ TRuntimeNode delay,
+ TRuntimeNode ahead,
+ TRuntimeNode rowLimit)
+{
+ return new TTimeOrderRecover(ctx.Mutables
+ , GetValueRepresentation(inputFlow.GetStaticType())
+ , LocateNode(ctx.NodeLocator, *inputFlow.GetNode())
+ , static_cast<IComputationExternalNode*>(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode()))
+ , LocateNode(ctx.NodeLocator, *rowTime.GetNode())
+ , AS_VALUE(TDataLiteral, inputRowColumnCount)->AsValue().Get<ui32>()
+ , AS_VALUE(TDataLiteral, outOfOrderColumnIndex)->AsValue().Get<ui32>()
+ , LocateNode(ctx.NodeLocator, *delay.GetNode())
+ , LocateNode(ctx.NodeLocator, *ahead.GetNode())
+ , LocateNode(ctx.NodeLocator, *rowLimit.GetNode())
+ );
+}
+
+}//namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.h b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.h
new file mode 100644
index 0000000000..84beea6c17
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.h
@@ -0,0 +1,16 @@
+#pragma once
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+
+namespace NKikimr::NMiniKQL {
+
+IComputationNode* TimeOrderRecover(const TComputationNodeFactoryContext& ctx,
+ TRuntimeNode inputFlow,
+ TRuntimeNode inputRowArg,
+ TRuntimeNode rowTime,
+ TRuntimeNode inputRowColumnCount,
+ TRuntimeNode outOfOrderColumnIndex,
+ TRuntimeNode delay,
+ TRuntimeNode ahead,
+ TRuntimeNode rowLimit);
+
+} //namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt
index e40064d4a6..b157708bfb 100644
--- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt
@@ -133,6 +133,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt
index 01c73e57af..00214e6906 100644
--- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt
@@ -134,6 +134,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt
index 01c73e57af..00214e6906 100644
--- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt
@@ -134,6 +134,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt
index e40064d4a6..b157708bfb 100644
--- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt
@@ -133,6 +133,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/ya.make.inc b/ydb/library/yql/minikql/comp_nodes/ya.make.inc
index d4315842e7..fcfa9104cc 100644
--- a/ydb/library/yql/minikql/comp_nodes/ya.make.inc
+++ b/ydb/library/yql/minikql/comp_nodes/ya.make.inc
@@ -104,6 +104,7 @@ SRCS(
mkql_condense1.cpp
mkql_switch.cpp
mkql_take.cpp
+ mkql_time_order_recover.cpp
mkql_timezone.cpp
mkql_tobytes.cpp
mkql_todict.cpp
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 0248d6f928..054edd62c4 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -8,6 +8,7 @@
#include "ydb/library/yql/minikql/mkql_utils.h"
#include "ydb/library/yql/minikql/mkql_type_builder.h"
#include "ydb/library/yql/core/sql_types/match_recognize.h"
+#include "ydb/library/yql/core/sql_types/time_order_recover.h"
#include <util/string/cast.h>
#include <util/string/printf.h>
@@ -6004,6 +6005,41 @@ TRuntimeNode TProgramBuilder::MatchRecognizeCore(
return TRuntimeNode(callableBuilder.Build(), false);
}
+TRuntimeNode TProgramBuilder::TimeOrderRecover(
+ TRuntimeNode inputStream,
+ const TUnaryLambda& getTimeExtractor,
+ TRuntimeNode delay,
+ TRuntimeNode ahead,
+ TRuntimeNode rowLimit
+ )
+{
+ MKQL_ENSURE(RuntimeVersion >= 44, "TimeOrderRecover is not supported in runtime version " << RuntimeVersion);
+
+ auto& inputRowType = *static_cast<TStructType*>(AS_TYPE(TStructType, AS_TYPE(TFlowType, inputStream.GetStaticType())->GetItemType()));
+ const auto inputRowArg = Arg(&inputRowType);
+ TStructTypeBuilder outputRowTypeBuilder(Env);
+ outputRowTypeBuilder.Reserve(inputRowType.GetMembersCount() + 1);
+ const ui32 inputRowColumnCount = inputRowType.GetMembersCount();
+ for (ui32 i = 0; i != inputRowColumnCount; ++i) {
+ outputRowTypeBuilder.Add(inputRowType.GetMemberName(i), inputRowType.GetMemberType(i));
+ }
+ using NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER;
+ outputRowTypeBuilder.Add(OUT_OF_ORDER_MARKER, TDataType::Create(NUdf::TDataType<bool>::Id, Env));
+ const auto outputRowType = outputRowTypeBuilder.Build();
+ const auto outOfOrderColumnIndex = outputRowType->GetMemberIndex(OUT_OF_ORDER_MARKER);
+ TCallableBuilder callableBuilder(GetTypeEnvironment(), "TimeOrderRecover", TFlowType::Create(outputRowType, Env));
+
+ callableBuilder.Add(inputStream);
+ callableBuilder.Add(inputRowArg);
+ callableBuilder.Add(getTimeExtractor(inputRowArg));
+ callableBuilder.Add(NewDataLiteral(inputRowColumnCount));
+ callableBuilder.Add(NewDataLiteral(outOfOrderColumnIndex));
+ callableBuilder.Add(delay),
+ callableBuilder.Add(ahead),
+ callableBuilder.Add(rowLimit);
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
bool CanExportType(TType* type, const TTypeEnvironment& env) {
if (type->GetKind() == TType::EKind::Type) {
return false; // Type of Type
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index a2b9ae49ae..4be297d5ef 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -707,6 +707,14 @@ public:
bool streamingMode
);
+ TRuntimeNode TimeOrderRecover(
+ TRuntimeNode inputStream,
+ const TUnaryLambda& getTimeExtractor,
+ TRuntimeNode delay,
+ TRuntimeNode ahead,
+ TRuntimeNode rowLimit
+ );
+
protected:
TRuntimeNode Invoke(const std::string_view& funcName, TType* resultType, const TArrayRef<const TRuntimeNode>& args);
TRuntimeNode IfPresent(TRuntimeNode optional, const TUnaryLambda& thenBranch, TRuntimeNode elseBranch);
diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h
index 751e8695f5..29eb20a452 100644
--- a/ydb/library/yql/minikql/mkql_runtime_version.h
+++ b/ydb/library/yql/minikql/mkql_runtime_version.h
@@ -24,7 +24,7 @@ namespace NMiniKQL {
// 1. Bump this version every time incompatible runtime nodes are introduced.
// 2. Make sure you provide runtime node generation for previous runtime versions.
#ifndef MKQL_RUNTIME_VERSION
-#define MKQL_RUNTIME_VERSION 43U
+#define MKQL_RUNTIME_VERSION 44U
#endif
// History:
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index 11d5c6842c..85ea5fbd31 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -919,6 +919,23 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
);
});
+ AddCallable("TimeOrderRecover", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+ const auto inputStream = node.Child(0);
+ const auto timeExtractor = node.Child(1);
+ const auto delay = node.Child(2);
+ const auto ahead = node.Child(3);
+ const auto rowLimit = node.Child(4);
+ return ctx.ProgramBuilder.TimeOrderRecover(
+ MkqlBuildExpr(*inputStream, ctx),
+ [timeExtractor, &ctx](TRuntimeNode row) {
+ return MkqlBuildLambda(*timeExtractor, ctx, {row});
+ },
+ MkqlBuildExpr(*delay, ctx),
+ MkqlBuildExpr(*ahead, ctx),
+ MkqlBuildExpr(*rowLimit, ctx)
+ );
+ });
+
AddCallable("Guess", [](const TExprNode& node, TMkqlBuildContext& ctx) {
const auto variantObj = MkqlBuildExpr(node.Head(), ctx);
auto type = node.Head().GetTypeAnn();