diff options
author | kardymon-d <kardymon-d@yandex-team.com> | 2023-10-23 15:03:57 +0300 |
---|---|---|
committer | kardymon-d <kardymon-d@yandex-team.com> | 2023-10-23 15:48:06 +0300 |
commit | 48560f4d9e8d6945601b2e6f4be16f55549e6a7d (patch) | |
tree | 3af9bf1841578f0c821dfeb9060c190422e1f8e4 | |
parent | d6b5ae15d89d017661fe99befbff4522d6f92717 (diff) | |
download | ydb-48560f4d9e8d6945601b2e6f4be16f55549e6a7d.tar.gz |
Implement check-points in TimeOrderRecover
Add saving login, fix test
Add test (not passed)
7 files changed, 275 insertions, 16 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp index 5a533367cd5..3c104f1c2a4 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp @@ -1,29 +1,66 @@ #include "mkql_time_order_recover.h" +#include "mkql_saveload.h" + #include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/mkql_string_util.h> #include <queue> namespace NKikimr::NMiniKQL { namespace { -class TTimeOrderRecover : public TStatefulFlowComputationNode<TTimeOrderRecover> { - using TBaseComputation = TStatefulFlowComputationNode<TTimeOrderRecover>; +constexpr ui32 StateVersion = 1; + +class TTimeOrderRecover : public TStatefulFlowComputationNode<TTimeOrderRecover, true> { + using TBaseComputation = TStatefulFlowComputationNode<TTimeOrderRecover, true>; public: class TState: public TComputationValue<TState> { public: using TTimestamp = i64; //use signed integers to simplify arithmetics using TTimeinterval = i64; + using TSelf = TTimeOrderRecover; - TState(TMemoryUsageInfo* memInfo, TTimeinterval delay, TTimeinterval ahead, ui32 rowLimit) + TState( + TMemoryUsageInfo* memInfo, + const TSelf* self, + TTimeinterval delay, + TTimeinterval ahead, + ui32 rowLimit, + TComputationContext& ctx) : TComputationValue<TState>(memInfo) + , Self(self) , Heap(Greater) , Delay(delay) , Ahead(ahead) , RowLimit(rowLimit + 1) , Latest(0) , Terminating(false) + , Ctx(ctx) {} + + private: + using TEntry = std::pair<TTimestamp, NUdf::TUnboxedValue>; + static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) { + return lhs.first > rhs.first; + }; + using TStdHeap = std::priority_queue< + TEntry, + std::vector<TEntry, TMKQLAllocator<TEntry>>, + decltype(Greater)>; + + + struct THeap: public TStdHeap { + template<typename...TArgs> + THeap(TArgs... args) : TStdHeap(args...) {} + + auto begin() const { return c.begin(); } + auto end() const { return c.end(); } + auto clear() { return c.clear(); } + }; + + public: + NUdf::TUnboxedValue GetOutputIfReady() { if (Terminating && Heap.empty()) { return NUdf::TUnboxedValue::MakeFinish(); @@ -57,21 +94,57 @@ public: void Finish() { Terminating = true; } + private: - using TEntry = std::pair<TTimestamp, NUdf::TUnboxedValue>; - static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) { - return lhs.first > rhs.first; - }; - using THeap = std::priority_queue< - TEntry, - std::vector<TEntry, TMKQLAllocator<TEntry>>, - decltype(Greater)>; + void Load(const NUdf::TStringRef& state) override { + TStringBuf in(state.Data(), state.Size()); + + const auto stateVersion = ReadUi32(in); + if (stateVersion == 1) { + const auto heapSize = ReadUi32(in); + ClearState(); + for (auto i = 0U; i < heapSize; ++i) { + TTimestamp t = ReadUi64(in); + NUdf::TUnboxedValue row = ReadUnboxedValue(in, Self->Packer.RefMutableObject(Ctx, false, Self->StateType), Ctx); + Heap.emplace(t, std::move(row)); + } + Latest = ReadUi64(in); + Terminating = ReadBool(in); + } else { + THROW yexception() << "Invalid state version " << stateVersion; + } + } + + NUdf::TUnboxedValue Save() const override { + TString out; + WriteUi32(out, StateVersion); + WriteUi32(out, Heap.size()); + + for (const TEntry& entry : Heap) { + WriteUi64(out, entry.first); + WriteUnboxedValue(out, Self->Packer.RefMutableObject(Ctx, false, Self->StateType), entry.second); + } + WriteUi64(out, Latest); + WriteBool(out, Terminating); + auto strRef = NUdf::TStringRef(out.data(), out.size()); + return MakeString(strRef); + } + + void ClearState() { + Heap.clear(); + Latest = 0; + Terminating = false; + } + + private: + const TSelf *const Self; THeap Heap; const TTimeinterval Delay; const TTimeinterval Ahead; const ui32 RowLimit; TTimestamp Latest; bool Terminating; //not applicable for streams, but useful for debug and testing + TComputationContext& Ctx; }; TTimeOrderRecover( @@ -84,8 +157,8 @@ public: ui32 outOfOrderColumnIndex, IComputationNode* delay, IComputationNode* ahead, - IComputationNode* rowLimit - ) + IComputationNode* rowLimit, + TType* stateType) : TBaseComputation(mutables, inputFlow, kind) , InputFlow(inputFlow) , InputRowArg(inputRowArg) @@ -96,15 +169,28 @@ public: , Ahead(ahead) , RowLimit(rowLimit) , Cache(mutables) - {} + , StateType(stateType) + , Packer(mutables) + { } NUdf::TUnboxedValue DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx) const { if (stateValue.IsInvalid()) { stateValue = ctx.HolderFactory.Create<TState>( + this, Delay->GetValue(ctx).Get<i64>(), Ahead->GetValue(ctx).Get<i64>(), - RowLimit->GetValue(ctx).Get<ui32>() - ); + RowLimit->GetValue(ctx).Get<ui32>(), + ctx); + } else if (stateValue.HasValue() && !stateValue.IsBoxed()) { + // Load from saved state. + NUdf::TUnboxedValue state = ctx.HolderFactory.Create<TState>( + this, + Delay->GetValue(ctx).Get<i64>(), + Ahead->GetValue(ctx).Get<i64>(), + RowLimit->GetValue(ctx).Get<ui32>(), + ctx); + state.Load(stateValue.AsStringRef()); + stateValue = state; } auto& state = *static_cast<TState *>(stateValue.AsBoxed().Get()); while (true) { @@ -161,6 +247,8 @@ private: const IComputationNode* Ahead; const IComputationNode* RowLimit; const TContainerCacheOnContext Cache; + TType* const StateType; + TMutableObjectOverBoxedValue<TValuePackerBoxed> Packer; }; } //namespace @@ -175,6 +263,8 @@ IComputationNode* TimeOrderRecover(const TComputationNodeFactoryContext& ctx, TRuntimeNode ahead, TRuntimeNode rowLimit) { + auto* rowType = AS_TYPE(TStructType, AS_TYPE(TFlowType, inputFlow.GetStaticType())->GetItemType()); + return new TTimeOrderRecover(ctx.Mutables , GetValueRepresentation(inputFlow.GetStaticType()) , LocateNode(ctx.NodeLocator, *inputFlow.GetNode()) @@ -185,6 +275,7 @@ IComputationNode* TimeOrderRecover(const TComputationNodeFactoryContext& ctx, , LocateNode(ctx.NodeLocator, *delay.GetNode()) , LocateNode(ctx.NodeLocator, *ahead.GetNode()) , LocateNode(ctx.NodeLocator, *rowLimit.GetNode()) + , rowType ); } diff --git a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt index db9defebb18..b4ada0b0771 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt @@ -68,6 +68,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_todict_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_variant_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_chain_map_ut.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt index 857bcf4069e..1dd6565f154 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt @@ -71,6 +71,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_todict_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_variant_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_chain_map_ut.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt index e408761f3fa..848ec70bcd8 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt @@ -72,6 +72,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_todict_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_variant_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_chain_map_ut.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt index 502d7e90d46..4522578cbde 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt @@ -61,6 +61,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_todict_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_variant_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_chain_map_ut.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp new file mode 100644 index 00000000000..5d278e8c4eb --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp @@ -0,0 +1,163 @@ +#include "../mkql_time_order_recover.h" +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_graph_saveload.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr { + namespace NMiniKQL { + + namespace { + TIntrusivePtr<IRandomProvider> CreateRandomProvider() { + return CreateDeterministicRandomProvider(1); + } + + TIntrusivePtr<ITimeProvider> CreateTimeProvider() { + return CreateDeterministicTimeProvider(10000000); + } + + struct TSetup { + TSetup(TScopedAlloc& alloc) + : Alloc(alloc) + { + FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry()); + RandomProvider = CreateRandomProvider(); + TimeProvider = CreateTimeProvider(); + + Env.Reset(new TTypeEnvironment(Alloc)); + PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry)); + } + + THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) { + Explorer.Walk(pgm.GetNode(), *Env); + TComputationPatternOpts opts( + Alloc.Ref(), + *Env, GetBuiltinFactory(), + FunctionRegistry.Get(), + NUdf::EValidateMode::None, + NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi); + Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts); + TComputationOptsFull compOpts = opts.ToComputationOptions(*RandomProvider, *TimeProvider); + return Pattern->Clone(compOpts); + } + + TIntrusivePtr<IFunctionRegistry> FunctionRegistry; + TIntrusivePtr<IRandomProvider> RandomProvider; + TIntrusivePtr<ITimeProvider> TimeProvider; + + TScopedAlloc& Alloc; + THolder<TTypeEnvironment> Env; + THolder<TProgramBuilder> PgmBuilder; + + TExploringNodeVisitor Explorer; + IComputationPattern::TPtr Pattern; + }; + + using TTestData = std::vector<std::tuple<ui32, i64, ui32>>; + + THolder<IComputationGraph> BuildGraph(TSetup& setup, const TTestData& input) { + TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + + auto structType = pgmBuilder.NewStructType({ + {"key", pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id)}, + {"time", pgmBuilder.NewDataType(NUdf::TDataType<i64>::Id)}, + {"sum", pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id)}}); + + TVector<TRuntimeNode> items; + for (size_t i = 0; i < input.size(); ++i) + { + auto key = pgmBuilder.NewDataLiteral<ui32>(std::get<0>(input[i])); + auto time = pgmBuilder.NewDataLiteral<i64>(std::get<1>(input[i])); + auto sum = pgmBuilder.NewDataLiteral<ui32>(std::get<2>(input[i])); + + auto item = pgmBuilder.NewStruct(structType, + {{"key", key}, {"time", time}, {"sum", sum}}); + items.push_back(std::move(item)); + } + + const auto list = pgmBuilder.NewList(structType, std::move(items)); + auto inputFlow = pgmBuilder.ToFlow(list); + + i64 delay = -10; + i64 ahead = 30; + ui32 rowLimit = 20; + + auto pgmReturn = pgmBuilder.TimeOrderRecover( + inputFlow, + [&](TRuntimeNode item) { + return pgmBuilder.Member(item, "time"); + }, + pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))), + pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&ahead, sizeof(ahead))), + pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&rowLimit, sizeof(rowLimit)))); + + auto graph = setup.BuildGraph(pgmReturn); + return graph; + } + } + + Y_UNIT_TEST_SUITE(TMiniKQLTimeOrderRecoverSaveLoadTest) { + void TestWithSaveLoadImpl( + const TTestData& input, + const TTestData& expected) + { + TScopedAlloc alloc(__LOCATION__); + std::vector<std::tuple<ui32, i64, ui32>> result; + TSetup setup1(alloc); + auto graph1 = BuildGraph(setup1, input); + + auto value = graph1->GetValue(); + UNIT_ASSERT(!value.IsFinish() && value); + result.emplace_back( + value.GetElement(1).Get<ui32>(), + value.GetElement(3).Get<i64>(), + value.GetElement(2).Get<ui32>()); + + TString graphState = graph1->SaveGraphState(); + TSetup setup2(alloc); + + auto graph2 = BuildGraph(setup2, TTestData()); + graph2->LoadGraphState(graphState); + + while (true) + { + value = graph2->GetValue(); + if (value.IsFinish()) + break; + + result.emplace_back( + value.GetElement(1).Get<ui32>(), + value.GetElement(3).Get<i64>(), + value.GetElement(2).Get<ui32>()); + } + + UNIT_ASSERT_EQUAL(result, expected); + } + + const std::vector<std::tuple<ui32, i64, ui32>> input = { + // Group; Time; Value + {1000, 800, 100}, + {2000, 802, 300}, + {3000, 801, 200}}; + + const std::vector<std::tuple<ui32, i64, ui32>> expected = { + // Group; Time; Value + {1000, 800, 100}, + {3000, 801, 200}, + {2000, 802, 300}}; + + Y_UNIT_TEST(Test1) { + TestWithSaveLoadImpl(input, expected); + } + + } + + } // namespace NMiniKQL +} // namespace NKikimr diff --git a/ydb/library/yql/minikql/comp_nodes/ut/ya.make b/ydb/library/yql/minikql/comp_nodes/ut/ya.make index 0c66c6c5415..5afd7c2bd71 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/ya.make +++ b/ydb/library/yql/minikql/comp_nodes/ut/ya.make @@ -50,6 +50,7 @@ SRCS( mkql_safe_circular_buffer_ut.cpp mkql_sort_ut.cpp mkql_switch_ut.cpp + mkql_time_order_recover_saveload_ut.cpp mkql_todict_ut.cpp mkql_variant_ut.cpp mkql_wide_chain_map_ut.cpp |