diff options
| author | bbiff <[email protected]> | 2022-07-29 18:43:49 +0300 |
|---|---|---|
| committer | bbiff <[email protected]> | 2022-07-29 18:43:49 +0300 |
| commit | 9e1df78041fdd7052eedd4904110ddaee37b8510 (patch) | |
| tree | 3339e7c9d7335314c2e4bcb3747c5ce9f684c589 | |
| parent | 085f619b2ebbeec6959b10edb5ecf48cad2b27a3 (diff) | |
watermarks
7 files changed, 312 insertions, 49 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 13a79ebb06d..1b50ca14a38 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -1,5 +1,7 @@ #include "dq_tasks_runner.h" +#include <ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h> + #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/dq/runtime/dq_columns_resolve.h> #include <ydb/library/yql/dq/runtime/dq_input_channel.h> @@ -20,6 +22,7 @@ #include <util/generic/scope.h> + using namespace NKikimr; using namespace NKikimr::NMiniKQL; using namespace NYql::NDqProto; @@ -348,7 +351,18 @@ public: auto& compPatternAlloc = compPattern ? compPattern->Alloc.Ref() : Alloc().Ref(); auto& compPatternEnv = compPattern ? compPattern->Env : typeEnv; - TComputationPatternOpts opts(compPatternAlloc, compPatternEnv, Context.ComputationFactory, + auto taskRunnerFactory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { + auto& computationFactory = Context.ComputationFactory; + if (auto res = computationFactory(callable, ctx)) { + return res; + } + if (callable.GetType()->GetName() == "MultiHoppingCore") { + return WrapMultiHoppingCore(callable, ctx, Watermark); + } + return nullptr; + }; + + TComputationPatternOpts opts(compPatternAlloc, compPatternEnv, taskRunnerFactory, Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, Settings.OptLLVM, EGraphPerProcess::Multi, ProgramParsed.StatsRegistry.Get()); @@ -651,6 +665,10 @@ public: return TaskHasEffects; } + void SetWatermark(TInstant time) { + Watermark.WatermarkIn = std::move(time); + } + IDqInputChannel::TPtr GetInputChannel(ui64 channelId) override { auto ptr = InputChannels.FindPtr(channelId); YQL_ENSURE(ptr, "task: " << TaskId << " does not have input channelId: " << channelId); @@ -837,6 +855,7 @@ private: THashMap<ui64, TOutputTransformInfo> OutputTransforms; // Output index -> Transform IDqOutputConsumer::TPtr Output; NUdf::TUnboxedValue ResultStream; + NKikimr::NMiniKQL::TWatermark Watermark; bool TaskHasEffects = false; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index 7c1696c290a..1c8a12c5bac 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -52,7 +52,6 @@ #include "mkql_map.h" #include "mkql_mapnext.h" #include "mkql_map_join.h" -#include "mkql_multihopping.h" #include "mkql_multimap.h" #include "mkql_next_value.h" #include "mkql_now.h" @@ -224,7 +223,6 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"CombineCore", &WrapCombineCore}, {"GroupingCore", &WrapGroupingCore}, {"HoppingCore", &WrapHoppingCore}, - {"MultiHoppingCore", &WrapMultiHoppingCore}, {"ToBytes", &WrapToBytes}, {"FromBytes", &WrapFromBytes}, {"NewMTRand", &WrapNewMTRand}, diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp index 6c41b477cd4..585507da050 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp @@ -16,10 +16,12 @@ namespace NMiniKQL { namespace { const TStatKey Hop_NewHopsCount("MultiHop_NewHopsCount", true); -const TStatKey Hop_ThrownEventsCount("MultiHop_ThrownEventsCount", true); +const TStatKey Hop_EarlyThrownEventsCount("MultiHop_EarlyThrownEventsCount", true); +const TStatKey Hop_LateThrownEventsCount("MultiHop_LateThrownEventsCount", true); const TStatKey Hop_EmptyTimeCount("MultiHop_EmptyTimeCount", true); const TStatKey Hop_KeysCount("MultiHop_KeysCount", true); + constexpr ui32 StateVersion = 1; using TEqualsFunc = std::function<bool(NUdf::TUnboxedValuePod, NUdf::TUnboxedValuePod)>; @@ -44,7 +46,9 @@ public: bool dataWatermarks, TComputationContext& ctx, const THashFunc& hash, - const TEqualsFunc& equal) + const TEqualsFunc& equal, + TWatermark& watermark, + bool watermarkMode) : TBase(memInfo) , Stream(std::move(stream)) , Self(self) @@ -53,9 +57,11 @@ public: , DelayHopCount(delayHopCount) , StatesMap(0, hash, equal) , Ctx(ctx) + , Watermark(watermark) + , WatermarkMode(watermarkMode) { if (dataWatermarks) { - WatermarkTracker.emplace(TWatermarkTracker(delayHopCount * hopTime, hopTime)); + DataWatermarkTracker.emplace(TWatermarkTracker(delayHopCount * hopTime, hopTime)); } } @@ -154,22 +160,33 @@ public: } } + TInstant GetWatermark() { + return Watermark.WatermarkIn; + } + NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override { if (!Ready.empty()) { result = std::move(Ready.front()); Ready.pop_front(); return NUdf::EFetchStatus::Ok; } + if (PendingYield) { + PendingYield = false; + return NUdf::EFetchStatus::Yield; + } if (Finished) { return NUdf::EFetchStatus::Finish; } - i64 thrownEventsStat = 0; + i64 EarlyEventsThrown = 0; + i64 LateEventsThrown = 0; i64 newHopsStat = 0; i64 emptyTimeCtStat = 0; + Y_DEFER { - MKQL_ADD_STAT(Ctx.Stats, Hop_ThrownEventsCount, thrownEventsStat); + MKQL_ADD_STAT(Ctx.Stats, Hop_EarlyThrownEventsCount, EarlyEventsThrown); + MKQL_ADD_STAT(Ctx.Stats, Hop_LateThrownEventsCount, LateEventsThrown); MKQL_ADD_STAT(Ctx.Stats, Hop_NewHopsCount, newHopsStat); MKQL_ADD_STAT(Ctx.Stats, Hop_EmptyTimeCount, emptyTimeCtStat); }; @@ -191,6 +208,19 @@ public: Ready.pop_front(); return NUdf::EFetchStatus::Ok; } + } else if (status == NUdf::EFetchStatus::Yield) { + if (!WatermarkMode) { + return status; + } + PendingYield = true; + CloseOldBuckets(GetWatermark().MicroSeconds(), newHopsStat); + if (!Ready.empty()) { + result = std::move(Ready.front()); + Ready.pop_front(); + return NUdf::EFetchStatus::Ok; + } + PendingYield = false; + return NUdf::EFetchStatus::Yield; } return status; } @@ -205,16 +235,22 @@ public: const auto ts = time.Get<ui64>(); const auto hopIndex = ts / HopTime; - auto& keyState = GetOrCreateKeyState(key, hopIndex); + auto& keyState = GetOrCreateKeyState(key, WatermarkMode ? GetWatermark().MicroSeconds() / HopTime : hopIndex); if (hopIndex < keyState.HopIndex) { - ++thrownEventsStat; + ++EarlyEventsThrown; + continue; + } + if (WatermarkMode && (hopIndex >= keyState.HopIndex + DelayHopCount + IntervalHopCount)) { + ++LateEventsThrown; continue; } - // Overflow is not possible, because of hopIndex is a product of a division - auto closeBeforeIndex = Max<i64>(hopIndex + 1 - DelayHopCount - IntervalHopCount, 0); - CloseOldBucketsForKey(key, keyState, closeBeforeIndex, newHopsStat); + // Overflow is not possible, because hopIndex is a product of a division + if (!WatermarkMode) { + auto closeBeforeIndex = Max<i64>(hopIndex + 1 - DelayHopCount - IntervalHopCount, 0); + CloseOldBucketsForKey(key, keyState, closeBeforeIndex, newHopsStat); + } auto& bucket = keyState.Buckets[hopIndex % keyState.Buckets.size()]; if (!bucket.HasValue) { @@ -226,9 +262,9 @@ public: bucket.Value = Self->OutUpdate->GetValue(Ctx); } - if (WatermarkTracker) { - const auto newWatermark = WatermarkTracker->HandleNextEventTime(ts); - if (newWatermark) { + if (DataWatermarkTracker) { + const auto newWatermark = DataWatermarkTracker->HandleNextEventTime(ts); + if (newWatermark && !WatermarkMode) { CloseOldBuckets(*newWatermark, newHopsStat); } } @@ -237,12 +273,13 @@ public: } TKeyState& GetOrCreateKeyState(NUdf::TUnboxedValue& key, ui64 hopIndex) { + i64 keyHopIndex = Max<i64>(hopIndex + 1 - IntervalHopCount, 0); + // For first element we shouldn't forget windows in the past + // Overflow is not possible, because hopIndex is a product of a division const auto iter = StatesMap.try_emplace( key, IntervalHopCount + DelayHopCount, - Max<i64>(hopIndex + 1 - IntervalHopCount, 0) - // For first element we shouldn't forget windows in the past - // Overflow is not possible, because of hopIndex is a product of a division + keyHopIndex ); if (iter.second) { key.Ref(); @@ -329,7 +366,8 @@ public: const auto watermarkIndex = watermarkTs / HopTime; EraseNodesIf(StatesMap, [&](auto& iter) { auto& [key, val] = iter; - const auto keyStateBecameEmpty = CloseOldBucketsForKey(key, val, watermarkIndex + 1 - IntervalHopCount, newHops); + ui64 closeBeforeIndex = watermarkIndex + 1 - IntervalHopCount; + const auto keyStateBecameEmpty = CloseOldBucketsForKey(key, val, closeBeforeIndex, newHops); if (keyStateBecameEmpty) { key.UnRef(); } @@ -352,6 +390,9 @@ public: const ui64 HopTime; const ui64 IntervalHopCount; const ui64 DelayHopCount; + TWatermark& Watermark; + bool WatermarkMode; + bool PendingYield = false; using TStatesMap = std::unordered_map< NUdf::TUnboxedValuePod, TKeyState, @@ -363,7 +404,7 @@ public: bool Finished = false; TComputationContext& Ctx; - std::optional<TWatermarkTracker> WatermarkTracker; + std::optional<TWatermarkTracker> DataWatermarkTracker; }; TMultiHoppingCoreWrapper( @@ -389,7 +430,9 @@ public: IComputationNode* delay, IComputationNode* dataWatermarks, TType* keyType, - TType* stateType) + TType* stateType, + TWatermark& watermark, + bool watermarkMode) : TBaseComputation(mutables) , Stream(stream) , Item(item) @@ -418,9 +461,10 @@ public: , KeyTypes() , IsTuple(false) , UseIHash(false) + , Watermark(watermark) + , WatermarkMode(watermarkMode) { Stateless = false; - bool encoded; GetDictionaryKeyTypes(keyType, KeyTypes, IsTuple, encoded, UseIHash); Y_VERIFY(!encoded, "TODO"); @@ -447,7 +491,8 @@ public: (ui64)intervalHopCount, (ui64)delayHopCount, dataWatermarks, ctx, TValueHasher(KeyTypes, IsTuple, UseIHash ? MakeHashImpl(KeyType) : nullptr), - TValueEqual(KeyTypes, IsTuple, UseIHash ? MakeEquateImpl(KeyType) : nullptr)); + TValueEqual(KeyTypes, IsTuple, UseIHash ? MakeEquateImpl(KeyType) : nullptr), + Watermark, WatermarkMode); } NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override { @@ -521,11 +566,13 @@ private: TKeyTypes KeyTypes; bool IsTuple; bool UseIHash; + TWatermark& Watermark; + bool WatermarkMode; }; } -IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) { +IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark, bool watermarkMode) { MKQL_ENSURE(callable.GetInputsCount() == 20, "Expected 20 args"); auto hasSaveLoad = !callable.GetInput(12).GetStaticType()->IsVoid(); @@ -573,7 +620,7 @@ IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNo return new TMultiHoppingCoreWrapper(ctx.Mutables, stream, item, key, state, state2, time, inSave, inLoad, keyExtract, outTime, outInit, outUpdate, outSave, outLoad, outMerge, outFinish, - hop, interval, delay, dataWatermarks, keyType, stateType); + hop, interval, delay, dataWatermarks, keyType, stateType, watermark, watermarkMode); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h index 457b2ebe55a..02286aab567 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h @@ -5,7 +5,12 @@ namespace NKikimr { namespace NMiniKQL { -IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx); + +struct TWatermark { + TInstant WatermarkIn; +}; + +IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark, bool watermarkMode = false); } } diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp index 6819dc77187..8bc4a67c2cf 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp @@ -1,3 +1,4 @@ +#include "mkql_multihopping.h" #include <ydb/library/yql/minikql/mkql_node.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/mkql_program_builder.h> @@ -22,10 +23,12 @@ namespace { return CreateDeterministicTimeProvider(10000000); } - TComputationNodeFactory GetAuxCallableFactory() { - return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { + TComputationNodeFactory GetAuxCallableFactory(TWatermark& watermark) { + return [&watermark](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { if (callable.GetType()->GetName() == "OneYieldStream") { return new TExternalComputationNode(ctx.Mutables); + } else if (callable.GetType()->GetName() == "MultiHoppingCore") { + return WrapMultiHoppingCore(callable, ctx, watermark); } return GetBuiltinFactory()(callable, ctx); @@ -46,7 +49,7 @@ namespace { THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) { Explorer.Walk(pgm.GetNode(), *Env); - TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(), + TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(Watermark), FunctionRegistry.Get(), NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi); Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts); @@ -64,6 +67,7 @@ namespace { TExploringNodeVisitor Explorer; IComputationPattern::TPtr Pattern; + TWatermark Watermark; }; struct TStreamWithYield : public NUdf::TBoxedValue { diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp index 761c7b6a349..27517269fe4 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp @@ -1,3 +1,4 @@ +#include "mkql_multihopping.h" #include <ydb/library/yql/minikql/mkql_node.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/mkql_program_builder.h> @@ -38,11 +39,10 @@ namespace { }; std::vector<TOutputItem> Ordered(std::vector<TOutputItem> vec) { - auto res = vec; - std::sort(res.begin(), res.end(), [](auto l, auto r) { + std::sort(vec.begin(), vec.end(), [](auto l, auto r) { return std::make_tuple(l.Key, l.Val, l.Time) < std::make_tuple(r.Key, r.Val, r.Time); }); - return res; + return vec; } IOutputStream &operator<<(IOutputStream &output, std::vector<TOutputItem> items) { @@ -64,10 +64,12 @@ namespace { return CreateDeterministicTimeProvider(10000000); } - TComputationNodeFactory GetAuxCallableFactory() { - return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { + TComputationNodeFactory GetAuxCallableFactory(TWatermark& watermark, bool watermarkMode = false) { + return [&watermark, watermarkMode](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { if (callable.GetType()->GetName() == "MyStream") { return new TExternalComputationNode(ctx.Mutables); + } else if (callable.GetType()->GetName() == "MultiHoppingCore") { + return WrapMultiHoppingCore(callable, ctx, watermark, watermarkMode); } return GetBuiltinFactory()(callable, ctx); @@ -75,12 +77,14 @@ namespace { } struct TSetup { - TSetup(TScopedAlloc& alloc) + TSetup(TScopedAlloc& alloc, TWatermark& watermark, bool watermarkMode = false) : FunctionRegistry(CreateFunctionRegistry(CreateBuiltinRegistry())) , RandomProvider(CreateRandomProvider()) , TimeProvider(CreateTimeProvider()) , StatsResgistry(CreateDefaultStatsRegistry()) , Alloc(alloc) + , Watermark(watermark) + , WatermarkMode(watermarkMode) { Env.Reset(new TTypeEnvironment(Alloc)); PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry)); @@ -88,7 +92,7 @@ namespace { THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) { Explorer.Walk(pgm.GetNode(), *Env); - TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(), + TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(Watermark, WatermarkMode), FunctionRegistry.Get(), NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi, StatsResgistry.Get()); @@ -108,20 +112,28 @@ namespace { TExploringNodeVisitor Explorer; IComputationPattern::TPtr Pattern; + + TWatermark& Watermark; + bool WatermarkMode; }; struct TStream : public NUdf::TBoxedValue { - TStream(const TUnboxedValueVector& items, std::function<void()> fetchCallback) + TStream(const TUnboxedValueVector& items, std::function<void()> fetchCallback, bool* yield) : Items(items) - , FetchCallback(fetchCallback) {} + , FetchCallback(fetchCallback) + , yield(yield) {} private: TUnboxedValueVector Items; ui32 Index = 0; std::function<void()> FetchCallback; + bool* yield; NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final { FetchCallback(); + if (*yield) { + return NUdf::EFetchStatus::Yield; + } if (Index >= Items.size()) { return NUdf::EFetchStatus::Finish; } @@ -135,6 +147,7 @@ namespace { const std::vector<TInputItem> items, std::function<void()> fetchCallback, bool dataWatermarks, + bool* yield, ui64 hop = 10, ui64 interval = 30, ui64 delay = 20) @@ -219,7 +232,7 @@ namespace { streamItems.push_back(std::move(structValues)); } - auto streamValue = NUdf::TUnboxedValuePod(new TStream(streamItems, fetchCallback)); + auto streamValue = NUdf::TUnboxedValuePod(new TStream(streamItems, fetchCallback, yield)); graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); return graph; } @@ -227,22 +240,38 @@ namespace { Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { void TestImpl( - const std::vector<TInputItem> input, - const std::vector<TOutputGroup> expected, + const std::vector<TInputItem>& input, + const std::vector<TOutputGroup>& expected, bool dataWatermarks, ui64 hop = 10, ui64 interval = 30, ui64 delay = 20, - std::function<void(ui32, TSetup&)> customCheck = [](ui32, TSetup&){}) + std::function<void(ui32, TSetup&)> customCheck = [](ui32, TSetup&){}, + TWatermark* watermark = nullptr, + bool* yield = nullptr, + std::function<void()> fetch_callback= [](){}, + bool watermarkMode = false) { + bool yield_clone = false; + if (!yield) { + yield = &yield_clone; + } + if (watermarkMode) { + dataWatermarks = false; + } + TWatermark watermark_clone{TInstant::Zero()}; + if (watermark == nullptr) { + watermark = &watermark_clone; + } TScopedAlloc alloc; - TSetup setup1(alloc); + TSetup setup1(alloc, *watermark, watermarkMode); ui32 curGroupId = 0; std::vector<TOutputItem> curResult; - auto check = [&curResult, &curGroupId, &expected, customCheck, &setup1]() { - auto expectedItems = Ordered(expected.at(curGroupId).Items); + auto check = [&curResult, &curGroupId, &expected, customCheck, &setup1, &fetch_callback]() { + fetch_callback(); + auto expectedItems = Ordered(expected.at(curGroupId).Items); // Add more empty lists at yield in expected curResult = Ordered(curResult); UNIT_ASSERT_EQUAL_C(curResult, expectedItems, "curGroup: " << curGroupId << " actual: " << curResult << " expected: " << expectedItems); customCheck(curGroupId, setup1); @@ -250,12 +279,12 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { curResult.clear(); }; - auto graph1 = BuildGraph(setup1, input, check, dataWatermarks, hop, interval, delay); + auto graph1 = BuildGraph(setup1, input, check, dataWatermarks, yield, hop, interval, delay); auto root1 = graph1->GetValue(); NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok; - while (status == NUdf::EFetchStatus::Ok) { + while (status == NUdf::EFetchStatus::Ok || status == NUdf::EFetchStatus::Yield) { NUdf::TUnboxedValue val; status = root1.Fetch(val); if (status == NUdf::EFetchStatus::Ok) { @@ -267,6 +296,167 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { UNIT_ASSERT_EQUAL_C(curGroupId, expected.size(), "1: " << curGroupId << " 2: " << expected.size()); } + void TestWatermarksImpl( + const std::vector<TInputItem>& input, + const std::vector<TOutputGroup>& expected, + const std::vector<std::pair<ui64, TInstant>>& watermarks) + { + bool yield = false; + TWatermark watermark; + ui64 inp_index = 0; + ui64 pattern_index = 0; + auto avant_fetch = [&yield, &watermark, &watermarks, &inp_index, &pattern_index](){ + yield = false; + if (pattern_index >= watermarks.size()) { + return; + } + if (inp_index == watermarks[pattern_index].first) { + yield = true; + watermark.WatermarkIn = watermarks[pattern_index].second; + ++pattern_index; + } else { + ++inp_index; + } + }; + TestImpl(input, expected, false, 10, 30, 20, [](ui32, TSetup&){}, &watermark, &yield, avant_fetch, true); + } + + Y_UNIT_TEST(TestThrowWatermarkFromPast) { + const std::vector<TInputItem> input = { + // Group; Time; Value + {1, 101, 2}, + {1, 131, 3}, + {1, 200, 4}, + {1, 300, 5}, + {1, 400, 6} + }; + + const std::vector<TOutputGroup> expected = { + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}) + }; + std::vector<std::pair<ui64, TInstant>> yield_pattern = { + {2, TInstant::MicroSeconds(20)}, + {3, TInstant::MicroSeconds(40)} + }; + TestWatermarksImpl(input, expected, yield_pattern); + } + + Y_UNIT_TEST(TestThrowWatermarkFromFuture) { + const std::vector<TInputItem> input = { + // Group; Time; Value + {1, 101, 2}, + {1, 131, 3}, + {1, 200, 4}, + {1, 300, 5}, + {1, 400, 6} + }; + + const std::vector<TOutputGroup> expected = { + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}) + }; + std::vector<std::pair<ui64, TInstant>> yield_pattern = { + {2, TInstant::MicroSeconds(1000)}, + {3, TInstant::MicroSeconds(2000)} + }; + TestWatermarksImpl(input, expected, yield_pattern); + } + + Y_UNIT_TEST(TestWatermarkFlow1) { + const std::vector<TInputItem> input = { + // Group; Time; Value + {1, 101, 2}, + {1, 131, 3}, + {1, 200, 4}, + {1, 300, 5}, + {1, 400, 6} + }; + + const std::vector<TOutputGroup> expected = { + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({{1, 2, 110},{1, 2, 120},{1, 2, 130}}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}) + }; + std::vector<std::pair<ui64, TInstant>> yield_pattern = { + {0, TInstant::MicroSeconds(100)}, + {3, TInstant::MicroSeconds(200)} + }; + TestWatermarksImpl(input, expected, yield_pattern); + } + + Y_UNIT_TEST(TestWatermarkFlow2) { + const std::vector<TInputItem> input = { + // Group; Time; Value + {1, 100, 2}, + {1, 105, 3}, + {1, 80, 4}, + {1, 107, 5}, + {1, 106, 6} + }; + + const std::vector<TOutputGroup> expected = { + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({{1, 4, 90}, {1, 4, 100}, {1, 4, 110}}) + }; + std::vector<std::pair<ui64, TInstant>> yield_pattern = { + {0, TInstant::MicroSeconds(76)}, + }; + TestWatermarksImpl(input, expected, yield_pattern); + } + + Y_UNIT_TEST(TestWatermarkFlow3) { + const std::vector<TInputItem> input = { + // Group; Time; Value + {1, 90, 2}, + {1, 99, 3}, + {1, 80, 4}, + {1, 107, 5}, + {1, 106, 6} + }; + + const std::vector<TOutputGroup> expected = { + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({{1, 4, 90}, {1, 9, 100}, {1, 9, 110}, {1, 5, 120}}) + }; + std::vector<std::pair<ui64, TInstant>> yield_pattern = { + {0, TInstant::MicroSeconds(76)}, + }; + TestWatermarksImpl(input, expected, yield_pattern); + } + Y_UNIT_TEST(TestDataWatermarks) { const std::vector<TInputItem> input = { // Group; Time; Value diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp index ef399d1bed6..294d94920ab 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller.cpp +++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp @@ -248,7 +248,7 @@ private: if (labels.count(SourceLabel)) publicCounterName = "query.source_input_records"; else if (labels.count(SinkLabel)) publicCounterName = "query.sink_output_records"; // RowsIn == RowsOut for Sinks isDeriv = true; - } else if (name == "MultiHop_ThrownEventsCount") { + } else if (name == "MultiHop_LateThrownEventsCount") { publicCounterName = "query.late_events"; isDeriv = true; } |
