aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkardymon-d <kardymon-d@yandex-team.com>2023-10-23 15:03:57 +0300
committerkardymon-d <kardymon-d@yandex-team.com>2023-10-23 15:48:06 +0300
commit48560f4d9e8d6945601b2e6f4be16f55549e6a7d (patch)
tree3af9bf1841578f0c821dfeb9060c190422e1f8e4
parentd6b5ae15d89d017661fe99befbff4522d6f92717 (diff)
downloadydb-48560f4d9e8d6945601b2e6f4be16f55549e6a7d.tar.gz
Implement check-points in TimeOrderRecover
Add saving login, fix test Add test (not passed)
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp123
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp163
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/ya.make1
7 files changed, 275 insertions, 16 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
index 5a533367cd5..3c104f1c2a4 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
@@ -1,29 +1,66 @@
#include "mkql_time_order_recover.h"
+#include "mkql_saveload.h"
+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <queue>
namespace NKikimr::NMiniKQL {
namespace {
-class TTimeOrderRecover : public TStatefulFlowComputationNode<TTimeOrderRecover> {
- using TBaseComputation = TStatefulFlowComputationNode<TTimeOrderRecover>;
+constexpr ui32 StateVersion = 1;
+
+class TTimeOrderRecover : public TStatefulFlowComputationNode<TTimeOrderRecover, true> {
+ using TBaseComputation = TStatefulFlowComputationNode<TTimeOrderRecover, true>;
public:
class TState: public TComputationValue<TState> {
public:
using TTimestamp = i64; //use signed integers to simplify arithmetics
using TTimeinterval = i64;
+ using TSelf = TTimeOrderRecover;
- TState(TMemoryUsageInfo* memInfo, TTimeinterval delay, TTimeinterval ahead, ui32 rowLimit)
+ TState(
+ TMemoryUsageInfo* memInfo,
+ const TSelf* self,
+ TTimeinterval delay,
+ TTimeinterval ahead,
+ ui32 rowLimit,
+ TComputationContext& ctx)
: TComputationValue<TState>(memInfo)
+ , Self(self)
, Heap(Greater)
, Delay(delay)
, Ahead(ahead)
, RowLimit(rowLimit + 1)
, Latest(0)
, Terminating(false)
+ , Ctx(ctx)
{}
+
+ private:
+ using TEntry = std::pair<TTimestamp, NUdf::TUnboxedValue>;
+ static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) {
+ return lhs.first > rhs.first;
+ };
+ using TStdHeap = std::priority_queue<
+ TEntry,
+ std::vector<TEntry, TMKQLAllocator<TEntry>>,
+ decltype(Greater)>;
+
+
+ struct THeap: public TStdHeap {
+ template<typename...TArgs>
+ THeap(TArgs... args) : TStdHeap(args...) {}
+
+ auto begin() const { return c.begin(); }
+ auto end() const { return c.end(); }
+ auto clear() { return c.clear(); }
+ };
+
+ public:
+
NUdf::TUnboxedValue GetOutputIfReady() {
if (Terminating && Heap.empty()) {
return NUdf::TUnboxedValue::MakeFinish();
@@ -57,21 +94,57 @@ public:
void Finish() {
Terminating = true;
}
+
private:
- using TEntry = std::pair<TTimestamp, NUdf::TUnboxedValue>;
- static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) {
- return lhs.first > rhs.first;
- };
- using THeap = std::priority_queue<
- TEntry,
- std::vector<TEntry, TMKQLAllocator<TEntry>>,
- decltype(Greater)>;
+ void Load(const NUdf::TStringRef& state) override {
+ TStringBuf in(state.Data(), state.Size());
+
+ const auto stateVersion = ReadUi32(in);
+ if (stateVersion == 1) {
+ const auto heapSize = ReadUi32(in);
+ ClearState();
+ for (auto i = 0U; i < heapSize; ++i) {
+ TTimestamp t = ReadUi64(in);
+ NUdf::TUnboxedValue row = ReadUnboxedValue(in, Self->Packer.RefMutableObject(Ctx, false, Self->StateType), Ctx);
+ Heap.emplace(t, std::move(row));
+ }
+ Latest = ReadUi64(in);
+ Terminating = ReadBool(in);
+ } else {
+ THROW yexception() << "Invalid state version " << stateVersion;
+ }
+ }
+
+ NUdf::TUnboxedValue Save() const override {
+ TString out;
+ WriteUi32(out, StateVersion);
+ WriteUi32(out, Heap.size());
+
+ for (const TEntry& entry : Heap) {
+ WriteUi64(out, entry.first);
+ WriteUnboxedValue(out, Self->Packer.RefMutableObject(Ctx, false, Self->StateType), entry.second);
+ }
+ WriteUi64(out, Latest);
+ WriteBool(out, Terminating);
+ auto strRef = NUdf::TStringRef(out.data(), out.size());
+ return MakeString(strRef);
+ }
+
+ void ClearState() {
+ Heap.clear();
+ Latest = 0;
+ Terminating = false;
+ }
+
+ private:
+ const TSelf *const Self;
THeap Heap;
const TTimeinterval Delay;
const TTimeinterval Ahead;
const ui32 RowLimit;
TTimestamp Latest;
bool Terminating; //not applicable for streams, but useful for debug and testing
+ TComputationContext& Ctx;
};
TTimeOrderRecover(
@@ -84,8 +157,8 @@ public:
ui32 outOfOrderColumnIndex,
IComputationNode* delay,
IComputationNode* ahead,
- IComputationNode* rowLimit
- )
+ IComputationNode* rowLimit,
+ TType* stateType)
: TBaseComputation(mutables, inputFlow, kind)
, InputFlow(inputFlow)
, InputRowArg(inputRowArg)
@@ -96,15 +169,28 @@ public:
, Ahead(ahead)
, RowLimit(rowLimit)
, Cache(mutables)
- {}
+ , StateType(stateType)
+ , Packer(mutables)
+ { }
NUdf::TUnboxedValue DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx) const {
if (stateValue.IsInvalid()) {
stateValue = ctx.HolderFactory.Create<TState>(
+ this,
Delay->GetValue(ctx).Get<i64>(),
Ahead->GetValue(ctx).Get<i64>(),
- RowLimit->GetValue(ctx).Get<ui32>()
- );
+ RowLimit->GetValue(ctx).Get<ui32>(),
+ ctx);
+ } else if (stateValue.HasValue() && !stateValue.IsBoxed()) {
+ // Load from saved state.
+ NUdf::TUnboxedValue state = ctx.HolderFactory.Create<TState>(
+ this,
+ Delay->GetValue(ctx).Get<i64>(),
+ Ahead->GetValue(ctx).Get<i64>(),
+ RowLimit->GetValue(ctx).Get<ui32>(),
+ ctx);
+ state.Load(stateValue.AsStringRef());
+ stateValue = state;
}
auto& state = *static_cast<TState *>(stateValue.AsBoxed().Get());
while (true) {
@@ -161,6 +247,8 @@ private:
const IComputationNode* Ahead;
const IComputationNode* RowLimit;
const TContainerCacheOnContext Cache;
+ TType* const StateType;
+ TMutableObjectOverBoxedValue<TValuePackerBoxed> Packer;
};
} //namespace
@@ -175,6 +263,8 @@ IComputationNode* TimeOrderRecover(const TComputationNodeFactoryContext& ctx,
TRuntimeNode ahead,
TRuntimeNode rowLimit)
{
+ auto* rowType = AS_TYPE(TStructType, AS_TYPE(TFlowType, inputFlow.GetStaticType())->GetItemType());
+
return new TTimeOrderRecover(ctx.Mutables
, GetValueRepresentation(inputFlow.GetStaticType())
, LocateNode(ctx.NodeLocator, *inputFlow.GetNode())
@@ -185,6 +275,7 @@ IComputationNode* TimeOrderRecover(const TComputationNodeFactoryContext& ctx,
, LocateNode(ctx.NodeLocator, *delay.GetNode())
, LocateNode(ctx.NodeLocator, *ahead.GetNode())
, LocateNode(ctx.NodeLocator, *rowLimit.GetNode())
+ , rowType
);
}
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt
index db9defebb18..b4ada0b0771 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt
@@ -68,6 +68,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_todict_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_variant_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_chain_map_ut.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt
index 857bcf4069e..1dd6565f154 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt
@@ -71,6 +71,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_todict_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_variant_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_chain_map_ut.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt
index e408761f3fa..848ec70bcd8 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt
@@ -72,6 +72,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_todict_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_variant_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_chain_map_ut.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt
index 502d7e90d46..4522578cbde 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt
@@ -61,6 +61,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_todict_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_variant_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_chain_map_ut.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp
new file mode 100644
index 00000000000..5d278e8c4eb
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp
@@ -0,0 +1,163 @@
+#include "../mkql_time_order_recover.h"
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_graph_saveload.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr {
+ namespace NMiniKQL {
+
+ namespace {
+ TIntrusivePtr<IRandomProvider> CreateRandomProvider() {
+ return CreateDeterministicRandomProvider(1);
+ }
+
+ TIntrusivePtr<ITimeProvider> CreateTimeProvider() {
+ return CreateDeterministicTimeProvider(10000000);
+ }
+
+ struct TSetup {
+ TSetup(TScopedAlloc& alloc)
+ : Alloc(alloc)
+ {
+ FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry());
+ RandomProvider = CreateRandomProvider();
+ TimeProvider = CreateTimeProvider();
+
+ Env.Reset(new TTypeEnvironment(Alloc));
+ PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry));
+ }
+
+ THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) {
+ Explorer.Walk(pgm.GetNode(), *Env);
+ TComputationPatternOpts opts(
+ Alloc.Ref(),
+ *Env, GetBuiltinFactory(),
+ FunctionRegistry.Get(),
+ NUdf::EValidateMode::None,
+ NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi);
+ Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts);
+ TComputationOptsFull compOpts = opts.ToComputationOptions(*RandomProvider, *TimeProvider);
+ return Pattern->Clone(compOpts);
+ }
+
+ TIntrusivePtr<IFunctionRegistry> FunctionRegistry;
+ TIntrusivePtr<IRandomProvider> RandomProvider;
+ TIntrusivePtr<ITimeProvider> TimeProvider;
+
+ TScopedAlloc& Alloc;
+ THolder<TTypeEnvironment> Env;
+ THolder<TProgramBuilder> PgmBuilder;
+
+ TExploringNodeVisitor Explorer;
+ IComputationPattern::TPtr Pattern;
+ };
+
+ using TTestData = std::vector<std::tuple<ui32, i64, ui32>>;
+
+ THolder<IComputationGraph> BuildGraph(TSetup& setup, const TTestData& input) {
+ TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+
+ auto structType = pgmBuilder.NewStructType({
+ {"key", pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id)},
+ {"time", pgmBuilder.NewDataType(NUdf::TDataType<i64>::Id)},
+ {"sum", pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id)}});
+
+ TVector<TRuntimeNode> items;
+ for (size_t i = 0; i < input.size(); ++i)
+ {
+ auto key = pgmBuilder.NewDataLiteral<ui32>(std::get<0>(input[i]));
+ auto time = pgmBuilder.NewDataLiteral<i64>(std::get<1>(input[i]));
+ auto sum = pgmBuilder.NewDataLiteral<ui32>(std::get<2>(input[i]));
+
+ auto item = pgmBuilder.NewStruct(structType,
+ {{"key", key}, {"time", time}, {"sum", sum}});
+ items.push_back(std::move(item));
+ }
+
+ const auto list = pgmBuilder.NewList(structType, std::move(items));
+ auto inputFlow = pgmBuilder.ToFlow(list);
+
+ i64 delay = -10;
+ i64 ahead = 30;
+ ui32 rowLimit = 20;
+
+ auto pgmReturn = pgmBuilder.TimeOrderRecover(
+ inputFlow,
+ [&](TRuntimeNode item) {
+ return pgmBuilder.Member(item, "time");
+ },
+ pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))),
+ pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&ahead, sizeof(ahead))),
+ pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&rowLimit, sizeof(rowLimit))));
+
+ auto graph = setup.BuildGraph(pgmReturn);
+ return graph;
+ }
+ }
+
+ Y_UNIT_TEST_SUITE(TMiniKQLTimeOrderRecoverSaveLoadTest) {
+ void TestWithSaveLoadImpl(
+ const TTestData& input,
+ const TTestData& expected)
+ {
+ TScopedAlloc alloc(__LOCATION__);
+ std::vector<std::tuple<ui32, i64, ui32>> result;
+ TSetup setup1(alloc);
+ auto graph1 = BuildGraph(setup1, input);
+
+ auto value = graph1->GetValue();
+ UNIT_ASSERT(!value.IsFinish() && value);
+ result.emplace_back(
+ value.GetElement(1).Get<ui32>(),
+ value.GetElement(3).Get<i64>(),
+ value.GetElement(2).Get<ui32>());
+
+ TString graphState = graph1->SaveGraphState();
+ TSetup setup2(alloc);
+
+ auto graph2 = BuildGraph(setup2, TTestData());
+ graph2->LoadGraphState(graphState);
+
+ while (true)
+ {
+ value = graph2->GetValue();
+ if (value.IsFinish())
+ break;
+
+ result.emplace_back(
+ value.GetElement(1).Get<ui32>(),
+ value.GetElement(3).Get<i64>(),
+ value.GetElement(2).Get<ui32>());
+ }
+
+ UNIT_ASSERT_EQUAL(result, expected);
+ }
+
+ const std::vector<std::tuple<ui32, i64, ui32>> input = {
+ // Group; Time; Value
+ {1000, 800, 100},
+ {2000, 802, 300},
+ {3000, 801, 200}};
+
+ const std::vector<std::tuple<ui32, i64, ui32>> expected = {
+ // Group; Time; Value
+ {1000, 800, 100},
+ {3000, 801, 200},
+ {2000, 802, 300}};
+
+ Y_UNIT_TEST(Test1) {
+ TestWithSaveLoadImpl(input, expected);
+ }
+
+ }
+
+ } // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/ya.make b/ydb/library/yql/minikql/comp_nodes/ut/ya.make
index 0c66c6c5415..5afd7c2bd71 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/ya.make
+++ b/ydb/library/yql/minikql/comp_nodes/ut/ya.make
@@ -50,6 +50,7 @@ SRCS(
mkql_safe_circular_buffer_ut.cpp
mkql_sort_ut.cpp
mkql_switch_ut.cpp
+ mkql_time_order_recover_saveload_ut.cpp
mkql_todict_ut.cpp
mkql_variant_ut.cpp
mkql_wide_chain_map_ut.cpp