diff options
author | vokayndzop <[email protected]> | 2025-09-03 18:01:12 +0300 |
---|---|---|
committer | vokayndzop <[email protected]> | 2025-09-03 19:02:17 +0300 |
commit | 678d0bbd3a659cf7162968ac7c8630c7719b9f22 (patch) | |
tree | 330b866887dbb33c615350a13b33029bf1fc5de4 | |
parent | c770445655ac5964d0c90ece7bbc95ba5513f782 (diff) |
Watermarks: fix early events
commit_hash:513bd8f2833db8715382fe6dec50c928b8471965
-rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp | 12 | ||||
-rw-r--r-- | yql/essentials/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp | 430 | ||||
-rw-r--r-- | yql/essentials/minikql/mkql_watermark.h | 10 |
3 files changed, 271 insertions, 181 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp b/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp index 17699b406b2..4bb5ca33c32 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp @@ -170,7 +170,10 @@ public: return false; } - TInstant GetWatermark() { + TMaybe<TInstant> GetWatermark() { + if (!WatermarkMode) { + return Nothing(); + } return Watermark.WatermarkIn; } @@ -223,7 +226,9 @@ public: return status; } PendingYield = true; - CloseOldBuckets(GetWatermark().MicroSeconds(), newHopsStat); + if (auto watermark = GetWatermark()) { + CloseOldBuckets(watermark->MicroSeconds(), newHopsStat); + } if (!Ready.empty()) { result = std::move(Ready.front()); Ready.pop_front(); @@ -246,7 +251,8 @@ public: const auto ts = time.Get<ui64>(); const auto hopIndex = ts / HopTime; - auto& keyState = GetOrCreateKeyState(key, WatermarkMode ? GetWatermark().MicroSeconds() / HopTime : hopIndex); + const auto watermark = GetWatermark(); + auto& keyState = GetOrCreateKeyState(key, watermark ? watermark->MicroSeconds() / HopTime : hopIndex); if (hopIndex < keyState.HopIndex) { ++LateEventsThrown; continue; diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp index 378a87fbc8e..1ce5a0cf074 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp @@ -12,8 +12,7 @@ #include <library/cpp/testing/unittest/registar.h> -namespace NKikimr { -namespace NMiniKQL { +namespace NKikimr::NMiniKQL { namespace { struct TInputItem { @@ -27,37 +26,44 @@ namespace { ui32 Val = 0; ui64 Time = 0; - constexpr bool operator==(const TOutputItem& rhs) const - { - return this->Key == rhs.Key && this->Val == rhs.Val && this->Time == rhs.Time; - } + constexpr auto operator<=>(const TOutputItem&) const = default; }; - struct TOutputGroup { - TOutputGroup(std::initializer_list<TOutputItem> items) : Items(items) {} + [[maybe_unused]] IOutputStream& operator<<(IOutputStream& output, const TOutputItem& item) { + return output << "TItem{Key = " << item.Key << ", Val = " << item.Val << ", Time = " << item.Time << "}"; + } - std::vector<TOutputItem> Items; - }; + using TOutputGroup = std::vector<TOutputItem>; - std::vector<TOutputItem> Ordered(std::vector<TOutputItem> vec) { - std::sort(vec.begin(), vec.end(), [](auto l, auto r) { - return std::make_tuple(l.Key, l.Val, l.Time) < std::make_tuple(r.Key, r.Val, r.Time); - }); - return vec; - } + using TCheckCallback = std::function<void()>; - IOutputStream &operator<<(IOutputStream &output, std::vector<TOutputItem> items) { - output << "["; - for (ui32 i = 0; i < items.size(); ++i) { - output << "(" << items.at(i).Key << ";" << items.at(i).Val << ";" << items.at(i).Time << ")"; - if (i != items.size() - 1) - output << ","; - } - output << "]"; - return output; - } + using TStatsMap = TMap<TString, i64>; + TStatsMap DefaultStatsMap = { + {"MultiHop_NewHopsCount", 0}, + {"MultiHop_EarlyThrownEventsCount", 0}, + {"MultiHop_LateThrownEventsCount", 0}, + {"MultiHop_EmptyTimeCount", 0}, + {"MultiHop_KeysCount", 1}, + }; + using TEncoder = std::function<NUdf::TUnboxedValue(const TInputItem&, const THolderFactory&)>; + using TDecoder = std::function<TOutputItem(const NUdf::TUnboxedValue&)>; + using TFetchCallback = std::function<NUdf::EFetchStatus(NUdf::TUnboxedValue&)>; + using TFetchFactory = std::function<TFetchCallback(TUnboxedValueVector&&)>; + + TFetchFactory DefaultFetchFactory = [](TUnboxedValueVector&& input) -> TFetchCallback { + return [ + input = std::move(input), + inputIndex = 0ull + ](NUdf::TUnboxedValue& result) mutable -> NUdf::EFetchStatus { + if (inputIndex >= input.size()) { + return NUdf::EFetchStatus::Finish; + } + result = input[inputIndex++]; + return NUdf::EFetchStatus::Ok; + }; + }; TComputationNodeFactory GetAuxCallableFactory(TWatermark& watermark) { return [&watermark](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { @@ -71,42 +77,31 @@ namespace { }; } + using TSetupFactory = std::function<TSetup<false>()>; + + TWatermark GlobalWatermark; + TSetupFactory DefaultSetupFactory = []() -> TSetup<false> { + return TSetup<false>(GetAuxCallableFactory(GlobalWatermark)); + }; + struct TStream : public NUdf::TBoxedValue { - TStream(const TUnboxedValueVector& items, std::function<void()> fetchCallback, bool* yield) - : Items(items) - , FetchCallback(fetchCallback) - , yield(yield) {} + TStream(TCheckCallback&& checkCallback, TFetchCallback&& fetchCallback) + : CheckCallback_(std::move(checkCallback)) + , FetchCallback_(std::move(fetchCallback)) + {} private: - TUnboxedValueVector Items; - ui32 Index = 0; - std::function<void()> FetchCallback; - bool* yield; + TCheckCallback CheckCallback_; + TFetchCallback FetchCallback_; + private: NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final { - FetchCallback(); - if (*yield) { - return NUdf::EFetchStatus::Yield; - } - if (Index >= Items.size()) { - return NUdf::EFetchStatus::Finish; - } - result = Items[Index++]; - return NUdf::EFetchStatus::Ok; + CheckCallback_(); + return FetchCallback_(result); } }; - THolder<IComputationGraph> BuildGraph( - TSetup<false>& setup, - bool watermarkMode, - const std::vector<TInputItem> items, - std::function<void()> fetchCallback, - bool dataWatermarks, - bool* yield, - ui64 hop = 10, - ui64 interval = 30, - ui64 delay = 20) - { + std::tuple<TType*, TEncoder, TDecoder> BuildInputType(TSetup<false>& setup) { TProgramBuilder& pgmBuilder = *setup.PgmBuilder; auto structType = pgmBuilder.NewEmptyStructType(); @@ -120,7 +115,38 @@ namespace { auto timeIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("time"); auto sumIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("sum"); - auto inStreamType = pgmBuilder.NewStreamType(structType); + auto encode = [keyIndex, timeIndex, sumIndex](const TInputItem& input, const THolderFactory& holderFactory) -> NUdf::TUnboxedValue { + NUdf::TUnboxedValue* itemsPtr; + auto structValues = holderFactory.CreateDirectArrayHolder(3, itemsPtr); + itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(input.Key); + itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(input.Time); + itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(input.Val); + return structValues; + }; + + auto decode = [keyIndex, timeIndex, sumIndex](const NUdf::TUnboxedValue& result) -> TOutputItem { + return { + result.GetElement(keyIndex).Get<ui32>(), + result.GetElement(sumIndex).Get<ui32>(), + result.GetElement(timeIndex).Get<ui64>(), + }; + }; + + return {structType, encode, decode}; + } + + THolder<IComputationGraph> BuildGraph( + TSetup<false>& setup, + TType* itemType, + ui64 hop, + ui64 interval, + ui64 delay, + bool dataWatermarks, + bool watermarkMode + ) { + TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + + auto inStreamType = pgmBuilder.NewStreamType(itemType); TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "MyStream", inStreamType); auto streamNode = inStream.Build(); @@ -176,21 +202,7 @@ namespace { pgmBuilder.NewDataLiteral<bool>(watermarkMode) ); - auto graph = setup.BuildGraph(pgmReturn, {streamNode}); - - TUnboxedValueVector streamItems; - for (size_t i = 0; i < items.size(); ++i) { - NUdf::TUnboxedValue* itemsPtr; - auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(3, itemsPtr); - itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(items.at(i).Key); - itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(items.at(i).Time); - itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(items.at(i).Val); - streamItems.push_back(std::move(structValues)); - } - - auto streamValue = NUdf::TUnboxedValuePod(new TStream(streamItems, fetchCallback, yield)); - graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); - return graph; + return setup.BuildGraph(pgmReturn, {streamNode}); } } @@ -198,85 +210,111 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { void TestImpl( const std::vector<TInputItem>& input, const std::vector<TOutputGroup>& expected, - bool dataWatermarks, + const TStatsMap& expectedStatsMap, ui64 hop = 10, ui64 interval = 30, ui64 delay = 20, - std::function<void(ui32, TSetup<false>&)> customCheck = [](ui32, TSetup<false>&){}, - TWatermark* watermark = nullptr, - bool* yield = nullptr, - std::function<void()> fetch_callback= [](){}, - bool watermarkMode = false) - { - bool yield_clone = false; - if (!yield) { - yield = &yield_clone; + bool dataWatermarks = false, + bool watermarkMode = false, + TFetchFactory fetchFactory = DefaultFetchFactory, + TSetupFactory setupFactory = DefaultSetupFactory + ) { + auto setup = setupFactory(); + + auto [itemType, encode, decode] = BuildInputType(setup); + + auto graph = BuildGraph(setup, itemType, hop, interval, delay, dataWatermarks, watermarkMode); + + size_t index = 0; + std::vector<TOutputItem> actual; + auto checkCallback = [&expected, &index, &actual]() -> void { + UNIT_ASSERT_LT_C(index, expected.size(), index << " < " << expected.size()); + auto expectedItems = expected[index]; + std::ranges::sort(expectedItems); + std::ranges::sort(actual); + UNIT_ASSERT_VALUES_EQUAL_C(expectedItems, actual, index); + ++index; + actual.clear(); + }; + + TUnboxedValueVector boxedInput; + for (size_t i = 0; i < input.size(); ++i) { + boxedInput.push_back(encode(input[i], graph->GetHolderFactory())); } - if (watermarkMode) { - dataWatermarks = false; - } - TWatermark watermark_clone{TInstant::Zero()}; - if (watermark == nullptr) { - watermark = &watermark_clone; - } - TSetup<false> setup1(GetAuxCallableFactory(*watermark)); - - ui32 curGroupId = 0; - std::vector<TOutputItem> curResult; - - auto check = [&curResult, &curGroupId, &expected, customCheck, &setup1, &fetch_callback]() { - fetch_callback(); - auto expectedItems = Ordered(expected.at(curGroupId).Items); // Add more empty lists at yield in expected - curResult = Ordered(curResult); - UNIT_ASSERT_EQUAL_C(curResult, expectedItems, "curGroup: " << curGroupId << " actual: " << curResult << " expected: " << expectedItems); - customCheck(curGroupId, setup1); - curGroupId++; - curResult.clear(); - }; + auto fetchCallback = fetchFactory(std::move(boxedInput)); - auto graph1 = BuildGraph(setup1, watermarkMode, input, check, dataWatermarks, yield, hop, interval, delay); + auto streamValue = NUdf::TUnboxedValuePod(new TStream(checkCallback, std::move(fetchCallback))); + graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); - auto root1 = graph1->GetValue(); + auto root = graph->GetValue(); - NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok; - while (status == NUdf::EFetchStatus::Ok || status == NUdf::EFetchStatus::Yield) { - NUdf::TUnboxedValue val; - status = root1.Fetch(val); + auto status = NUdf::EFetchStatus::Ok; + while (NUdf::EFetchStatus::Finish != status) { + NUdf::TUnboxedValue result; + status = root.Fetch(result); if (status == NUdf::EFetchStatus::Ok) { - curResult.emplace_back(TOutputItem{val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>(), val.GetElement(2).Get<ui64>()}); + actual.push_back(decode(result)); } } - check(); - UNIT_ASSERT_EQUAL_C(curGroupId, expected.size(), "1: " << curGroupId << " 2: " << expected.size()); + checkCallback(); + UNIT_ASSERT_VALUES_EQUAL(expected.size(), index); + + TStatsMap actualStatsMap; + setup.StatsRegistry->ForEachStat([&expectedStatsMap, &actualStatsMap](const TStatKey& key, i64 value) { + if (auto iter = expectedStatsMap.find(key.GetName()); + iter != expectedStatsMap.end()) { + actualStatsMap.emplace(key.GetName(), value); + } + }); + UNIT_ASSERT_VALUES_EQUAL(expectedStatsMap, actualStatsMap); } void TestWatermarksImpl( const std::vector<TInputItem>& input, const std::vector<TOutputGroup>& expected, const std::vector<std::pair<ui64, TInstant>>& watermarks, + const TStatsMap& expectedStatsMap, ui64 hop = 10, ui64 interval = 30, - ui64 delay = 20) - { - bool yield = false; + ui64 delay = 20 + ) { TWatermark watermark; - ui64 inp_index = 0; - ui64 pattern_index = 0; - auto avant_fetch = [&yield, &watermark, &watermarks, &inp_index, &pattern_index](){ - yield = false; - if (pattern_index >= watermarks.size()) { - return; - } - if (inp_index == watermarks[pattern_index].first) { - yield = true; - watermark.WatermarkIn = watermarks[pattern_index].second; - ++pattern_index; - } else { - ++inp_index; - } - }; - TestImpl(input, expected, false, hop, interval, delay, [](ui32, TSetup<false>&){}, &watermark, &yield, avant_fetch, true); + auto fetchFactory = [watermarks = watermarks, &watermark](TUnboxedValueVector input) -> TFetchCallback { + return [ + input = input, + inputIndex = 0ull, + watermarks = watermarks, + watermarkIndex = 0ull, + &watermark + ](NUdf::TUnboxedValue& result) mutable -> NUdf::EFetchStatus { + if (watermarkIndex < watermarks.size() && watermarks[watermarkIndex].first == inputIndex) { + watermark.WatermarkIn = watermarks[watermarkIndex].second; + ++watermarkIndex; + return NUdf::EFetchStatus::Yield; + } + if (inputIndex >= input.size()) { + return NUdf::EFetchStatus::Finish; + } + result = input[inputIndex++]; + return NUdf::EFetchStatus::Ok; + }; + }; + auto setupFactory = [&watermark]() -> TSetup<false> { + return TSetup<false>(GetAuxCallableFactory(watermark)); + }; + TestImpl( + input, + expected, + expectedStatsMap, + hop, + interval, + delay, + false, + true, + fetchFactory, + setupFactory + ); } Y_UNIT_TEST(TestThrowWatermarkFromPast) { @@ -288,7 +326,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { {1, 300, 5}, {1, 400, 6} }; - const std::vector<TOutputGroup> expected = { TOutputGroup({}), TOutputGroup({}), @@ -298,13 +335,21 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), - TOutputGroup({}) + TOutputGroup({ + {1, 2, 110}, + {1, 2, 120}, + {1, 2, 130}, + }) }; - std::vector<std::pair<ui64, TInstant>> yield_pattern = { + const std::vector<std::pair<ui64, TInstant>> watermarks = { {2, TInstant::MicroSeconds(20)}, {3, TInstant::MicroSeconds(40)} }; - TestWatermarksImpl(input, expected, yield_pattern); + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 3; + expectedStatsMap["MultiHop_EarlyThrownEventsCount"] = 4; + + TestWatermarksImpl(input, expected, watermarks, expectedStatsMap); } Y_UNIT_TEST(TestThrowWatermarkFromFuture) { @@ -316,23 +361,31 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { {1, 300, 5}, {1, 400, 6} }; - const std::vector<TOutputGroup> expected = { TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), - TOutputGroup({}), + TOutputGroup({ + {1, 2, 110}, + {1, 2, 120}, + {1, 2, 130}, + }), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}) }; - std::vector<std::pair<ui64, TInstant>> yield_pattern = { + const std::vector<std::pair<ui64, TInstant>> watermarks = { {2, TInstant::MicroSeconds(1000)}, {3, TInstant::MicroSeconds(2000)} }; - TestWatermarksImpl(input, expected, yield_pattern); + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 3; + expectedStatsMap["MultiHop_LateThrownEventsCount"] = 3; + expectedStatsMap["MultiHop_EarlyThrownEventsCount"] = 1; + + TestWatermarksImpl(input, expected, watermarks, expectedStatsMap); } Y_UNIT_TEST(TestWatermarkFlow1) { @@ -344,7 +397,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { {1, 300, 5}, {1, 400, 6} }; - const std::vector<TOutputGroup> expected = { TOutputGroup({}), TOutputGroup({}), @@ -356,11 +408,15 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({}), TOutputGroup({}) }; - std::vector<std::pair<ui64, TInstant>> yield_pattern = { + const std::vector<std::pair<ui64, TInstant>> watermarks = { {0, TInstant::MicroSeconds(100)}, {3, TInstant::MicroSeconds(200)} }; - TestWatermarksImpl(input, expected, yield_pattern); + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 3; + expectedStatsMap["MultiHop_EarlyThrownEventsCount"] = 4; + + TestWatermarksImpl(input, expected, watermarks, expectedStatsMap); } Y_UNIT_TEST(TestWatermarkFlow2) { @@ -372,7 +428,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { {1, 107, 5}, {1, 106, 6} }; - const std::vector<TOutputGroup> expected = { TOutputGroup({}), TOutputGroup({}), @@ -381,12 +436,20 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), - TOutputGroup({{1, 4, 90}, {1, 4, 100}, {1, 4, 110}}) + TOutputGroup({ + {1, 4, 90}, + {1, 4, 100}, + {1, 4, 110}, + }) }; - std::vector<std::pair<ui64, TInstant>> yield_pattern = { + const std::vector<std::pair<ui64, TInstant>> watermarks = { {0, TInstant::MicroSeconds(76)}, }; - TestWatermarksImpl(input, expected, yield_pattern); + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 3; + expectedStatsMap["MultiHop_EarlyThrownEventsCount"] = 4; + + TestWatermarksImpl(input, expected, watermarks, expectedStatsMap); } Y_UNIT_TEST(TestWatermarkFlow3) { @@ -398,7 +461,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { {1, 107, 5}, {1, 106, 6} }; - const std::vector<TOutputGroup> expected = { TOutputGroup({}), TOutputGroup({}), @@ -407,12 +469,21 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), - TOutputGroup({{1, 4, 90}, {1, 9, 100}, {1, 9, 110}, {1, 5, 120}}) + TOutputGroup({ + {1, 4, 90}, + {1, 9, 100}, + {1, 9, 110}, + {1, 5, 120}, + }) }; - std::vector<std::pair<ui64, TInstant>> yield_pattern = { + const std::vector<std::pair<ui64, TInstant>> watermarks = { {0, TInstant::MicroSeconds(76)}, }; - TestWatermarksImpl(input, expected, yield_pattern); + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 4; + expectedStatsMap["MultiHop_EarlyThrownEventsCount"] = 2; + + TestWatermarksImpl(input, expected, watermarks, expectedStatsMap); } Y_UNIT_TEST(TestWatermarkFlowOverflow) { @@ -437,7 +508,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { {1, 121, 16}, {1, 126, 17}, }; - const std::vector<TOutputGroup> expected = { TOutputGroup({}), TOutputGroup({}), @@ -463,14 +533,18 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({{1, 65, 120}}), TOutputGroup({{1, 17, 220}, {1, 32, 210}, {1, 46, 130}, {1, 46, 140}, {1, 46, 150}, {1, 46, 160}, {1, 46, 170}, {1, 46, 180}, {1, 46, 190}, {1, 46, 200}}), }; - std::vector<std::pair<ui64, TInstant>> yield_pattern = { + const std::vector<std::pair<ui64, TInstant>> watermarks = { {9, TInstant::MicroSeconds(1)}, {12, TInstant::MicroSeconds(2)}, {14, TInstant::MicroSeconds(50)}, {15, TInstant::MicroSeconds(110)}, {16, TInstant::MicroSeconds(120)}, }; - TestWatermarksImpl(input, expected, yield_pattern, 10, 100, 20); + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 13; + expectedStatsMap["MultiHop_EarlyThrownEventsCount"] = 1; + + TestWatermarksImpl(input, expected, watermarks, expectedStatsMap, 10, 100, 20); } Y_UNIT_TEST(TestDataWatermarks) { @@ -491,7 +565,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({{2, 2, 130}, {1, 5, 130}, {1, 3, 140}}), TOutputGroup({{2, 5, 150}, {2, 5, 160}, {2, 6, 170}, {2, 1, 180}, {2, 1, 190}}), }; - TestImpl(input, expected, true); + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 12; + + TestImpl(input, expected, expectedStatsMap, 10, 30, 20, true); } Y_UNIT_TEST(TestDataWatermarksNoGarbage) { @@ -506,22 +583,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({{1, 2, 110}, {1, 2, 120}, {1, 2, 130}}), TOutputGroup({{2, 1, 160}, {2, 1, 170}, {2, 1, 180}}), }; - TestImpl(input, expected, true, 10, 30, 20, - [](ui32 curGroup, TSetup<false>& setup) { - if (curGroup != 2) { - return; - } + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 6; - setup.StatsRegistry->ForEachStat([](const TStatKey& key, i64 value) { - if (key.GetName() == "MultiHop_KeysCount") { - UNIT_ASSERT_EQUAL_C(value, 1, "actual: " << value << " expected: " << 1); - } - }); - }); + TestImpl(input, expected, expectedStatsMap, 10, 30, 20, true, false); } Y_UNIT_TEST(TestValidness1) { - const std::vector<TInputItem> input1 = { + const std::vector<TInputItem> input = { // Group; Time; Value {1, 101, 2}, {2, 101, 2}, @@ -529,7 +598,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { {2, 140, 5}, {2, 160, 1} }; - const std::vector<TOutputGroup> expected = { TOutputGroup({}), TOutputGroup({}), @@ -540,7 +608,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({{1, 2, 110}, {1, 5, 120}, {1, 5, 130}, {1, 3, 140}, {2, 5, 150}, {2, 5, 160}, {2, 6, 170}, {2, 1, 190}, {2, 1, 180}}), }; - TestImpl(input1, expected, false); + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 12; + expectedStatsMap["MultiHop_KeysCount"] = 2; + + TestImpl(input, expected, expectedStatsMap); } Y_UNIT_TEST(TestValidness2) { @@ -567,8 +639,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({{1, 11, 170}, {1, 8, 180}, {1, 8, 190}, {1, 8, 200}, {1, 8, 210}, {2, 11, 170}, {2, 8, 180}, {2, 8, 190}, {2, 8, 200}, {2, 8, 210}}), }; + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 22; + expectedStatsMap["MultiHop_KeysCount"] = 2; - TestImpl(input, expected, true); + TestImpl(input, expected, expectedStatsMap, 10, 30, 20, true); } Y_UNIT_TEST(TestValidness3) { @@ -588,8 +663,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({}), TOutputGroup({{1, 10, 145}, {1, 10, 150}, {2, 5, 145}, {2, 5, 150}}) }; + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 12; + expectedStatsMap["MultiHop_KeysCount"] = 2; - TestImpl(input, expected, true, 5, 10, 10); + TestImpl(input, expected, expectedStatsMap, 5, 10, 10, true); } Y_UNIT_TEST(TestDelay) { @@ -603,8 +681,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({}), TOutputGroup({}), TOutputGroup({{1, 12, 110}, {1, 8, 120}, {1, 15, 130}, {1, 12, 140}, {1, 7, 150}}) }; + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 5; + expectedStatsMap["MultiHop_LateThrownEventsCount"] = 1; - TestImpl(input, expected, false); + TestImpl(input, expected, expectedStatsMap); } Y_UNIT_TEST(TestWindowsBeforeFirstElement) { @@ -618,8 +699,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({}), TOutputGroup({{1, 2, 110}, {1, 5, 120}, {1, 5, 130}, {1, 3, 140}}) }; + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 4; - TestImpl(input, expected, false); + TestImpl(input, expected, expectedStatsMap); } Y_UNIT_TEST(TestSubzeroValues) { @@ -632,10 +715,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { TOutputGroup({}), TOutputGroup({{1, 2, 30}}), }; + auto expectedStatsMap = DefaultStatsMap; + expectedStatsMap["MultiHop_NewHopsCount"] = 1; - TestImpl(input, expected, false); + TestImpl(input, expected, expectedStatsMap); } } -} // namespace NMiniKQL -} // namespace NKikimr +} // namespace NKikimr::NMiniKQL diff --git a/yql/essentials/minikql/mkql_watermark.h b/yql/essentials/minikql/mkql_watermark.h index 8a5805bd536..5370247d26e 100644 --- a/yql/essentials/minikql/mkql_watermark.h +++ b/yql/essentials/minikql/mkql_watermark.h @@ -1,12 +1,12 @@ #pragma once -#include "util/datetime/base.h" +#include <util/datetime/base.h> +#include <util/generic/maybe.h> -namespace NKikimr { -namespace NMiniKQL { +namespace NKikimr::NMiniKQL { struct TWatermark { - TInstant WatermarkIn; + TMaybe<TInstant> WatermarkIn; }; -}} +} // namespace NKikimr::NMiniKQL |