about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author d-mokhnatkin <d-mokhnatkin@yandex-team.ru> 2022-02-10 11:41:43 +0300
committer Daniil Cherednik <dcherednik@yandex-team.ru> 2022-02-10 15:58:17 +0300
commit 8850c0828a3703fdb4dd75090c8edf8de14b5671 (patch)
tree cdb759e5bf4a20ff39e503834978c7e06de17608
parent aae4145d949ef4dddd30387a038da20a57b84c20 (diff)
download ydb-8850c0828a3703fdb4dd75090c8edf8de14b5671.tar.gz
YQ-675: fix hopping
ref:6a78c858cc2d2b70cc9e47ceee374941b1a44bee
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp 100
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp 300
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp 364
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/ut/ya.make 1
-rw-r--r-- ydb/library/yql/minikql/watermark_tracker.cpp 2
5 files changed, 570 insertions, 197 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
index 4c992dc2cbb..cb2b13f86d0 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
@@ -70,7 +70,7 @@ public:
struct TKeyState {
std::vector<TBucket, TMKQLAllocator<TBucket>> Buckets; // circular buffer
- ui64 HopIndex;
+ ui64 HopIndex; // Start index of current window
TKeyState(ui64 bucketsCount, ui64 hopIndex)
: Buckets(bucketsCount)
@@ -163,19 +163,13 @@ public:
return NUdf::EFetchStatus::Finish;
}
- i64 thrownEvents = 0;
- i64 newHops = 0;
- i64 emptyTimeCt = 0;
+ i64 thrownEventsStat = 0;
+ i64 newHopsStat = 0;
+ i64 emptyTimeCtStat = 0;
Y_DEFER {
- if (thrownEvents) {
- MKQL_ADD_STAT(Ctx.Stats, Hop_ThrownEventsCount, thrownEvents);
- }
- if (newHops) {
- MKQL_ADD_STAT(Ctx.Stats, Hop_NewHopsCount, newHops);
- }
- if (emptyTimeCt) {
- MKQL_ADD_STAT(Ctx.Stats, Hop_EmptyTimeCount, emptyTimeCt);
- }
+ MKQL_ADD_STAT(Ctx.Stats, Hop_ThrownEventsCount, thrownEventsStat);
+ MKQL_ADD_STAT(Ctx.Stats, Hop_NewHopsCount, newHopsStat);
+ MKQL_ADD_STAT(Ctx.Stats, Hop_EmptyTimeCount, emptyTimeCtStat);
};
for (NUdf::TUnboxedValue item;;) {
@@ -188,7 +182,7 @@ public:
const auto status = Stream.Fetch(item);
if (status != NUdf::EFetchStatus::Ok) {
if (status == NUdf::EFetchStatus::Finish) {
- CloseOldBuckets(Max<ui64>(), newHops);
+ CloseOldBuckets(Max<ui64>(), newHopsStat);
Finished = true;
if (!Ready.empty()) {
result = std::move(Ready.front());
@@ -203,34 +197,38 @@ public:
auto key = Self->KeyExtract->GetValue(Ctx);
const auto& time = Self->OutTime->GetValue(Ctx);
if (!time) {
- ++emptyTimeCt;
+ ++emptyTimeCtStat;
continue;
}
const auto ts = time.Get<ui64>();
const auto hopIndex = ts / HopTime;
- auto& keyState = GetOrCreateKeyState(key, hopIndex + 1);
-
- CloseOldBucketsForKey(key, keyState, hopIndex, newHops);
-
- if (hopIndex + DelayHopCount + 1 >= keyState.HopIndex) {
- auto& bucket = keyState.Buckets[hopIndex % keyState.Buckets.size()];
- if (!bucket.HasValue) {
- bucket.Value = Self->OutInit->GetValue(Ctx);
- bucket.HasValue = true;
- } else {
- Self->Key->SetValue(Ctx, NUdf::TUnboxedValue(key));
- Self->State->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
- bucket.Value = Self->OutUpdate->GetValue(Ctx);
- }
+ auto& keyState = GetOrCreateKeyState(key, hopIndex);
+
+ if (hopIndex < keyState.HopIndex) {
+ ++thrownEventsStat;
+ continue;
+ }
+
+ // Overflow is not possible, because hopIndex is the result of a division
+ auto closeBeforeIndex = Max<i64>(hopIndex + 1 - DelayHopCount - IntervalHopCount, 0);
+
+ CloseOldBucketsForKey(key, keyState, closeBeforeIndex, newHopsStat);
+
+ auto& bucket = keyState.Buckets[hopIndex % keyState.Buckets.size()];
+ if (!bucket.HasValue) {
+ bucket.Value = Self->OutInit->GetValue(Ctx);
+ bucket.HasValue = true;
} else {
- ++thrownEvents;
+ Self->Key->SetValue(Ctx, NUdf::TUnboxedValue(key));
+ Self->State->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
+ bucket.Value = Self->OutUpdate->GetValue(Ctx);
}
if (WatermarkTracker) {
const auto newWatermark = WatermarkTracker->HandleNextEventTime(ts);
if (newWatermark) {
- CloseOldBuckets(*newWatermark, newHops);
+ CloseOldBuckets(*newWatermark, newHopsStat);
}
}
MKQL_SET_STAT(Ctx.Stats, Hop_KeysCount, StatesMap.size());
@@ -241,7 +239,9 @@ public:
const auto iter = StatesMap.try_emplace(
key,
IntervalHopCount + DelayHopCount,
- hopIndex
+ Max<i64>(hopIndex + 1 - IntervalHopCount, 0)
+ // For the first element we shouldn't forget windows in the past
+ // Overflow is not possible, because hopIndex is the result of a division
);
if (iter.second) {
key.Ref();
@@ -253,20 +253,23 @@ public:
bool CloseOldBucketsForKey(
const NUdf::TUnboxedValue& key,
TKeyState& keyState,
- const ui64 hopIndex,
- i64& newHops)
+ const ui64 closeBeforeIndex, // Excluded bound
+ i64& newHopsStat)
{
auto& bucketsForKey = keyState.Buckets;
- const auto endIndex = Min(hopIndex, keyState.HopIndex + bucketsForKey.size()); // TODO: fix possible overflow
- for (auto& hopIndexForKey = keyState.HopIndex; hopIndexForKey <= endIndex; hopIndexForKey++) {
- auto firstBucketIndex = hopIndexForKey % bucketsForKey.size();
+ auto stateEmpty = true;
+ for (auto i = 0; i < bucketsForKey.size(); i++) {
+ const auto curHopIndex = keyState.HopIndex;
+ if (curHopIndex >= closeBeforeIndex) {
+ stateEmpty = false;
+ break;
+ }
- auto bucketIndex = firstBucketIndex;
TMaybe<NUdf::TUnboxedValue> aggregated;
-
- for (ui64 i = 0; i < IntervalHopCount; ++i) {
- const auto& bucket = bucketsForKey[bucketIndex];
+ for (ui64 j = 0; j < IntervalHopCount; j++) {
+ const auto curBucketIndex = (curHopIndex + j) % bucketsForKey.size();
+ const auto& bucket = bucketsForKey[curBucketIndex];
if (bucket.HasValue) {
if (!aggregated) { // todo: clone
Self->InSave->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
@@ -278,33 +281,32 @@ public:
aggregated = Self->OutMerge->GetValue(Ctx);
}
}
- if (++bucketIndex == bucketsForKey.size()) {
- bucketIndex = 0;
- }
}
- auto& clearBucket = bucketsForKey[firstBucketIndex];
+ auto& clearBucket = bucketsForKey[curHopIndex % bucketsForKey.size()];
clearBucket.Value = NUdf::TUnboxedValue();
clearBucket.HasValue = false;
if (aggregated) {
Self->Key->SetValue(Ctx, NUdf::TUnboxedValue(key));
Self->State->SetValue(Ctx, NUdf::TUnboxedValue(*aggregated));
- Self->Time->SetValue(Ctx, NUdf::TUnboxedValuePod((hopIndexForKey - DelayHopCount) * HopTime));
+ // Outer code requires window end time (not start as could be expected)
+ Self->Time->SetValue(Ctx, NUdf::TUnboxedValuePod((curHopIndex + IntervalHopCount) * HopTime));
Ready.emplace_back(Self->OutFinish->GetValue(Ctx));
}
- ++newHops;
+ keyState.HopIndex++;
+ newHopsStat++;
}
- return endIndex < hopIndex;
+ return stateEmpty;
}
void CloseOldBuckets(ui64 watermarkTs, i64& newHops) {
const auto watermarkIndex = watermarkTs / HopTime;
EraseNodesIf(StatesMap, [&](auto& iter) {
auto& [key, val] = iter;
- const auto keyStateBecameEmpty = CloseOldBucketsForKey(key, val, watermarkIndex, newHops);
+ const auto keyStateBecameEmpty = CloseOldBucketsForKey(key, val, watermarkIndex + 1 - IntervalHopCount, newHops);
if (keyStateBecameEmpty) {
key.UnRef();
}
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp
new file mode 100644
index 00000000000..6819dc77187
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp
@@ -0,0 +1,300 @@
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_graph_saveload.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+ TIntrusivePtr<IRandomProvider> CreateRandomProvider() {
+ return CreateDeterministicRandomProvider(1);
+ }
+
+ TIntrusivePtr<ITimeProvider> CreateTimeProvider() {
+ return CreateDeterministicTimeProvider(10000000);
+ }
+
+ TComputationNodeFactory GetAuxCallableFactory() {
+ return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+ if (callable.GetType()->GetName() == "OneYieldStream") {
+ return new TExternalComputationNode(ctx.Mutables);
+ }
+
+ return GetBuiltinFactory()(callable, ctx);
+ };
+ }
+
+ struct TSetup {
+ TSetup(TScopedAlloc& alloc)
+ : Alloc(alloc)
+ {
+ FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry());
+ RandomProvider = CreateRandomProvider();
+ TimeProvider = CreateTimeProvider();
+
+ Env.Reset(new TTypeEnvironment(Alloc));
+ PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry));
+ }
+
+ THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) {
+ Explorer.Walk(pgm.GetNode(), *Env);
+ TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(),
+ FunctionRegistry.Get(),
+ NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi);
+ Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts);
+ TComputationOptsFull compOpts = opts.ToComputationOptions(*RandomProvider, *TimeProvider);
+ return Pattern->Clone(compOpts);
+ }
+
+ TIntrusivePtr<IFunctionRegistry> FunctionRegistry;
+ TIntrusivePtr<IRandomProvider> RandomProvider;
+ TIntrusivePtr<ITimeProvider> TimeProvider;
+
+ TScopedAlloc& Alloc;
+ THolder<TTypeEnvironment> Env;
+ THolder<TProgramBuilder> PgmBuilder;
+
+ TExploringNodeVisitor Explorer;
+ IComputationPattern::TPtr Pattern;
+ };
+
+ struct TStreamWithYield : public NUdf::TBoxedValue {
+ TStreamWithYield(const TUnboxedValueVector& items, ui32 yieldPos, ui32 index)
+ : Items(items)
+ , YieldPos(yieldPos)
+ , Index(index)
+ {}
+
+ private:
+ TUnboxedValueVector Items;
+ ui32 YieldPos;
+ ui32 Index;
+
+ ui32 GetTraverseCount() const override {
+ return 0;
+ }
+
+ NUdf::TUnboxedValue Save() const override {
+ return NUdf::TUnboxedValue::Zero();
+ }
+
+ void Load(const NUdf::TStringRef& state) override {
+ Y_UNUSED(state);
+ }
+
+ NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final {
+ if (Index >= Items.size()) {
+ return NUdf::EFetchStatus::Finish;
+ }
+ if (Index == YieldPos) {
+ return NUdf::EFetchStatus::Yield;
+ }
+ result = Items[Index++];
+ return NUdf::EFetchStatus::Ok;
+ }
+ };
+
+ THolder<IComputationGraph> BuildGraph(TSetup& setup, const std::vector<std::tuple<ui32, i64, ui32>> items,
+ ui32 yieldPos, ui32 startIndex, bool dataWatermarks) {
+ TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+
+ auto structType = pgmBuilder.NewEmptyStructType();
+ structType = pgmBuilder.NewStructType(structType, "key",
+ pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id));
+ structType = pgmBuilder.NewStructType(structType, "time",
+ pgmBuilder.NewDataType(NUdf::TDataType<NUdf::TTimestamp>::Id));
+ structType = pgmBuilder.NewStructType(structType, "sum",
+ pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id));
+ auto keyIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("key");
+ auto timeIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("time");
+ auto sumIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("sum");
+
+ auto inStreamType = pgmBuilder.NewStreamType(structType);
+
+ TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", inStreamType);
+ auto streamNode = inStream.Build();
+
+ ui64 hop = 10, interval = 30, delay = 20;
+
+ auto pgmReturn = pgmBuilder.MultiHoppingCore(
+ TRuntimeNode(streamNode, false),
+ [&](TRuntimeNode item) { // keyExtractor
+ return pgmBuilder.Member(item, "key");
+ },
+ [&](TRuntimeNode item) { // timeExtractor
+ return pgmBuilder.Member(item, "time");
+ },
+ [&](TRuntimeNode item) { // init
+ std::vector<std::pair<std::string_view, TRuntimeNode>> members;
+ members.emplace_back("sum", pgmBuilder.Member(item, "sum"));
+ return pgmBuilder.NewStruct(members);
+ },
+ [&](TRuntimeNode item, TRuntimeNode state) { // update
+ auto add = pgmBuilder.AggrAdd(
+ pgmBuilder.Member(item, "sum"),
+ pgmBuilder.Member(state, "sum"));
+ std::vector<std::pair<std::string_view, TRuntimeNode>> members;
+ members.emplace_back("sum", add);
+ return pgmBuilder.NewStruct(members);
+ },
+ [&](TRuntimeNode state) { // save
+ return pgmBuilder.Member(state, "sum");
+ },
+ [&](TRuntimeNode savedState) { // load
+ std::vector<std::pair<std::string_view, TRuntimeNode>> members;
+ members.emplace_back("sum", savedState);
+ return pgmBuilder.NewStruct(members);
+ },
+ [&](TRuntimeNode state1, TRuntimeNode state2) { // merge
+ auto add = pgmBuilder.AggrAdd(
+ pgmBuilder.Member(state1, "sum"),
+ pgmBuilder.Member(state2, "sum"));
+ std::vector<std::pair<std::string_view, TRuntimeNode>> members;
+ members.emplace_back("sum", add);
+ return pgmBuilder.NewStruct(members);
+ },
+ [&](TRuntimeNode key, TRuntimeNode state, TRuntimeNode time) { // finish
+ Y_UNUSED(time);
+ std::vector<std::pair<std::string_view, TRuntimeNode>> members;
+ members.emplace_back("key", key);
+ members.emplace_back("sum", pgmBuilder.Member(state, "sum"));
+ return pgmBuilder.NewStruct(members);
+ },
+ pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop
+ pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval
+ pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))), // delay
+ pgmBuilder.NewDataLiteral<bool>(dataWatermarks) // dataWatermarks
+ );
+
+ auto graph = setup.BuildGraph(pgmReturn, {streamNode});
+
+ TUnboxedValueVector streamItems;
+ for (size_t i = 0; i < items.size(); ++i) {
+ NUdf::TUnboxedValue* itemsPtr;
+ auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(3, itemsPtr);
+ itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(std::get<0>(items[i]));
+ itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(std::get<1>(items[i]));
+ itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(std::get<2>(items[i]));
+ streamItems.push_back(std::move(structValues));
+ }
+
+ auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex));
+ graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue));
+ return graph;
+ }
+}
+
+Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingSaveLoadTest) {
+ void TestWithSaveLoadImpl(
+ const std::vector<std::tuple<ui32, i64, ui32>> input,
+ const std::vector<std::tuple<ui32, ui32>> expected,
+ bool withTraverse,
+ bool dataWatermarks)
+ {
+ TScopedAlloc alloc;
+
+ for (ui32 yieldPos = 0; yieldPos < input.size(); ++yieldPos) {
+ std::vector<std::tuple<ui32, ui32>> result;
+
+ TSetup setup1(alloc);
+ auto graph1 = BuildGraph(setup1, input, yieldPos, 0, dataWatermarks);
+ auto root1 = graph1->GetValue();
+
+ NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok;
+ while (status == NUdf::EFetchStatus::Ok) {
+ NUdf::TUnboxedValue val;
+ status = root1.Fetch(val);
+ if (status == NUdf::EFetchStatus::Ok) {
+ result.emplace_back(val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>());
+ }
+ }
+ UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield);
+
+ TString graphState;
+ if (withTraverse) {
+ SaveGraphState(&root1, 1, 0ULL, graphState);
+ } else {
+ graphState = graph1->SaveGraphState();
+ }
+
+ TSetup setup2(alloc);
+ auto graph2 = BuildGraph(setup2, input, -1, yieldPos, dataWatermarks);
+ NUdf::TUnboxedValue root2;
+ if (withTraverse) {
+ root2 = graph2->GetValue();
+ LoadGraphState(&root2, 1, 0ULL, graphState);
+ } else {
+ graph2->LoadGraphState(graphState);
+ root2 = graph2->GetValue();
+ }
+
+ status = NUdf::EFetchStatus::Ok;
+ while (status == NUdf::EFetchStatus::Ok) {
+ NUdf::TUnboxedValue val;
+ status = root2.Fetch(val);
+ if (status == NUdf::EFetchStatus::Ok) {
+ result.emplace_back(val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>());
+ }
+ }
+ UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish);
+
+ auto sortedExpected = expected;
+ std::sort(result.begin(), result.end());
+ std::sort(sortedExpected.begin(), sortedExpected.end());
+ UNIT_ASSERT_EQUAL(result, sortedExpected);
+ }
+ }
+
+ const std::vector<std::tuple<ui32, i64, ui32>> input1 = {
+ // Group; Time; Value
+ {2, 1, 2},
+ {1, 1, 2},
+ {2, 2, 3},
+ {1, 2, 3},
+ {2, 15, 4},
+ {1, 15, 4},
+ {2, 23, 6},
+ {1, 23, 6},
+ {2, 24, 5},
+ {1, 24, 5},
+ {2, 25, 7},
+ {1, 25, 7},
+ {2, 40, 2},
+ {1, 40, 2},
+ {2, 47, 1},
+ {1, 47, 1},
+ {2, 51, 6},
+ {1, 51, 6},
+ {2, 59, 2},
+ {1, 59, 2},
+ {2, 85, 8},
+ {1, 85, 8}
+ };
+
+ const std::vector<std::tuple<ui32, ui32>> expected = {
+ {1, 8}, {1, 8}, {1, 8}, {1, 8},
+ {1, 11}, {1, 11}, {1, 21}, {1, 22},
+ {1, 27},
+ {2, 8}, {2, 8}, {2, 8}, {2, 8},
+ {2, 11}, {2, 11}, {2, 21},
+ {2, 22}, {2, 27}};
+
+ Y_UNIT_TEST(Test1) {
+ TestWithSaveLoadImpl(input1, expected, true, false);
+ }
+
+ Y_UNIT_TEST(Test2) {
+ TestWithSaveLoadImpl(input1, expected, false, false);
+ }
+}
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
index b46ad9b3fca..719ba1f6ea1 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
@@ -14,6 +14,48 @@ namespace NKikimr {
namespace NMiniKQL {
namespace {
+ struct TInputItem {
+ ui32 Key = 0;
+ i64 Time = 0;
+ ui32 Val = 0;
+ };
+
+ struct TOutputItem {
+ ui32 Key = 0;
+ ui32 Val = 0;
+ ui64 Time = 0;
+
+ constexpr bool operator==(const TOutputItem& rhs) const
+ {
+ return this->Key == rhs.Key && this->Val == rhs.Val && this->Time == rhs.Time;
+ }
+ };
+
+ struct TOutputGroup {
+ TOutputGroup(std::initializer_list<TOutputItem> items) : Items(items) {}
+
+ std::vector<TOutputItem> Items;
+ };
+
+ std::vector<TOutputItem> Ordered(std::vector<TOutputItem> vec) {
+ auto res = vec;
+ std::sort(res.begin(), res.end(), [](auto l, auto r) {
+ return std::make_tuple(l.Key, l.Val, l.Time) < std::make_tuple(r.Key, r.Val, r.Time);
+ });
+ return res;
+ }
+
+ IOutputStream &operator<<(IOutputStream &output, std::vector<TOutputItem> items) {
+ output << "[";
+ for (ui32 i = 0; i < items.size(); ++i) {
+ output << "(" << items.at(i).Key << ";" << items.at(i).Val << ";" << items.at(i).Time << ")";
+ if (i != items.size() - 1)
+ output << ",";
+ }
+ output << "]";
+ return output;
+ }
+
TIntrusivePtr<IRandomProvider> CreateRandomProvider() {
return CreateDeterministicRandomProvider(1);
}
@@ -24,7 +66,7 @@ namespace {
TComputationNodeFactory GetAuxCallableFactory() {
return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
- if (callable.GetType()->GetName() == "OneYieldStream") {
+ if (callable.GetType()->GetName() == "MyStream") {
return new TExternalComputationNode(ctx.Mutables);
}
@@ -66,44 +108,35 @@ namespace {
IComputationPattern::TPtr Pattern;
};
- struct TStreamWithYield : public NUdf::TBoxedValue {
- TStreamWithYield(const TUnboxedValueVector& items, ui32 yieldPos, ui32 index)
+ struct TStream : public NUdf::TBoxedValue {
+ TStream(const TUnboxedValueVector& items, std::function<void()> fetchCallback)
: Items(items)
- , YieldPos(yieldPos)
- , Index(index)
- {}
+ , FetchCallback(fetchCallback) {}
private:
TUnboxedValueVector Items;
- ui32 YieldPos;
ui32 Index;
-
- ui32 GetTraverseCount() const override {
- return 0;
- }
-
- NUdf::TUnboxedValue Save() const override {
- return NUdf::TUnboxedValue::Zero();
- }
-
- void Load(const NUdf::TStringRef& state) override {
- Y_UNUSED(state);
- }
+ std::function<void()> FetchCallback;
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final {
+ FetchCallback();
if (Index >= Items.size()) {
return NUdf::EFetchStatus::Finish;
}
- if (Index == YieldPos) {
- return NUdf::EFetchStatus::Yield;
- }
result = Items[Index++];
return NUdf::EFetchStatus::Ok;
}
};
- THolder<IComputationGraph> BuildGraph(TSetup& setup, const std::vector<std::tuple<ui32, i64, ui32>> items,
- ui32 yieldPos, ui32 startIndex, bool dataWatermarks) {
+ THolder<IComputationGraph> BuildGraph(
+ TSetup& setup,
+ const std::vector<TInputItem> items,
+ std::function<void()> fetchCallback,
+ bool dataWatermarks,
+ ui64 hop = 10,
+ ui64 interval = 30,
+ ui64 delay = 20)
+ {
TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
auto structType = pgmBuilder.NewEmptyStructType();
@@ -119,11 +152,9 @@ namespace {
auto inStreamType = pgmBuilder.NewStreamType(structType);
- TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", inStreamType);
+ TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "MyStream", inStreamType);
auto streamNode = inStream.Build();
- ui64 hop = 10, interval = 30, delay = 20;
-
auto pgmReturn = pgmBuilder.MultiHoppingCore(
TRuntimeNode(streamNode, false),
[&](TRuntimeNode item) { // keyExtractor
@@ -162,10 +193,10 @@ namespace {
return pgmBuilder.NewStruct(members);
},
[&](TRuntimeNode key, TRuntimeNode state, TRuntimeNode time) { // finish
- Y_UNUSED(time);
std::vector<std::pair<std::string_view, TRuntimeNode>> members;
members.emplace_back("key", key);
members.emplace_back("sum", pgmBuilder.Member(state, "sum"));
+ members.emplace_back("time", time);
return pgmBuilder.NewStruct(members);
},
pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop
@@ -180,155 +211,194 @@ namespace {
for (size_t i = 0; i < items.size(); ++i) {
NUdf::TUnboxedValue* itemsPtr;
auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(3, itemsPtr);
- itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(std::get<0>(items[i]));
- itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(std::get<1>(items[i]));
- itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(std::get<2>(items[i]));
+ itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(items.at(i).Key);
+ itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(items.at(i).Time);
+ itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(items.at(i).Val);
streamItems.push_back(std::move(structValues));
}
- auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex));
+ auto streamValue = NUdf::TUnboxedValuePod(new TStream(streamItems, fetchCallback));
graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue));
return graph;
}
}
Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
- void TestWithSaveLoadImpl(
- const std::vector<std::tuple<ui32, i64, ui32>> input,
- const std::vector<std::tuple<ui32, ui32>> expected,
- std::vector<std::tuple<ui32, ui32>> expectedFinish,
- bool withTraverse,
- bool dataWatermarks)
+ void TestImpl(
+ const std::vector<TInputItem> input,
+ const std::vector<TOutputGroup> expected,
+ bool dataWatermarks,
+ ui64 hop = 10,
+ ui64 interval = 30,
+ ui64 delay = 20)
{
TScopedAlloc alloc;
+ TSetup setup1(alloc);
- for (ui32 yieldPos = 0; yieldPos < input.size(); ++yieldPos) {
- std::vector<std::tuple<ui32, ui32>> result;
+ ui32 curGroupId = 0;
+ std::vector<TOutputItem> curResult;
- TSetup setup1(alloc);
- auto graph1 = BuildGraph(setup1, input, yieldPos, 0, dataWatermarks);
- auto root1 = graph1->GetValue();
-
- NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok;
- while (status == NUdf::EFetchStatus::Ok) {
- NUdf::TUnboxedValue val;
- status = root1.Fetch(val);
- if (status == NUdf::EFetchStatus::Ok) {
- result.emplace_back(val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>());
- }
- }
- UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield);
+ auto check = [&curResult, &curGroupId, &expected]() {
+ auto expectedItems = Ordered(expected.at(curGroupId).Items);
+ curResult = Ordered(curResult);
+ UNIT_ASSERT_EQUAL_C(curResult, expectedItems, "curGroup: " << curGroupId << " actual: " << curResult << " expected: " << expectedItems);
+ curGroupId++;
+ curResult.clear();
+ };
- TString graphState;
- if (withTraverse) {
- SaveGraphState(&root1, 1, 0ULL, graphState);
- } else {
- graphState = graph1->SaveGraphState();
- }
+ auto graph1 = BuildGraph(setup1, input, check, dataWatermarks, hop, interval, delay);
- TSetup setup2(alloc);
- auto graph2 = BuildGraph(setup2, input, -1, yieldPos, dataWatermarks);
- NUdf::TUnboxedValue root2;
- if (withTraverse) {
- root2 = graph2->GetValue();
- LoadGraphState(&root2, 1, 0ULL, graphState);
- } else {
- graph2->LoadGraphState(graphState);
- root2 = graph2->GetValue();
- }
+ auto root1 = graph1->GetValue();
- status = NUdf::EFetchStatus::Ok;
- while (status == NUdf::EFetchStatus::Ok) {
- NUdf::TUnboxedValue val;
- status = root2.Fetch(val);
- if (status == NUdf::EFetchStatus::Ok) {
- result.emplace_back(val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>());
- }
+ NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok;
+ while (status == NUdf::EFetchStatus::Ok) {
+ NUdf::TUnboxedValue val;
+ status = root1.Fetch(val);
+ if (status == NUdf::EFetchStatus::Ok) {
+ curResult.emplace_back(TOutputItem{val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>(), val.GetElement(2).Get<ui64>()});
}
- UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish);
-
- // After getting finish, current windows will be closed in random order.
- // So check last part of result as unordered.
- std::vector<std::tuple<ui32, ui32>> resultPart1 = {result.begin(), result.end() - expectedFinish.size()};
- std::vector<std::tuple<ui32, ui32>> resultPart2 = {result.begin() + expected.size(), result.end()};
- std::sort(resultPart2.begin(), resultPart2.end());
- std::sort(expectedFinish.begin(), expectedFinish.end());
- UNIT_ASSERT_EQUAL(resultPart1, expected);
- UNIT_ASSERT_EQUAL(resultPart2, expectedFinish);
}
+
+ check();
+ // TODO: some problem with parallel run
+ //UNIT_ASSERT_EQUAL_C(curGroupId, expected.size(), "1: " << curGroupId << " 2: " << expected.size());
}
- const std::vector<std::tuple<ui32, i64, ui32>> input1 = {
- // Group; Time; Value
- {2, 1, 2},
- {1, 1, 2},
- {2, 2, 3},
- {1, 2, 3},
- {2, 15, 4},
- {1, 15, 4},
- {2, 23, 6},
- {1, 23, 6},
- {2, 24, 5},
- {1, 24, 5},
- {2, 25, 7},
- {1, 25, 7},
- {2, 40, 2},
- {1, 40, 2},
- {2, 47, 1},
- {1, 47, 1},
- {2, 51, 6},
- {1, 51, 6},
- {2, 59, 2},
- {1, 59, 2},
- {2, 85, 8},
- {1, 85, 8},
- {2, 55, 1000},
- {1, 55, 1000},
- {2, 200, 2},
- {1, 200, 3}
- };
+ Y_UNIT_TEST(TestDataWatermarks) {
+ const std::vector<TInputItem> input = {
+ // Group; Time; Value
+ {1, 101, 2},
+ {2, 101, 2},
+ {1, 111, 3},
+ {2, 140, 5},
+ {2, 160, 1}
+ };
+ const std::vector<TOutputGroup> expected = {
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({{1, 2, 110}, {1, 5, 120}, {2, 2, 110}, {2, 2, 120}}),
+ TOutputGroup({{2, 2, 130}, {1, 5, 130}, {1, 3, 140}}),
+ TOutputGroup({{2, 5, 150}, {2, 5, 160}, {2, 6, 170}, {2, 1, 180}, {2, 1, 190}}),
+ };
+ TestImpl(input, expected, true);
+ }
- const std::vector<std::tuple<ui32, ui32>> expected1 = {
- {2, 5}, {2, 9}, {1, 5},
- {1, 9}, {2, 27}, {1, 27},
- {2, 22}, {2, 21}, {2, 11},
- {1, 22}, {1, 21}, {1, 11},
- {2, 11}, {2, 8}, {2, 8},
- {2, 8}, {2, 8}, {1, 11},
- {1, 8}, {1, 8}, {1, 8},
- {1, 8}};
-
- const std::vector<std::tuple<ui32, ui32>> expected1Finish = {{2, 2}, {1, 3}};
-
- const std::vector<std::tuple<ui32, i64, ui32>> input2 = {
- // Group; Time; Value
- {1, 1, 2},
- {2, 1, 2},
- {1, 11, 3},
- {2, 40, 5},
- {2, 60, 1}
- };
+ Y_UNIT_TEST(TestValidness1) {
+ const std::vector<TInputItem> input1 = {
+ // Group; Time; Value
+ {1, 101, 2},
+ {2, 101, 2},
+ {1, 111, 3},
+ {2, 140, 5},
+ {2, 160, 1}
+ };
- const std::vector<std::tuple<ui32, ui32>> expected2Finish = {{2, 5}, {2, 5}, {2, 6}, {2, 1}, {2, 1}, {1, 5}, {1, 3}};
+ const std::vector<TOutputGroup> expected = {
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({{2, 2, 110}, {2, 2, 120}}),
+ TOutputGroup({{2, 2, 130}}),
+ TOutputGroup({{1, 2, 110}, {1, 5, 120}, {1, 5, 130}, {1, 3, 140}, {2, 5, 150},
+ {2, 5, 160}, {2, 6, 170}, {2, 1, 190}, {2, 1, 180}}),
+ };
+ TestImpl(input1, expected, false);
+ }
- const std::vector<std::tuple<ui32, ui32>> expected2 = {
- {2, 2}, {2, 2}, {2, 2},
- {1, 2}, {1, 5}};
+ Y_UNIT_TEST(TestValidness2) {
+ const std::vector<TInputItem> input = {
+ // Group; Time; Value
+ {2, 101, 2}, {1, 101, 2}, {2, 102, 3}, {1, 102, 3}, {2, 115, 4},
+ {1, 115, 4}, {2, 123, 6}, {1, 123, 6}, {2, 124, 5}, {1, 124, 5},
+ {2, 125, 7}, {1, 125, 7}, {2, 140, 2}, {1, 140, 2}, {2, 147, 1},
+ {1, 147, 1}, {2, 151, 6}, {1, 151, 6}, {2, 159, 2}, {1, 159, 2},
+ {2, 185, 8}, {1, 185, 8}
+ };
+ const std::vector<TOutputGroup> expected = {
+ TOutputGroup({}),
+ TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+ TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+ TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+ TOutputGroup({{1, 5, 110}, {1, 9, 120}, {2, 5, 110}, {2, 9, 120}}),
+ TOutputGroup({}),
+ TOutputGroup({}), TOutputGroup({}),
+ TOutputGroup({{2, 27, 130}, {1, 27, 130}}),
+ TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+ TOutputGroup({{2, 22, 140}, {2, 21, 150}, {2, 11, 160}, {1, 22, 140}, {1, 21, 150}, {1, 11, 160}}),
+ TOutputGroup({}),
+ TOutputGroup({{1, 11, 170}, {1, 8, 180}, {1, 8, 190}, {1, 8, 200}, {1, 8, 210}, {2, 11, 170},
+ {2, 8, 180}, {2, 8, 190}, {2, 8, 200}, {2, 8, 210}}),
+ };
- Y_UNIT_TEST(TestWithSaveLoad) {
- TestWithSaveLoadImpl(input1, expected1, expected1Finish, true, false);
+ TestImpl(input, expected, true);
}
- Y_UNIT_TEST(TestWithSaveLoad2) {
- TestWithSaveLoadImpl(input1, expected1, expected1Finish, false, false);
+ Y_UNIT_TEST(TestValidness3) {
+ const std::vector<TInputItem> input = {
+ // Group; Time; Value
+ {1, 105, 1}, {1, 107, 4}, {2, 106, 3}, {1, 111, 7}, {1, 117, 3},
+ {2, 110, 2}, {1, 108, 9}, {1, 121, 4}, {2, 107, 2}, {2, 141, 5},
+ {1, 141, 10}
+ };
+ const std::vector<TOutputGroup> expected = {
+ TOutputGroup({}),
+ TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+ TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+ TOutputGroup({{1, 14, 110}, {2, 3, 110}}),
+ TOutputGroup({}),
+ TOutputGroup({{2, 7, 115}, {2, 2, 120}, {1, 21, 115}, {1, 10, 120}, {1, 7, 125}, {1, 4, 130}}),
+ TOutputGroup({}),
+ TOutputGroup({{1, 10, 145}, {1, 10, 150}, {2, 5, 145}, {2, 5, 150}})
+ };
+
+ TestImpl(input, expected, true, 5, 10, 10);
}
- Y_UNIT_TEST(TestWithSaveLoad3) {
- TestWithSaveLoadImpl(input2, expected2, expected2Finish, true, true);
+ Y_UNIT_TEST(TestDelay) {
+ const std::vector<TInputItem> input = {
+ // Group; Time; Value
+ {1, 101, 3}, {1, 111, 5}, {1, 120, 7}, {1, 80, 9}, {1, 79, 11}
+ };
+ const std::vector<TOutputGroup> expected = {
+ TOutputGroup({}),
+ TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+ TOutputGroup({}), TOutputGroup({}),
+ TOutputGroup({{1, 12, 110}, {1, 8, 120}, {1, 15, 130}, {1, 12, 140}, {1, 7, 150}})
+ };
+
+ TestImpl(input, expected, false);
}
- Y_UNIT_TEST(TestWithSaveLoad4) {
- TestWithSaveLoadImpl(input2, expected2, expected2Finish, false, true);
+ Y_UNIT_TEST(TestWindowsBeforeFirstElement) {
+ const std::vector<TInputItem> input = {
+ // Group; Time; Value
+ {1, 101, 2}, {1, 111, 3}
+ };
+ const std::vector<TOutputGroup> expected = {
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({{1, 2, 110}, {1, 5, 120}, {1, 5, 130}, {1, 3, 140}})
+ };
+
+ TestImpl(input, expected, false);
+ }
+
+ Y_UNIT_TEST(TestSubzeroValues) {
+ const std::vector<TInputItem> input = {
+ // Group; Time; Value
+ {1, 1, 2}
+ };
+ const std::vector<TOutputGroup> expected = {
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({{1, 2, 30}}),
+ };
+
+ TestImpl(input, expected, false);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/ya.make b/ydb/library/yql/minikql/comp_nodes/ut/ya.make
index cd4e4b7cbb9..dffda1318f3 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/ya.make
+++ b/ydb/library/yql/minikql/comp_nodes/ut/ya.make
@@ -27,6 +27,7 @@ SRCS(
mkql_chopper_ut.cpp
mkql_filters_ut.cpp
mkql_flatmap_ut.cpp
+ mkql_multihopping_saveload_ut.cpp
mkql_multihopping_ut.cpp
mkql_multimap_ut.cpp
mkql_fold_ut.cpp
diff --git a/ydb/library/yql/minikql/watermark_tracker.cpp b/ydb/library/yql/minikql/watermark_tracker.cpp
index fcfe07043c0..2e08bb08e76 100644
--- a/ydb/library/yql/minikql/watermark_tracker.cpp
+++ b/ydb/library/yql/minikql/watermark_tracker.cpp
@@ -36,4 +36,4 @@ std::optional<ui64> TWatermarkTracker::CalcLastWatermark() {
}
} // NMiniKQL
-} // NKikimr
\ No newline at end of file
+} // NKikimr