summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvokayndzop <[email protected]>2025-09-03 18:01:12 +0300
committervokayndzop <[email protected]>2025-09-03 19:02:17 +0300
commit678d0bbd3a659cf7162968ac7c8630c7719b9f22 (patch)
tree330b866887dbb33c615350a13b33029bf1fc5de4
parentc770445655ac5964d0c90ece7bbc95ba5513f782 (diff)
Watermarks: fix early events
commit_hash:513bd8f2833db8715382fe6dec50c928b8471965
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp12
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp430
-rw-r--r--yql/essentials/minikql/mkql_watermark.h10
3 files changed, 271 insertions, 181 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp b/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp
index 17699b406b2..4bb5ca33c32 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp
@@ -170,7 +170,10 @@ public:
return false;
}
- TInstant GetWatermark() {
+ TMaybe<TInstant> GetWatermark() {
+ if (!WatermarkMode) {
+ return Nothing();
+ }
return Watermark.WatermarkIn;
}
@@ -223,7 +226,9 @@ public:
return status;
}
PendingYield = true;
- CloseOldBuckets(GetWatermark().MicroSeconds(), newHopsStat);
+ if (auto watermark = GetWatermark()) {
+ CloseOldBuckets(watermark->MicroSeconds(), newHopsStat);
+ }
if (!Ready.empty()) {
result = std::move(Ready.front());
Ready.pop_front();
@@ -246,7 +251,8 @@ public:
const auto ts = time.Get<ui64>();
const auto hopIndex = ts / HopTime;
- auto& keyState = GetOrCreateKeyState(key, WatermarkMode ? GetWatermark().MicroSeconds() / HopTime : hopIndex);
+ const auto watermark = GetWatermark();
+ auto& keyState = GetOrCreateKeyState(key, watermark ? watermark->MicroSeconds() / HopTime : hopIndex);
if (hopIndex < keyState.HopIndex) {
++LateEventsThrown;
continue;
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
index 378a87fbc8e..1ce5a0cf074 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
@@ -12,8 +12,7 @@
#include <library/cpp/testing/unittest/registar.h>
-namespace NKikimr {
-namespace NMiniKQL {
+namespace NKikimr::NMiniKQL {
namespace {
struct TInputItem {
@@ -27,37 +26,44 @@ namespace {
ui32 Val = 0;
ui64 Time = 0;
- constexpr bool operator==(const TOutputItem& rhs) const
- {
- return this->Key == rhs.Key && this->Val == rhs.Val && this->Time == rhs.Time;
- }
+ constexpr auto operator<=>(const TOutputItem&) const = default;
};
- struct TOutputGroup {
- TOutputGroup(std::initializer_list<TOutputItem> items) : Items(items) {}
+ [[maybe_unused]] IOutputStream& operator<<(IOutputStream& output, const TOutputItem& item) {
+ return output << "TItem{Key = " << item.Key << ", Val = " << item.Val << ", Time = " << item.Time << "}";
+ }
- std::vector<TOutputItem> Items;
- };
+ using TOutputGroup = std::vector<TOutputItem>;
- std::vector<TOutputItem> Ordered(std::vector<TOutputItem> vec) {
- std::sort(vec.begin(), vec.end(), [](auto l, auto r) {
- return std::make_tuple(l.Key, l.Val, l.Time) < std::make_tuple(r.Key, r.Val, r.Time);
- });
- return vec;
- }
+ using TCheckCallback = std::function<void()>;
- IOutputStream &operator<<(IOutputStream &output, std::vector<TOutputItem> items) {
- output << "[";
- for (ui32 i = 0; i < items.size(); ++i) {
- output << "(" << items.at(i).Key << ";" << items.at(i).Val << ";" << items.at(i).Time << ")";
- if (i != items.size() - 1)
- output << ",";
- }
- output << "]";
- return output;
- }
+ using TStatsMap = TMap<TString, i64>;
+ TStatsMap DefaultStatsMap = {
+ {"MultiHop_NewHopsCount", 0},
+ {"MultiHop_EarlyThrownEventsCount", 0},
+ {"MultiHop_LateThrownEventsCount", 0},
+ {"MultiHop_EmptyTimeCount", 0},
+ {"MultiHop_KeysCount", 1},
+ };
+ using TEncoder = std::function<NUdf::TUnboxedValue(const TInputItem&, const THolderFactory&)>;
+ using TDecoder = std::function<TOutputItem(const NUdf::TUnboxedValue&)>;
+ using TFetchCallback = std::function<NUdf::EFetchStatus(NUdf::TUnboxedValue&)>;
+ using TFetchFactory = std::function<TFetchCallback(TUnboxedValueVector&&)>;
+
+ TFetchFactory DefaultFetchFactory = [](TUnboxedValueVector&& input) -> TFetchCallback {
+ return [
+ input = std::move(input),
+ inputIndex = 0ull
+ ](NUdf::TUnboxedValue& result) mutable -> NUdf::EFetchStatus {
+ if (inputIndex >= input.size()) {
+ return NUdf::EFetchStatus::Finish;
+ }
+ result = input[inputIndex++];
+ return NUdf::EFetchStatus::Ok;
+ };
+ };
TComputationNodeFactory GetAuxCallableFactory(TWatermark& watermark) {
return [&watermark](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
@@ -71,42 +77,31 @@ namespace {
};
}
+ using TSetupFactory = std::function<TSetup<false>()>;
+
+ TWatermark GlobalWatermark;
+ TSetupFactory DefaultSetupFactory = []() -> TSetup<false> {
+ return TSetup<false>(GetAuxCallableFactory(GlobalWatermark));
+ };
+
struct TStream : public NUdf::TBoxedValue {
- TStream(const TUnboxedValueVector& items, std::function<void()> fetchCallback, bool* yield)
- : Items(items)
- , FetchCallback(fetchCallback)
- , yield(yield) {}
+ TStream(TCheckCallback&& checkCallback, TFetchCallback&& fetchCallback)
+ : CheckCallback_(std::move(checkCallback))
+ , FetchCallback_(std::move(fetchCallback))
+ {}
private:
- TUnboxedValueVector Items;
- ui32 Index = 0;
- std::function<void()> FetchCallback;
- bool* yield;
+ TCheckCallback CheckCallback_;
+ TFetchCallback FetchCallback_;
+ private:
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final {
- FetchCallback();
- if (*yield) {
- return NUdf::EFetchStatus::Yield;
- }
- if (Index >= Items.size()) {
- return NUdf::EFetchStatus::Finish;
- }
- result = Items[Index++];
- return NUdf::EFetchStatus::Ok;
+ CheckCallback_();
+ return FetchCallback_(result);
}
};
- THolder<IComputationGraph> BuildGraph(
- TSetup<false>& setup,
- bool watermarkMode,
- const std::vector<TInputItem> items,
- std::function<void()> fetchCallback,
- bool dataWatermarks,
- bool* yield,
- ui64 hop = 10,
- ui64 interval = 30,
- ui64 delay = 20)
- {
+ std::tuple<TType*, TEncoder, TDecoder> BuildInputType(TSetup<false>& setup) {
TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
auto structType = pgmBuilder.NewEmptyStructType();
@@ -120,7 +115,38 @@ namespace {
auto timeIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("time");
auto sumIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("sum");
- auto inStreamType = pgmBuilder.NewStreamType(structType);
+ auto encode = [keyIndex, timeIndex, sumIndex](const TInputItem& input, const THolderFactory& holderFactory) -> NUdf::TUnboxedValue {
+ NUdf::TUnboxedValue* itemsPtr;
+ auto structValues = holderFactory.CreateDirectArrayHolder(3, itemsPtr);
+ itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(input.Key);
+ itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(input.Time);
+ itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(input.Val);
+ return structValues;
+ };
+
+ auto decode = [keyIndex, timeIndex, sumIndex](const NUdf::TUnboxedValue& result) -> TOutputItem {
+ return {
+ result.GetElement(keyIndex).Get<ui32>(),
+ result.GetElement(sumIndex).Get<ui32>(),
+ result.GetElement(timeIndex).Get<ui64>(),
+ };
+ };
+
+ return {structType, encode, decode};
+ }
+
+ THolder<IComputationGraph> BuildGraph(
+ TSetup<false>& setup,
+ TType* itemType,
+ ui64 hop,
+ ui64 interval,
+ ui64 delay,
+ bool dataWatermarks,
+ bool watermarkMode
+ ) {
+ TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+
+ auto inStreamType = pgmBuilder.NewStreamType(itemType);
TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "MyStream", inStreamType);
auto streamNode = inStream.Build();
@@ -176,21 +202,7 @@ namespace {
pgmBuilder.NewDataLiteral<bool>(watermarkMode)
);
- auto graph = setup.BuildGraph(pgmReturn, {streamNode});
-
- TUnboxedValueVector streamItems;
- for (size_t i = 0; i < items.size(); ++i) {
- NUdf::TUnboxedValue* itemsPtr;
- auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(3, itemsPtr);
- itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(items.at(i).Key);
- itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(items.at(i).Time);
- itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(items.at(i).Val);
- streamItems.push_back(std::move(structValues));
- }
-
- auto streamValue = NUdf::TUnboxedValuePod(new TStream(streamItems, fetchCallback, yield));
- graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue));
- return graph;
+ return setup.BuildGraph(pgmReturn, {streamNode});
}
}
@@ -198,85 +210,111 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
void TestImpl(
const std::vector<TInputItem>& input,
const std::vector<TOutputGroup>& expected,
- bool dataWatermarks,
+ const TStatsMap& expectedStatsMap,
ui64 hop = 10,
ui64 interval = 30,
ui64 delay = 20,
- std::function<void(ui32, TSetup<false>&)> customCheck = [](ui32, TSetup<false>&){},
- TWatermark* watermark = nullptr,
- bool* yield = nullptr,
- std::function<void()> fetch_callback= [](){},
- bool watermarkMode = false)
- {
- bool yield_clone = false;
- if (!yield) {
- yield = &yield_clone;
+ bool dataWatermarks = false,
+ bool watermarkMode = false,
+ TFetchFactory fetchFactory = DefaultFetchFactory,
+ TSetupFactory setupFactory = DefaultSetupFactory
+ ) {
+ auto setup = setupFactory();
+
+ auto [itemType, encode, decode] = BuildInputType(setup);
+
+ auto graph = BuildGraph(setup, itemType, hop, interval, delay, dataWatermarks, watermarkMode);
+
+ size_t index = 0;
+ std::vector<TOutputItem> actual;
+ auto checkCallback = [&expected, &index, &actual]() -> void {
+ UNIT_ASSERT_LT_C(index, expected.size(), index << " < " << expected.size());
+ auto expectedItems = expected[index];
+ std::ranges::sort(expectedItems);
+ std::ranges::sort(actual);
+ UNIT_ASSERT_VALUES_EQUAL_C(expectedItems, actual, index);
+ ++index;
+ actual.clear();
+ };
+
+ TUnboxedValueVector boxedInput;
+ for (size_t i = 0; i < input.size(); ++i) {
+ boxedInput.push_back(encode(input[i], graph->GetHolderFactory()));
}
- if (watermarkMode) {
- dataWatermarks = false;
- }
- TWatermark watermark_clone{TInstant::Zero()};
- if (watermark == nullptr) {
- watermark = &watermark_clone;
- }
- TSetup<false> setup1(GetAuxCallableFactory(*watermark));
-
- ui32 curGroupId = 0;
- std::vector<TOutputItem> curResult;
-
- auto check = [&curResult, &curGroupId, &expected, customCheck, &setup1, &fetch_callback]() {
- fetch_callback();
- auto expectedItems = Ordered(expected.at(curGroupId).Items); // Add more empty lists at yield in expected
- curResult = Ordered(curResult);
- UNIT_ASSERT_EQUAL_C(curResult, expectedItems, "curGroup: " << curGroupId << " actual: " << curResult << " expected: " << expectedItems);
- customCheck(curGroupId, setup1);
- curGroupId++;
- curResult.clear();
- };
+ auto fetchCallback = fetchFactory(std::move(boxedInput));
- auto graph1 = BuildGraph(setup1, watermarkMode, input, check, dataWatermarks, yield, hop, interval, delay);
+ auto streamValue = NUdf::TUnboxedValuePod(new TStream(checkCallback, std::move(fetchCallback)));
+ graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue));
- auto root1 = graph1->GetValue();
+ auto root = graph->GetValue();
- NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok;
- while (status == NUdf::EFetchStatus::Ok || status == NUdf::EFetchStatus::Yield) {
- NUdf::TUnboxedValue val;
- status = root1.Fetch(val);
+ auto status = NUdf::EFetchStatus::Ok;
+ while (NUdf::EFetchStatus::Finish != status) {
+ NUdf::TUnboxedValue result;
+ status = root.Fetch(result);
if (status == NUdf::EFetchStatus::Ok) {
- curResult.emplace_back(TOutputItem{val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>(), val.GetElement(2).Get<ui64>()});
+ actual.push_back(decode(result));
}
}
- check();
- UNIT_ASSERT_EQUAL_C(curGroupId, expected.size(), "1: " << curGroupId << " 2: " << expected.size());
+ checkCallback();
+ UNIT_ASSERT_VALUES_EQUAL(expected.size(), index);
+
+ TStatsMap actualStatsMap;
+ setup.StatsRegistry->ForEachStat([&expectedStatsMap, &actualStatsMap](const TStatKey& key, i64 value) {
+ if (auto iter = expectedStatsMap.find(key.GetName());
+ iter != expectedStatsMap.end()) {
+ actualStatsMap.emplace(key.GetName(), value);
+ }
+ });
+ UNIT_ASSERT_VALUES_EQUAL(expectedStatsMap, actualStatsMap);
}
void TestWatermarksImpl(
const std::vector<TInputItem>& input,
const std::vector<TOutputGroup>& expected,
const std::vector<std::pair<ui64, TInstant>>& watermarks,
+ const TStatsMap& expectedStatsMap,
ui64 hop = 10,
ui64 interval = 30,
- ui64 delay = 20)
- {
- bool yield = false;
+ ui64 delay = 20
+ ) {
TWatermark watermark;
- ui64 inp_index = 0;
- ui64 pattern_index = 0;
- auto avant_fetch = [&yield, &watermark, &watermarks, &inp_index, &pattern_index](){
- yield = false;
- if (pattern_index >= watermarks.size()) {
- return;
- }
- if (inp_index == watermarks[pattern_index].first) {
- yield = true;
- watermark.WatermarkIn = watermarks[pattern_index].second;
- ++pattern_index;
- } else {
- ++inp_index;
- }
- };
- TestImpl(input, expected, false, hop, interval, delay, [](ui32, TSetup<false>&){}, &watermark, &yield, avant_fetch, true);
+ auto fetchFactory = [watermarks = watermarks, &watermark](TUnboxedValueVector input) -> TFetchCallback {
+ return [
+ input = input,
+ inputIndex = 0ull,
+ watermarks = watermarks,
+ watermarkIndex = 0ull,
+ &watermark
+ ](NUdf::TUnboxedValue& result) mutable -> NUdf::EFetchStatus {
+ if (watermarkIndex < watermarks.size() && watermarks[watermarkIndex].first == inputIndex) {
+ watermark.WatermarkIn = watermarks[watermarkIndex].second;
+ ++watermarkIndex;
+ return NUdf::EFetchStatus::Yield;
+ }
+ if (inputIndex >= input.size()) {
+ return NUdf::EFetchStatus::Finish;
+ }
+ result = input[inputIndex++];
+ return NUdf::EFetchStatus::Ok;
+ };
+ };
+ auto setupFactory = [&watermark]() -> TSetup<false> {
+ return TSetup<false>(GetAuxCallableFactory(watermark));
+ };
+ TestImpl(
+ input,
+ expected,
+ expectedStatsMap,
+ hop,
+ interval,
+ delay,
+ false,
+ true,
+ fetchFactory,
+ setupFactory
+ );
}
Y_UNIT_TEST(TestThrowWatermarkFromPast) {
@@ -288,7 +326,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
{1, 300, 5},
{1, 400, 6}
};
-
const std::vector<TOutputGroup> expected = {
TOutputGroup({}),
TOutputGroup({}),
@@ -298,13 +335,21 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({}),
TOutputGroup({}),
TOutputGroup({}),
- TOutputGroup({})
+ TOutputGroup({
+ {1, 2, 110},
+ {1, 2, 120},
+ {1, 2, 130},
+ })
};
- std::vector<std::pair<ui64, TInstant>> yield_pattern = {
+ const std::vector<std::pair<ui64, TInstant>> watermarks = {
{2, TInstant::MicroSeconds(20)},
{3, TInstant::MicroSeconds(40)}
};
- TestWatermarksImpl(input, expected, yield_pattern);
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 3;
+ expectedStatsMap["MultiHop_EarlyThrownEventsCount"] = 4;
+
+ TestWatermarksImpl(input, expected, watermarks, expectedStatsMap);
}
Y_UNIT_TEST(TestThrowWatermarkFromFuture) {
@@ -316,23 +361,31 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
{1, 300, 5},
{1, 400, 6}
};
-
const std::vector<TOutputGroup> expected = {
TOutputGroup({}),
TOutputGroup({}),
TOutputGroup({}),
- TOutputGroup({}),
+ TOutputGroup({
+ {1, 2, 110},
+ {1, 2, 120},
+ {1, 2, 130},
+ }),
TOutputGroup({}),
TOutputGroup({}),
TOutputGroup({}),
TOutputGroup({}),
TOutputGroup({})
};
- std::vector<std::pair<ui64, TInstant>> yield_pattern = {
+ const std::vector<std::pair<ui64, TInstant>> watermarks = {
{2, TInstant::MicroSeconds(1000)},
{3, TInstant::MicroSeconds(2000)}
};
- TestWatermarksImpl(input, expected, yield_pattern);
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 3;
+ expectedStatsMap["MultiHop_LateThrownEventsCount"] = 3;
+ expectedStatsMap["MultiHop_EarlyThrownEventsCount"] = 1;
+
+ TestWatermarksImpl(input, expected, watermarks, expectedStatsMap);
}
Y_UNIT_TEST(TestWatermarkFlow1) {
@@ -344,7 +397,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
{1, 300, 5},
{1, 400, 6}
};
-
const std::vector<TOutputGroup> expected = {
TOutputGroup({}),
TOutputGroup({}),
@@ -356,11 +408,15 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({}),
TOutputGroup({})
};
- std::vector<std::pair<ui64, TInstant>> yield_pattern = {
+ const std::vector<std::pair<ui64, TInstant>> watermarks = {
{0, TInstant::MicroSeconds(100)},
{3, TInstant::MicroSeconds(200)}
};
- TestWatermarksImpl(input, expected, yield_pattern);
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 3;
+ expectedStatsMap["MultiHop_EarlyThrownEventsCount"] = 4;
+
+ TestWatermarksImpl(input, expected, watermarks, expectedStatsMap);
}
Y_UNIT_TEST(TestWatermarkFlow2) {
@@ -372,7 +428,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
{1, 107, 5},
{1, 106, 6}
};
-
const std::vector<TOutputGroup> expected = {
TOutputGroup({}),
TOutputGroup({}),
@@ -381,12 +436,20 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({}),
TOutputGroup({}),
TOutputGroup({}),
- TOutputGroup({{1, 4, 90}, {1, 4, 100}, {1, 4, 110}})
+ TOutputGroup({
+ {1, 4, 90},
+ {1, 4, 100},
+ {1, 4, 110},
+ })
};
- std::vector<std::pair<ui64, TInstant>> yield_pattern = {
+ const std::vector<std::pair<ui64, TInstant>> watermarks = {
{0, TInstant::MicroSeconds(76)},
};
- TestWatermarksImpl(input, expected, yield_pattern);
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 3;
+ expectedStatsMap["MultiHop_EarlyThrownEventsCount"] = 4;
+
+ TestWatermarksImpl(input, expected, watermarks, expectedStatsMap);
}
Y_UNIT_TEST(TestWatermarkFlow3) {
@@ -398,7 +461,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
{1, 107, 5},
{1, 106, 6}
};
-
const std::vector<TOutputGroup> expected = {
TOutputGroup({}),
TOutputGroup({}),
@@ -407,12 +469,21 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({}),
TOutputGroup({}),
TOutputGroup({}),
- TOutputGroup({{1, 4, 90}, {1, 9, 100}, {1, 9, 110}, {1, 5, 120}})
+ TOutputGroup({
+ {1, 4, 90},
+ {1, 9, 100},
+ {1, 9, 110},
+ {1, 5, 120},
+ })
};
- std::vector<std::pair<ui64, TInstant>> yield_pattern = {
+ const std::vector<std::pair<ui64, TInstant>> watermarks = {
{0, TInstant::MicroSeconds(76)},
};
- TestWatermarksImpl(input, expected, yield_pattern);
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 4;
+ expectedStatsMap["MultiHop_EarlyThrownEventsCount"] = 2;
+
+ TestWatermarksImpl(input, expected, watermarks, expectedStatsMap);
}
Y_UNIT_TEST(TestWatermarkFlowOverflow) {
@@ -437,7 +508,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
{1, 121, 16},
{1, 126, 17},
};
-
const std::vector<TOutputGroup> expected = {
TOutputGroup({}),
TOutputGroup({}),
@@ -463,14 +533,18 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({{1, 65, 120}}),
TOutputGroup({{1, 17, 220}, {1, 32, 210}, {1, 46, 130}, {1, 46, 140}, {1, 46, 150}, {1, 46, 160}, {1, 46, 170}, {1, 46, 180}, {1, 46, 190}, {1, 46, 200}}),
};
- std::vector<std::pair<ui64, TInstant>> yield_pattern = {
+ const std::vector<std::pair<ui64, TInstant>> watermarks = {
{9, TInstant::MicroSeconds(1)},
{12, TInstant::MicroSeconds(2)},
{14, TInstant::MicroSeconds(50)},
{15, TInstant::MicroSeconds(110)},
{16, TInstant::MicroSeconds(120)},
};
- TestWatermarksImpl(input, expected, yield_pattern, 10, 100, 20);
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 13;
+ expectedStatsMap["MultiHop_EarlyThrownEventsCount"] = 1;
+
+ TestWatermarksImpl(input, expected, watermarks, expectedStatsMap, 10, 100, 20);
}
Y_UNIT_TEST(TestDataWatermarks) {
@@ -491,7 +565,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({{2, 2, 130}, {1, 5, 130}, {1, 3, 140}}),
TOutputGroup({{2, 5, 150}, {2, 5, 160}, {2, 6, 170}, {2, 1, 180}, {2, 1, 190}}),
};
- TestImpl(input, expected, true);
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 12;
+
+ TestImpl(input, expected, expectedStatsMap, 10, 30, 20, true);
}
Y_UNIT_TEST(TestDataWatermarksNoGarbage) {
@@ -506,22 +583,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({{1, 2, 110}, {1, 2, 120}, {1, 2, 130}}),
TOutputGroup({{2, 1, 160}, {2, 1, 170}, {2, 1, 180}}),
};
- TestImpl(input, expected, true, 10, 30, 20,
- [](ui32 curGroup, TSetup<false>& setup) {
- if (curGroup != 2) {
- return;
- }
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 6;
- setup.StatsRegistry->ForEachStat([](const TStatKey& key, i64 value) {
- if (key.GetName() == "MultiHop_KeysCount") {
- UNIT_ASSERT_EQUAL_C(value, 1, "actual: " << value << " expected: " << 1);
- }
- });
- });
+ TestImpl(input, expected, expectedStatsMap, 10, 30, 20, true, false);
}
Y_UNIT_TEST(TestValidness1) {
- const std::vector<TInputItem> input1 = {
+ const std::vector<TInputItem> input = {
// Group; Time; Value
{1, 101, 2},
{2, 101, 2},
@@ -529,7 +598,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
{2, 140, 5},
{2, 160, 1}
};
-
const std::vector<TOutputGroup> expected = {
TOutputGroup({}),
TOutputGroup({}),
@@ -540,7 +608,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({{1, 2, 110}, {1, 5, 120}, {1, 5, 130}, {1, 3, 140}, {2, 5, 150},
{2, 5, 160}, {2, 6, 170}, {2, 1, 190}, {2, 1, 180}}),
};
- TestImpl(input1, expected, false);
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 12;
+ expectedStatsMap["MultiHop_KeysCount"] = 2;
+
+ TestImpl(input, expected, expectedStatsMap);
}
Y_UNIT_TEST(TestValidness2) {
@@ -567,8 +639,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({{1, 11, 170}, {1, 8, 180}, {1, 8, 190}, {1, 8, 200}, {1, 8, 210}, {2, 11, 170},
{2, 8, 180}, {2, 8, 190}, {2, 8, 200}, {2, 8, 210}}),
};
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 22;
+ expectedStatsMap["MultiHop_KeysCount"] = 2;
- TestImpl(input, expected, true);
+ TestImpl(input, expected, expectedStatsMap, 10, 30, 20, true);
}
Y_UNIT_TEST(TestValidness3) {
@@ -588,8 +663,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({}),
TOutputGroup({{1, 10, 145}, {1, 10, 150}, {2, 5, 145}, {2, 5, 150}})
};
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 12;
+ expectedStatsMap["MultiHop_KeysCount"] = 2;
- TestImpl(input, expected, true, 5, 10, 10);
+ TestImpl(input, expected, expectedStatsMap, 5, 10, 10, true);
}
Y_UNIT_TEST(TestDelay) {
@@ -603,8 +681,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({}), TOutputGroup({}),
TOutputGroup({{1, 12, 110}, {1, 8, 120}, {1, 15, 130}, {1, 12, 140}, {1, 7, 150}})
};
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 5;
+ expectedStatsMap["MultiHop_LateThrownEventsCount"] = 1;
- TestImpl(input, expected, false);
+ TestImpl(input, expected, expectedStatsMap);
}
Y_UNIT_TEST(TestWindowsBeforeFirstElement) {
@@ -618,8 +699,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({}),
TOutputGroup({{1, 2, 110}, {1, 5, 120}, {1, 5, 130}, {1, 3, 140}})
};
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 4;
- TestImpl(input, expected, false);
+ TestImpl(input, expected, expectedStatsMap);
}
Y_UNIT_TEST(TestSubzeroValues) {
@@ -632,10 +715,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({}),
TOutputGroup({{1, 2, 30}}),
};
+ auto expectedStatsMap = DefaultStatsMap;
+ expectedStatsMap["MultiHop_NewHopsCount"] = 1;
- TestImpl(input, expected, false);
+ TestImpl(input, expected, expectedStatsMap);
}
}
-} // namespace NMiniKQL
-} // namespace NKikimr
+} // namespace NKikimr::NMiniKQL
diff --git a/yql/essentials/minikql/mkql_watermark.h b/yql/essentials/minikql/mkql_watermark.h
index 8a5805bd536..5370247d26e 100644
--- a/yql/essentials/minikql/mkql_watermark.h
+++ b/yql/essentials/minikql/mkql_watermark.h
@@ -1,12 +1,12 @@
#pragma once
-#include "util/datetime/base.h"
+#include <util/datetime/base.h>
+#include <util/generic/maybe.h>
-namespace NKikimr {
-namespace NMiniKQL {
+namespace NKikimr::NMiniKQL {
struct TWatermark {
- TInstant WatermarkIn;
+ TMaybe<TInstant> WatermarkIn;
};
-}}
+} // namespace NKikimr::NMiniKQL