diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2023-09-14 21:41:58 +0300 |
---|---|---|
committer | zverevgeny <zverevgeny@ydb.tech> | 2023-09-14 22:09:05 +0300 |
commit | 179f98b85dbb00b946e8bc3d40dde90e5b9ba8df (patch) | |
tree | 8b636ba815d6cf7028fcda80cdf5340aebac573e | |
parent | 66ab4dcacadc42a49d252c99d3f7666f13a5abe1 (diff) | |
download | ydb-179f98b85dbb00b946e8bc3d40dde90e5b9ba8df.tar.gz |
YQL-16443 TimeOrderRecover for MATCH_RECOGNIZE on streams
YQL-16443 TimeOrderRecover
20 files changed, 378 insertions, 1 deletions
diff --git a/ydb/library/yql/core/sql_types/time_order_recover.h b/ydb/library/yql/core/sql_types/time_order_recover.h new file mode 100644 index 0000000000..fd6ef0b61a --- /dev/null +++ b/ydb/library/yql/core/sql_types/time_order_recover.h @@ -0,0 +1,9 @@ +#pragma once +#include <string_view> + +namespace NYql::NTimeOrderRecover { + +using namespace std::string_view_literals; +constexpr auto OUT_OF_ORDER_MARKER = "_yql_OutOfOrder"sv; + +}//namespace NYql::NMatchRecognize diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 522ffa9f19..52527f04fc 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -12120,6 +12120,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["MatchRecognizePattern"] = &MatchRecognizePatternWrapper; Functions["MatchRecognizeDefines"] = &MatchRecognizeDefinesWrapper; ExtFunctions["MatchRecognizeCore"] = &MatchRecognizeCoreWrapper; + Functions["TimeOrderRecover"] = &TimeOrderRecoverWrapper; Functions["FromPg"] = &FromPgWrapper; Functions["ToPg"] = &ToPgWrapper; diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp index 51a5fdfbd3..361837ca21 100644 --- a/ydb/library/yql/core/type_ann/type_ann_list.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp @@ -2,6 +2,7 @@ #include "type_ann_impl.h" #include "type_ann_list.h" +#include <ydb/library/yql/core/sql_types/time_order_recover.h> #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_opt_utils.h> @@ -7345,5 +7346,91 @@ namespace { return true; } + + IGraphTransformer::TStatus TimeOrderRecoverWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + const auto& source = input->ChildRef(0); + auto& timeExtractor = input->ChildRef(1); + const auto& delay = input->ChildRef(2); + const auto& ahead = input->ChildRef(3); + const auto& rowLimit = input->ChildRef(4); + + if (!EnsureFlowType(*source, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + const auto& inputRowType = GetSeqItemType(source->GetTypeAnn()); + + if (!EnsureStructType(source->Pos(), *inputRowType, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + //add artificial boolean column to taint out of order rows in the result table + auto outputRowColumns = inputRowType->Cast<TStructExprType>()->GetItems(); + outputRowColumns.push_back(ctx.Expr.MakeType<TItemExprType>( + NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER, + ctx.Expr.MakeType<TDataExprType>(EDataSlot::Bool) + )); + auto outputRowType = ctx.Expr.MakeType<TStructExprType>(outputRowColumns); + if (!outputRowType->Validate(input->Pos(), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + auto status = ConvertToLambda(timeExtractor, ctx.Expr, 1, 1); + if (status.Level != IGraphTransformer::TStatus::Ok) { + return status; + } + { //timeExtractor + if (!UpdateLambdaAllArgumentsTypes(timeExtractor, {inputRowType}, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + if (!timeExtractor->GetTypeAnn()) { + return IGraphTransformer::TStatus::Repeat; + } + bool isOptional; + const TDataExprType* type; + if (!EnsureDataOrOptionalOfData(*timeExtractor, isOptional, type, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + if (type->GetSlot() != EDataSlot::Timestamp) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(timeExtractor->Pos()), TStringBuilder() << "Expected Timestamp, but got: " << type->GetSlot())); + return IGraphTransformer::TStatus::Error; + } + } + { //delay + bool isOptional; + const TDataExprType* type; + if (!EnsureDataOrOptionalOfData(*delay, isOptional, type, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + if (type->GetSlot() != EDataSlot::Interval) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(delay->Pos()), TStringBuilder() << "Expected Interval, but got: " << type->GetSlot())); + return IGraphTransformer::TStatus::Error; + } + } + { //ahead + bool isOptional; + const TDataExprType* type; + if (!EnsureDataOrOptionalOfData(*ahead, isOptional, type, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + if (type->GetSlot() != EDataSlot::Interval) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(ahead->Pos()), TStringBuilder() << "Expected Interval, but got: " << type->GetSlot())); + return IGraphTransformer::TStatus::Error; + } + } + { //rowLimit + bool isOptional; + const TDataExprType* type; + if (!EnsureDataOrOptionalOfData(*rowLimit, isOptional, type, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + if (type->GetSlot() != EDataSlot::Uint32) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(rowLimit->Pos()), TStringBuilder() << "Expected Uint32, but got: " << type->GetSlot())); + return IGraphTransformer::TStatus::Error; + } + } + + input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputRowType)); + return IGraphTransformer::TStatus::Ok; + } } // namespace NTypeAnnImpl } diff --git a/ydb/library/yql/core/type_ann/type_ann_list.h b/ydb/library/yql/core/type_ann/type_ann_list.h index 50ac73b554..180626eba6 100644 --- a/ydb/library/yql/core/type_ann/type_ann_list.h +++ b/ydb/library/yql/core/type_ann/type_ann_list.h @@ -133,5 +133,6 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus IterableWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus SqueezeToListWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus EmptyFromWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus TimeOrderRecoverWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); } // namespace NTypeAnnImpl } // namespace NYql diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt index 780ea80c0a..7094655211 100644 --- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt @@ -137,6 +137,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt index f279be85fb..deab4e4e4c 100644 --- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt @@ -138,6 +138,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt index f279be85fb..deab4e4e4c 100644 --- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt @@ -138,6 +138,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt index 780ea80c0a..7094655211 100644 --- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt @@ -137,6 +137,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index edad0bed49..7ffbfe8f6f 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -90,6 +90,7 @@ #include "mkql_squeeze_to_list.h" #include "mkql_switch.h" #include "mkql_take.h" +#include "mkql_time_order_recover.h" #include "mkql_timezone.h" #include "mkql_tobytes.h" #include "mkql_todict.h" @@ -345,6 +346,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"NextValue", &WrapNextValue}, {"Nop", &WrapNop}, {"MatchRecognizeCore", &WrapMatchRecognizeCore}, + {"TimeOrderRecover", WrapComputationBuilder(TimeOrderRecover)} }; }; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp new file mode 100644 index 0000000000..e6af05d8f9 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp @@ -0,0 +1,191 @@ +#include "mkql_time_order_recover.h" +#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <queue> + +namespace NKikimr::NMiniKQL { + +namespace { + +class TState: public TComputationValue<TState> { +public: + using TTimestamp = i64; //use signed integers to simplify arithmetics + using TTimeinterval = i64; + + TState(TMemoryUsageInfo* memInfo, TTimeinterval delay, TTimeinterval ahead, ui32 rowLimit) + : TComputationValue<TState>(memInfo) + , Heap(Greater) + , Delay(delay) + , Ahead(ahead) + , RowLimit(rowLimit + 1) + , Latest(0) + , Terminating(false) + {} + NUdf::TUnboxedValue GetOutputIfReady() { + if (Terminating && Heap.empty()) { + return NUdf::TUnboxedValue::MakeFinish(); + } + if (Heap.empty()) { + return NUdf::TUnboxedValue{}; + } + TTimestamp oldest = Heap.top().first; + if (oldest < Latest + Delay || Heap.size() == RowLimit || Terminating) { + auto result = std::move(Heap.top().second); + Heap.pop(); + return result; + } + return NUdf::TUnboxedValue{}; + } + ///return input row in case it cannot process it correctly + NUdf::TUnboxedValue ProcessRow(TTimestamp t, NUdf::TUnboxedValue&& row) { + MKQL_ENSURE(!row.IsSpecial(), "Internal logic error"); + MKQL_ENSURE(Heap.size() < RowLimit, "Internal logic error"); + if (Heap.empty()) { + Latest = t; + } + if (Latest + Delay < t && t < Latest + Ahead) { + Heap.emplace(t, std::move(row)); + } else { + return row; + } + Latest = std::max(Latest, t); + return NUdf::TUnboxedValue{}; + } + void Finish() { + Terminating = true; + } +private: + using TEntry = std::pair<TTimestamp, NUdf::TUnboxedValue>; + static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) { + return lhs.first > rhs.first; + }; + using THeap = std::priority_queue< + TEntry, + std::vector<TEntry, TMKQLAllocator<TEntry>>, + decltype(Greater)>; + THeap Heap; + const TTimeinterval Delay; + const TTimeinterval Ahead; + const ui32 RowLimit; + TTimestamp Latest; + bool Terminating; //not applicable for streams, but useful for debug and testing +}; + +class TTimeOrderRecover : public TStatefulFlowComputationNode<TTimeOrderRecover> { + using TBaseComputation = TStatefulFlowComputationNode<TTimeOrderRecover>; +public: + TTimeOrderRecover( + TComputationMutables& mutables, + EValueRepresentation kind, + IComputationNode* inputFlow, + IComputationExternalNode* inputRowArg, + IComputationNode* rowTime, + ui32 inputRowColumnCount, + ui32 outOfOrderColumnIndex, + IComputationNode* delay, + IComputationNode* ahead, + IComputationNode* rowLimit + ) + : TBaseComputation(mutables, inputFlow, kind) + , InputFlow(inputFlow) + , InputRowArg(inputRowArg) + , RowTime(rowTime) + , InputRowColumnCount(inputRowColumnCount) + , OutOfOrderColumnIndex(outOfOrderColumnIndex) + , Delay(delay) + , Ahead(ahead) + , RowLimit(rowLimit) + , Cache(mutables) + {} + + NUdf::TUnboxedValue DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx) const { + if (stateValue.IsInvalid()) { + stateValue = ctx.HolderFactory.Create<TState>( + Delay->GetValue(ctx).Get<i64>(), + Ahead->GetValue(ctx).Get<i64>(), + RowLimit->GetValue(ctx).Get<ui32>() + ); + } + auto& state = *static_cast<TState *>(stateValue.AsBoxed().Get()); + while (true) { + if (auto out = state.GetOutputIfReady()) { + return AddColumn(std::move(out), false, ctx); + } + auto item = InputFlow->GetValue(ctx); + if (item.IsSpecial()) { + if (item.IsFinish()) { + state.Finish(); + } else { + return item; + } + } else { + InputRowArg->SetValue(ctx, NUdf::TUnboxedValue{item}); + const auto t = RowTime->GetValue(ctx).Get<ui64>(); + if (auto row = state.ProcessRow(static_cast<TState::TTimestamp>(t), std::move(item))) { + return AddColumn(std::move(row), true, ctx); + } + } + } + } +private: + void RegisterDependencies() const final { + if (const auto flow = FlowDependsOn(InputFlow)) { + Own(flow, InputRowArg); + DependsOn(flow, RowTime); + } + } + + NUdf::TUnboxedValue AddColumn(NUdf::TUnboxedValue&& row, bool outOfOrder, TComputationContext& ctx) const { + if (row.IsSpecial()) { + return row; + } + NUdf::TUnboxedValue* itemsPtr = nullptr; + auto result = Cache.NewArray(ctx, InputRowColumnCount + 1, itemsPtr); + ui32 inputColumnIndex = 0; + for (ui32 i = 0; i != InputRowColumnCount + 1; ++i) { + if (OutOfOrderColumnIndex == i) { + *itemsPtr++ = NUdf::TUnboxedValuePod{outOfOrder}; + } else { + *itemsPtr++ = std::move(row.GetElements()[inputColumnIndex++]); + } + } + return result; + } + + IComputationNode* const InputFlow; + IComputationExternalNode* const InputRowArg; + IComputationNode* const RowTime; + const ui32 InputRowColumnCount; + const ui32 OutOfOrderColumnIndex; + const IComputationNode* Delay; + const IComputationNode* Ahead; + const IComputationNode* RowLimit; + const TContainerCacheOnContext Cache; +}; + +} //namespace + +IComputationNode* TimeOrderRecover(const TComputationNodeFactoryContext& ctx, + TRuntimeNode inputFlow, + TRuntimeNode inputRowArg, + TRuntimeNode rowTime, + TRuntimeNode inputRowColumnCount, + TRuntimeNode outOfOrderColumnIndex, + TRuntimeNode delay, + TRuntimeNode ahead, + TRuntimeNode rowLimit) +{ + return new TTimeOrderRecover(ctx.Mutables + , GetValueRepresentation(inputFlow.GetStaticType()) + , LocateNode(ctx.NodeLocator, *inputFlow.GetNode()) + , static_cast<IComputationExternalNode*>(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode())) + , LocateNode(ctx.NodeLocator, *rowTime.GetNode()) + , AS_VALUE(TDataLiteral, inputRowColumnCount)->AsValue().Get<ui32>() + , AS_VALUE(TDataLiteral, outOfOrderColumnIndex)->AsValue().Get<ui32>() + , LocateNode(ctx.NodeLocator, *delay.GetNode()) + , LocateNode(ctx.NodeLocator, *ahead.GetNode()) + , LocateNode(ctx.NodeLocator, *rowLimit.GetNode()) + ); +} + +}//namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.h b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.h new file mode 100644 index 0000000000..84beea6c17 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.h @@ -0,0 +1,16 @@ +#pragma once +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> + +namespace NKikimr::NMiniKQL { + +IComputationNode* TimeOrderRecover(const TComputationNodeFactoryContext& ctx, + TRuntimeNode inputFlow, + TRuntimeNode inputRowArg, + TRuntimeNode rowTime, + TRuntimeNode inputRowColumnCount, + TRuntimeNode outOfOrderColumnIndex, + TRuntimeNode delay, + TRuntimeNode ahead, + TRuntimeNode rowLimit); + +} //namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt index e40064d4a6..b157708bfb 100644 --- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt @@ -133,6 +133,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt index 01c73e57af..00214e6906 100644 --- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt @@ -134,6 +134,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt index 01c73e57af..00214e6906 100644 --- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt @@ -134,6 +134,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt index e40064d4a6..b157708bfb 100644 --- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt @@ -133,6 +133,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_switch.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_take.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_timezone.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/ya.make.inc b/ydb/library/yql/minikql/comp_nodes/ya.make.inc index d4315842e7..fcfa9104cc 100644 --- a/ydb/library/yql/minikql/comp_nodes/ya.make.inc +++ b/ydb/library/yql/minikql/comp_nodes/ya.make.inc @@ -104,6 +104,7 @@ SRCS( mkql_condense1.cpp mkql_switch.cpp mkql_take.cpp + mkql_time_order_recover.cpp mkql_timezone.cpp mkql_tobytes.cpp mkql_todict.cpp diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 0248d6f928..054edd62c4 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -8,6 +8,7 @@ #include "ydb/library/yql/minikql/mkql_utils.h" #include "ydb/library/yql/minikql/mkql_type_builder.h" #include "ydb/library/yql/core/sql_types/match_recognize.h" +#include "ydb/library/yql/core/sql_types/time_order_recover.h" #include <util/string/cast.h> #include <util/string/printf.h> @@ -6004,6 +6005,41 @@ TRuntimeNode TProgramBuilder::MatchRecognizeCore( return TRuntimeNode(callableBuilder.Build(), false); } +TRuntimeNode TProgramBuilder::TimeOrderRecover( + TRuntimeNode inputStream, + const TUnaryLambda& getTimeExtractor, + TRuntimeNode delay, + TRuntimeNode ahead, + TRuntimeNode rowLimit + ) +{ + MKQL_ENSURE(RuntimeVersion >= 44, "TimeOrderRecover is not supported in runtime version " << RuntimeVersion); + + auto& inputRowType = *static_cast<TStructType*>(AS_TYPE(TStructType, AS_TYPE(TFlowType, inputStream.GetStaticType())->GetItemType())); + const auto inputRowArg = Arg(&inputRowType); + TStructTypeBuilder outputRowTypeBuilder(Env); + outputRowTypeBuilder.Reserve(inputRowType.GetMembersCount() + 1); + const ui32 inputRowColumnCount = inputRowType.GetMembersCount(); + for (ui32 i = 0; i != inputRowColumnCount; ++i) { + outputRowTypeBuilder.Add(inputRowType.GetMemberName(i), inputRowType.GetMemberType(i)); + } + using NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER; + outputRowTypeBuilder.Add(OUT_OF_ORDER_MARKER, TDataType::Create(NUdf::TDataType<bool>::Id, Env)); + const auto outputRowType = outputRowTypeBuilder.Build(); + const auto outOfOrderColumnIndex = outputRowType->GetMemberIndex(OUT_OF_ORDER_MARKER); + TCallableBuilder callableBuilder(GetTypeEnvironment(), "TimeOrderRecover", TFlowType::Create(outputRowType, Env)); + + callableBuilder.Add(inputStream); + callableBuilder.Add(inputRowArg); + callableBuilder.Add(getTimeExtractor(inputRowArg)); + callableBuilder.Add(NewDataLiteral(inputRowColumnCount)); + callableBuilder.Add(NewDataLiteral(outOfOrderColumnIndex)); + callableBuilder.Add(delay), + callableBuilder.Add(ahead), + callableBuilder.Add(rowLimit); + return TRuntimeNode(callableBuilder.Build(), false); +} + bool CanExportType(TType* type, const TTypeEnvironment& env) { if (type->GetKind() == TType::EKind::Type) { return false; // Type of Type diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index a2b9ae49ae..4be297d5ef 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -707,6 +707,14 @@ public: bool streamingMode ); + TRuntimeNode TimeOrderRecover( + TRuntimeNode inputStream, + const TUnaryLambda& getTimeExtractor, + TRuntimeNode delay, + TRuntimeNode ahead, + TRuntimeNode rowLimit + ); + protected: TRuntimeNode Invoke(const std::string_view& funcName, TType* resultType, const TArrayRef<const TRuntimeNode>& args); TRuntimeNode IfPresent(TRuntimeNode optional, const TUnaryLambda& thenBranch, TRuntimeNode elseBranch); diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h index 751e8695f5..29eb20a452 100644 --- a/ydb/library/yql/minikql/mkql_runtime_version.h +++ b/ydb/library/yql/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 43U +#define MKQL_RUNTIME_VERSION 44U #endif // History: diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 11d5c6842c..85ea5fbd31 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -919,6 +919,23 @@ TMkqlCommonCallableCompiler::TShared::TShared() { ); }); + AddCallable("TimeOrderRecover", [](const TExprNode& node, TMkqlBuildContext& ctx) { + const auto inputStream = node.Child(0); + const auto timeExtractor = node.Child(1); + const auto delay = node.Child(2); + const auto ahead = node.Child(3); + const auto rowLimit = node.Child(4); + return ctx.ProgramBuilder.TimeOrderRecover( + MkqlBuildExpr(*inputStream, ctx), + [timeExtractor, &ctx](TRuntimeNode row) { + return MkqlBuildLambda(*timeExtractor, ctx, {row}); + }, + MkqlBuildExpr(*delay, ctx), + MkqlBuildExpr(*ahead, ctx), + MkqlBuildExpr(*rowLimit, ctx) + ); + }); + AddCallable("Guess", [](const TExprNode& node, TMkqlBuildContext& ctx) { const auto variantObj = MkqlBuildExpr(node.Head(), ctx); auto type = node.Head().GetTypeAnn(); |