summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbbiff <[email protected]>2022-07-29 18:43:49 +0300
committerbbiff <[email protected]>2022-07-29 18:43:49 +0300
commit9e1df78041fdd7052eedd4904110ddaee37b8510 (patch)
tree3339e7c9d7335314c2e4bcb3747c5ce9f684c589
parent085f619b2ebbeec6959b10edb5ecf48cad2b27a3 (diff)
watermarks
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp21
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp93
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h7
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp10
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp226
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller.cpp2
7 files changed, 312 insertions, 49 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 13a79ebb06d..1b50ca14a38 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -1,5 +1,7 @@
#include "dq_tasks_runner.h"
+#include <ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h>
+
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <ydb/library/yql/dq/runtime/dq_columns_resolve.h>
#include <ydb/library/yql/dq/runtime/dq_input_channel.h>
@@ -20,6 +22,7 @@
#include <util/generic/scope.h>
+
using namespace NKikimr;
using namespace NKikimr::NMiniKQL;
using namespace NYql::NDqProto;
@@ -348,7 +351,18 @@ public:
auto& compPatternAlloc = compPattern ? compPattern->Alloc.Ref() : Alloc().Ref();
auto& compPatternEnv = compPattern ? compPattern->Env : typeEnv;
- TComputationPatternOpts opts(compPatternAlloc, compPatternEnv, Context.ComputationFactory,
+ auto taskRunnerFactory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+ auto& computationFactory = Context.ComputationFactory;
+ if (auto res = computationFactory(callable, ctx)) {
+ return res;
+ }
+ if (callable.GetType()->GetName() == "MultiHoppingCore") {
+ return WrapMultiHoppingCore(callable, ctx, Watermark);
+ }
+ return nullptr;
+ };
+
+ TComputationPatternOpts opts(compPatternAlloc, compPatternEnv, taskRunnerFactory,
Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, Settings.OptLLVM, EGraphPerProcess::Multi,
ProgramParsed.StatsRegistry.Get());
@@ -651,6 +665,10 @@ public:
return TaskHasEffects;
}
+ void SetWatermark(TInstant time) {
+ Watermark.WatermarkIn = std::move(time);
+ }
+
IDqInputChannel::TPtr GetInputChannel(ui64 channelId) override {
auto ptr = InputChannels.FindPtr(channelId);
YQL_ENSURE(ptr, "task: " << TaskId << " does not have input channelId: " << channelId);
@@ -837,6 +855,7 @@ private:
THashMap<ui64, TOutputTransformInfo> OutputTransforms; // Output index -> Transform
IDqOutputConsumer::TPtr Output;
NUdf::TUnboxedValue ResultStream;
+ NKikimr::NMiniKQL::TWatermark Watermark;
bool TaskHasEffects = false;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
index 7c1696c290a..1c8a12c5bac 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
@@ -52,7 +52,6 @@
#include "mkql_map.h"
#include "mkql_mapnext.h"
#include "mkql_map_join.h"
-#include "mkql_multihopping.h"
#include "mkql_multimap.h"
#include "mkql_next_value.h"
#include "mkql_now.h"
@@ -224,7 +223,6 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"CombineCore", &WrapCombineCore},
{"GroupingCore", &WrapGroupingCore},
{"HoppingCore", &WrapHoppingCore},
- {"MultiHoppingCore", &WrapMultiHoppingCore},
{"ToBytes", &WrapToBytes},
{"FromBytes", &WrapFromBytes},
{"NewMTRand", &WrapNewMTRand},
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
index 6c41b477cd4..585507da050 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
@@ -16,10 +16,12 @@ namespace NMiniKQL {
namespace {
const TStatKey Hop_NewHopsCount("MultiHop_NewHopsCount", true);
-const TStatKey Hop_ThrownEventsCount("MultiHop_ThrownEventsCount", true);
+const TStatKey Hop_EarlyThrownEventsCount("MultiHop_EarlyThrownEventsCount", true);
+const TStatKey Hop_LateThrownEventsCount("MultiHop_LateThrownEventsCount", true);
const TStatKey Hop_EmptyTimeCount("MultiHop_EmptyTimeCount", true);
const TStatKey Hop_KeysCount("MultiHop_KeysCount", true);
+
constexpr ui32 StateVersion = 1;
using TEqualsFunc = std::function<bool(NUdf::TUnboxedValuePod, NUdf::TUnboxedValuePod)>;
@@ -44,7 +46,9 @@ public:
bool dataWatermarks,
TComputationContext& ctx,
const THashFunc& hash,
- const TEqualsFunc& equal)
+ const TEqualsFunc& equal,
+ TWatermark& watermark,
+ bool watermarkMode)
: TBase(memInfo)
, Stream(std::move(stream))
, Self(self)
@@ -53,9 +57,11 @@ public:
, DelayHopCount(delayHopCount)
, StatesMap(0, hash, equal)
, Ctx(ctx)
+ , Watermark(watermark)
+ , WatermarkMode(watermarkMode)
{
if (dataWatermarks) {
- WatermarkTracker.emplace(TWatermarkTracker(delayHopCount * hopTime, hopTime));
+ DataWatermarkTracker.emplace(TWatermarkTracker(delayHopCount * hopTime, hopTime));
}
}
@@ -154,22 +160,33 @@ public:
}
}
+ TInstant GetWatermark() {
+ return Watermark.WatermarkIn;
+ }
+
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
if (!Ready.empty()) {
result = std::move(Ready.front());
Ready.pop_front();
return NUdf::EFetchStatus::Ok;
}
+ if (PendingYield) {
+ PendingYield = false;
+ return NUdf::EFetchStatus::Yield;
+ }
if (Finished) {
return NUdf::EFetchStatus::Finish;
}
- i64 thrownEventsStat = 0;
+ i64 EarlyEventsThrown = 0;
+ i64 LateEventsThrown = 0;
i64 newHopsStat = 0;
i64 emptyTimeCtStat = 0;
+
Y_DEFER {
- MKQL_ADD_STAT(Ctx.Stats, Hop_ThrownEventsCount, thrownEventsStat);
+ MKQL_ADD_STAT(Ctx.Stats, Hop_EarlyThrownEventsCount, EarlyEventsThrown);
+ MKQL_ADD_STAT(Ctx.Stats, Hop_LateThrownEventsCount, LateEventsThrown);
MKQL_ADD_STAT(Ctx.Stats, Hop_NewHopsCount, newHopsStat);
MKQL_ADD_STAT(Ctx.Stats, Hop_EmptyTimeCount, emptyTimeCtStat);
};
@@ -191,6 +208,19 @@ public:
Ready.pop_front();
return NUdf::EFetchStatus::Ok;
}
+ } else if (status == NUdf::EFetchStatus::Yield) {
+ if (!WatermarkMode) {
+ return status;
+ }
+ PendingYield = true;
+ CloseOldBuckets(GetWatermark().MicroSeconds(), newHopsStat);
+ if (!Ready.empty()) {
+ result = std::move(Ready.front());
+ Ready.pop_front();
+ return NUdf::EFetchStatus::Ok;
+ }
+ PendingYield = false;
+ return NUdf::EFetchStatus::Yield;
}
return status;
}
@@ -205,16 +235,22 @@ public:
const auto ts = time.Get<ui64>();
const auto hopIndex = ts / HopTime;
- auto& keyState = GetOrCreateKeyState(key, hopIndex);
+ auto& keyState = GetOrCreateKeyState(key, WatermarkMode ? GetWatermark().MicroSeconds() / HopTime : hopIndex);
if (hopIndex < keyState.HopIndex) {
- ++thrownEventsStat;
+ ++EarlyEventsThrown;
+ continue;
+ }
+ if (WatermarkMode && (hopIndex >= keyState.HopIndex + DelayHopCount + IntervalHopCount)) {
+ ++LateEventsThrown;
continue;
}
- // Overflow is not possible, because of hopIndex is a product of a division
- auto closeBeforeIndex = Max<i64>(hopIndex + 1 - DelayHopCount - IntervalHopCount, 0);
- CloseOldBucketsForKey(key, keyState, closeBeforeIndex, newHopsStat);
+ // Overflow is not possible, because hopIndex is a product of a division
+ if (!WatermarkMode) {
+ auto closeBeforeIndex = Max<i64>(hopIndex + 1 - DelayHopCount - IntervalHopCount, 0);
+ CloseOldBucketsForKey(key, keyState, closeBeforeIndex, newHopsStat);
+ }
auto& bucket = keyState.Buckets[hopIndex % keyState.Buckets.size()];
if (!bucket.HasValue) {
@@ -226,9 +262,9 @@ public:
bucket.Value = Self->OutUpdate->GetValue(Ctx);
}
- if (WatermarkTracker) {
- const auto newWatermark = WatermarkTracker->HandleNextEventTime(ts);
- if (newWatermark) {
+ if (DataWatermarkTracker) {
+ const auto newWatermark = DataWatermarkTracker->HandleNextEventTime(ts);
+ if (newWatermark && !WatermarkMode) {
CloseOldBuckets(*newWatermark, newHopsStat);
}
}
@@ -237,12 +273,13 @@ public:
}
TKeyState& GetOrCreateKeyState(NUdf::TUnboxedValue& key, ui64 hopIndex) {
+ i64 keyHopIndex = Max<i64>(hopIndex + 1 - IntervalHopCount, 0);
+ // For first element we shouldn't forget windows in the past
+ // Overflow is not possible, because hopIndex is a product of a division
const auto iter = StatesMap.try_emplace(
key,
IntervalHopCount + DelayHopCount,
- Max<i64>(hopIndex + 1 - IntervalHopCount, 0)
- // For first element we shouldn't forget windows in the past
- // Overflow is not possible, because of hopIndex is a product of a division
+ keyHopIndex
);
if (iter.second) {
key.Ref();
@@ -329,7 +366,8 @@ public:
const auto watermarkIndex = watermarkTs / HopTime;
EraseNodesIf(StatesMap, [&](auto& iter) {
auto& [key, val] = iter;
- const auto keyStateBecameEmpty = CloseOldBucketsForKey(key, val, watermarkIndex + 1 - IntervalHopCount, newHops);
+ ui64 closeBeforeIndex = watermarkIndex + 1 - IntervalHopCount;
+ const auto keyStateBecameEmpty = CloseOldBucketsForKey(key, val, closeBeforeIndex, newHops);
if (keyStateBecameEmpty) {
key.UnRef();
}
@@ -352,6 +390,9 @@ public:
const ui64 HopTime;
const ui64 IntervalHopCount;
const ui64 DelayHopCount;
+ TWatermark& Watermark;
+ bool WatermarkMode;
+ bool PendingYield = false;
using TStatesMap = std::unordered_map<
NUdf::TUnboxedValuePod, TKeyState,
@@ -363,7 +404,7 @@ public:
bool Finished = false;
TComputationContext& Ctx;
- std::optional<TWatermarkTracker> WatermarkTracker;
+ std::optional<TWatermarkTracker> DataWatermarkTracker;
};
TMultiHoppingCoreWrapper(
@@ -389,7 +430,9 @@ public:
IComputationNode* delay,
IComputationNode* dataWatermarks,
TType* keyType,
- TType* stateType)
+ TType* stateType,
+ TWatermark& watermark,
+ bool watermarkMode)
: TBaseComputation(mutables)
, Stream(stream)
, Item(item)
@@ -418,9 +461,10 @@ public:
, KeyTypes()
, IsTuple(false)
, UseIHash(false)
+ , Watermark(watermark)
+ , WatermarkMode(watermarkMode)
{
Stateless = false;
-
bool encoded;
GetDictionaryKeyTypes(keyType, KeyTypes, IsTuple, encoded, UseIHash);
Y_VERIFY(!encoded, "TODO");
@@ -447,7 +491,8 @@ public:
(ui64)intervalHopCount, (ui64)delayHopCount,
dataWatermarks, ctx,
TValueHasher(KeyTypes, IsTuple, UseIHash ? MakeHashImpl(KeyType) : nullptr),
- TValueEqual(KeyTypes, IsTuple, UseIHash ? MakeEquateImpl(KeyType) : nullptr));
+ TValueEqual(KeyTypes, IsTuple, UseIHash ? MakeEquateImpl(KeyType) : nullptr),
+ Watermark, WatermarkMode);
}
NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override {
@@ -521,11 +566,13 @@ private:
TKeyTypes KeyTypes;
bool IsTuple;
bool UseIHash;
+ TWatermark& Watermark;
+ bool WatermarkMode;
};
}
-IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark, bool watermarkMode) {
MKQL_ENSURE(callable.GetInputsCount() == 20, "Expected 20 args");
auto hasSaveLoad = !callable.GetInput(12).GetStaticType()->IsVoid();
@@ -573,7 +620,7 @@ IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNo
return new TMultiHoppingCoreWrapper(ctx.Mutables,
stream, item, key, state, state2, time, inSave, inLoad, keyExtract,
outTime, outInit, outUpdate, outSave, outLoad, outMerge, outFinish,
- hop, interval, delay, dataWatermarks, keyType, stateType);
+ hop, interval, delay, dataWatermarks, keyType, stateType, watermark, watermarkMode);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h
index 457b2ebe55a..02286aab567 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h
@@ -5,7 +5,12 @@
namespace NKikimr {
namespace NMiniKQL {
-IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+
+struct TWatermark {
+ TInstant WatermarkIn;
+};
+
+IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark, bool watermarkMode = false);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp
index 6819dc77187..8bc4a67c2cf 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp
@@ -1,3 +1,4 @@
+#include "mkql_multihopping.h"
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_program_builder.h>
@@ -22,10 +23,12 @@ namespace {
return CreateDeterministicTimeProvider(10000000);
}
- TComputationNodeFactory GetAuxCallableFactory() {
- return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+ TComputationNodeFactory GetAuxCallableFactory(TWatermark& watermark) {
+ return [&watermark](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
if (callable.GetType()->GetName() == "OneYieldStream") {
return new TExternalComputationNode(ctx.Mutables);
+ } else if (callable.GetType()->GetName() == "MultiHoppingCore") {
+ return WrapMultiHoppingCore(callable, ctx, watermark);
}
return GetBuiltinFactory()(callable, ctx);
@@ -46,7 +49,7 @@ namespace {
THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) {
Explorer.Walk(pgm.GetNode(), *Env);
- TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(),
+ TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(Watermark),
FunctionRegistry.Get(),
NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi);
Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts);
@@ -64,6 +67,7 @@ namespace {
TExploringNodeVisitor Explorer;
IComputationPattern::TPtr Pattern;
+ TWatermark Watermark;
};
struct TStreamWithYield : public NUdf::TBoxedValue {
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
index 761c7b6a349..27517269fe4 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
@@ -1,3 +1,4 @@
+#include "mkql_multihopping.h"
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_program_builder.h>
@@ -38,11 +39,10 @@ namespace {
};
std::vector<TOutputItem> Ordered(std::vector<TOutputItem> vec) {
- auto res = vec;
- std::sort(res.begin(), res.end(), [](auto l, auto r) {
+ std::sort(vec.begin(), vec.end(), [](auto l, auto r) {
return std::make_tuple(l.Key, l.Val, l.Time) < std::make_tuple(r.Key, r.Val, r.Time);
});
- return res;
+ return vec;
}
IOutputStream &operator<<(IOutputStream &output, std::vector<TOutputItem> items) {
@@ -64,10 +64,12 @@ namespace {
return CreateDeterministicTimeProvider(10000000);
}
- TComputationNodeFactory GetAuxCallableFactory() {
- return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+ TComputationNodeFactory GetAuxCallableFactory(TWatermark& watermark, bool watermarkMode = false) {
+ return [&watermark, watermarkMode](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
if (callable.GetType()->GetName() == "MyStream") {
return new TExternalComputationNode(ctx.Mutables);
+ } else if (callable.GetType()->GetName() == "MultiHoppingCore") {
+ return WrapMultiHoppingCore(callable, ctx, watermark, watermarkMode);
}
return GetBuiltinFactory()(callable, ctx);
@@ -75,12 +77,14 @@ namespace {
}
struct TSetup {
- TSetup(TScopedAlloc& alloc)
+ TSetup(TScopedAlloc& alloc, TWatermark& watermark, bool watermarkMode = false)
: FunctionRegistry(CreateFunctionRegistry(CreateBuiltinRegistry()))
, RandomProvider(CreateRandomProvider())
, TimeProvider(CreateTimeProvider())
, StatsResgistry(CreateDefaultStatsRegistry())
, Alloc(alloc)
+ , Watermark(watermark)
+ , WatermarkMode(watermarkMode)
{
Env.Reset(new TTypeEnvironment(Alloc));
PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry));
@@ -88,7 +92,7 @@ namespace {
THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) {
Explorer.Walk(pgm.GetNode(), *Env);
- TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(),
+ TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(Watermark, WatermarkMode),
FunctionRegistry.Get(),
NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi,
StatsResgistry.Get());
@@ -108,20 +112,28 @@ namespace {
TExploringNodeVisitor Explorer;
IComputationPattern::TPtr Pattern;
+
+ TWatermark& Watermark;
+ bool WatermarkMode;
};
struct TStream : public NUdf::TBoxedValue {
- TStream(const TUnboxedValueVector& items, std::function<void()> fetchCallback)
+ TStream(const TUnboxedValueVector& items, std::function<void()> fetchCallback, bool* yield)
: Items(items)
- , FetchCallback(fetchCallback) {}
+ , FetchCallback(fetchCallback)
+ , yield(yield) {}
private:
TUnboxedValueVector Items;
ui32 Index = 0;
std::function<void()> FetchCallback;
+ bool* yield;
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final {
FetchCallback();
+ if (*yield) {
+ return NUdf::EFetchStatus::Yield;
+ }
if (Index >= Items.size()) {
return NUdf::EFetchStatus::Finish;
}
@@ -135,6 +147,7 @@ namespace {
const std::vector<TInputItem> items,
std::function<void()> fetchCallback,
bool dataWatermarks,
+ bool* yield,
ui64 hop = 10,
ui64 interval = 30,
ui64 delay = 20)
@@ -219,7 +232,7 @@ namespace {
streamItems.push_back(std::move(structValues));
}
- auto streamValue = NUdf::TUnboxedValuePod(new TStream(streamItems, fetchCallback));
+ auto streamValue = NUdf::TUnboxedValuePod(new TStream(streamItems, fetchCallback, yield));
graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue));
return graph;
}
@@ -227,22 +240,38 @@ namespace {
Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
void TestImpl(
- const std::vector<TInputItem> input,
- const std::vector<TOutputGroup> expected,
+ const std::vector<TInputItem>& input,
+ const std::vector<TOutputGroup>& expected,
bool dataWatermarks,
ui64 hop = 10,
ui64 interval = 30,
ui64 delay = 20,
- std::function<void(ui32, TSetup&)> customCheck = [](ui32, TSetup&){})
+ std::function<void(ui32, TSetup&)> customCheck = [](ui32, TSetup&){},
+ TWatermark* watermark = nullptr,
+ bool* yield = nullptr,
+ std::function<void()> fetch_callback= [](){},
+ bool watermarkMode = false)
{
+ bool yield_clone = false;
+ if (!yield) {
+ yield = &yield_clone;
+ }
+ if (watermarkMode) {
+ dataWatermarks = false;
+ }
+ TWatermark watermark_clone{TInstant::Zero()};
+ if (watermark == nullptr) {
+ watermark = &watermark_clone;
+ }
TScopedAlloc alloc;
- TSetup setup1(alloc);
+ TSetup setup1(alloc, *watermark, watermarkMode);
ui32 curGroupId = 0;
std::vector<TOutputItem> curResult;
- auto check = [&curResult, &curGroupId, &expected, customCheck, &setup1]() {
- auto expectedItems = Ordered(expected.at(curGroupId).Items);
+ auto check = [&curResult, &curGroupId, &expected, customCheck, &setup1, &fetch_callback]() {
+ fetch_callback();
+ auto expectedItems = Ordered(expected.at(curGroupId).Items); // Add more empty lists at yield in expected
curResult = Ordered(curResult);
UNIT_ASSERT_EQUAL_C(curResult, expectedItems, "curGroup: " << curGroupId << " actual: " << curResult << " expected: " << expectedItems);
customCheck(curGroupId, setup1);
@@ -250,12 +279,12 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
curResult.clear();
};
- auto graph1 = BuildGraph(setup1, input, check, dataWatermarks, hop, interval, delay);
+ auto graph1 = BuildGraph(setup1, input, check, dataWatermarks, yield, hop, interval, delay);
auto root1 = graph1->GetValue();
NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok;
- while (status == NUdf::EFetchStatus::Ok) {
+ while (status == NUdf::EFetchStatus::Ok || status == NUdf::EFetchStatus::Yield) {
NUdf::TUnboxedValue val;
status = root1.Fetch(val);
if (status == NUdf::EFetchStatus::Ok) {
@@ -267,6 +296,167 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
UNIT_ASSERT_EQUAL_C(curGroupId, expected.size(), "1: " << curGroupId << " 2: " << expected.size());
}
+ void TestWatermarksImpl(
+ const std::vector<TInputItem>& input,
+ const std::vector<TOutputGroup>& expected,
+ const std::vector<std::pair<ui64, TInstant>>& watermarks)
+ {
+ bool yield = false;
+ TWatermark watermark;
+ ui64 inp_index = 0;
+ ui64 pattern_index = 0;
+ auto avant_fetch = [&yield, &watermark, &watermarks, &inp_index, &pattern_index](){
+ yield = false;
+ if (pattern_index >= watermarks.size()) {
+ return;
+ }
+ if (inp_index == watermarks[pattern_index].first) {
+ yield = true;
+ watermark.WatermarkIn = watermarks[pattern_index].second;
+ ++pattern_index;
+ } else {
+ ++inp_index;
+ }
+ };
+ TestImpl(input, expected, false, 10, 30, 20, [](ui32, TSetup&){}, &watermark, &yield, avant_fetch, true);
+ }
+
+ Y_UNIT_TEST(TestThrowWatermarkFromPast) {
+ const std::vector<TInputItem> input = {
+ // Group; Time; Value
+ {1, 101, 2},
+ {1, 131, 3},
+ {1, 200, 4},
+ {1, 300, 5},
+ {1, 400, 6}
+ };
+
+ const std::vector<TOutputGroup> expected = {
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({})
+ };
+ std::vector<std::pair<ui64, TInstant>> yield_pattern = {
+ {2, TInstant::MicroSeconds(20)},
+ {3, TInstant::MicroSeconds(40)}
+ };
+ TestWatermarksImpl(input, expected, yield_pattern);
+ }
+
+ Y_UNIT_TEST(TestThrowWatermarkFromFuture) {
+ const std::vector<TInputItem> input = {
+ // Group; Time; Value
+ {1, 101, 2},
+ {1, 131, 3},
+ {1, 200, 4},
+ {1, 300, 5},
+ {1, 400, 6}
+ };
+
+ const std::vector<TOutputGroup> expected = {
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({})
+ };
+ std::vector<std::pair<ui64, TInstant>> yield_pattern = {
+ {2, TInstant::MicroSeconds(1000)},
+ {3, TInstant::MicroSeconds(2000)}
+ };
+ TestWatermarksImpl(input, expected, yield_pattern);
+ }
+
+ Y_UNIT_TEST(TestWatermarkFlow1) {
+ const std::vector<TInputItem> input = {
+ // Group; Time; Value
+ {1, 101, 2},
+ {1, 131, 3},
+ {1, 200, 4},
+ {1, 300, 5},
+ {1, 400, 6}
+ };
+
+ const std::vector<TOutputGroup> expected = {
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({{1, 2, 110},{1, 2, 120},{1, 2, 130}}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({})
+ };
+ std::vector<std::pair<ui64, TInstant>> yield_pattern = {
+ {0, TInstant::MicroSeconds(100)},
+ {3, TInstant::MicroSeconds(200)}
+ };
+ TestWatermarksImpl(input, expected, yield_pattern);
+ }
+
+ Y_UNIT_TEST(TestWatermarkFlow2) {
+ const std::vector<TInputItem> input = {
+ // Group; Time; Value
+ {1, 100, 2},
+ {1, 105, 3},
+ {1, 80, 4},
+ {1, 107, 5},
+ {1, 106, 6}
+ };
+
+ const std::vector<TOutputGroup> expected = {
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({{1, 4, 90}, {1, 4, 100}, {1, 4, 110}})
+ };
+ std::vector<std::pair<ui64, TInstant>> yield_pattern = {
+ {0, TInstant::MicroSeconds(76)},
+ };
+ TestWatermarksImpl(input, expected, yield_pattern);
+ }
+
+ Y_UNIT_TEST(TestWatermarkFlow3) {
+ const std::vector<TInputItem> input = {
+ // Group; Time; Value
+ {1, 90, 2},
+ {1, 99, 3},
+ {1, 80, 4},
+ {1, 107, 5},
+ {1, 106, 6}
+ };
+
+ const std::vector<TOutputGroup> expected = {
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({}),
+ TOutputGroup({{1, 4, 90}, {1, 9, 100}, {1, 9, 110}, {1, 5, 120}})
+ };
+ std::vector<std::pair<ui64, TInstant>> yield_pattern = {
+ {0, TInstant::MicroSeconds(76)},
+ };
+ TestWatermarksImpl(input, expected, yield_pattern);
+ }
+
Y_UNIT_TEST(TestDataWatermarks) {
const std::vector<TInputItem> input = {
// Group; Time; Value
diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp
index ef399d1bed6..294d94920ab 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller.cpp
+++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp
@@ -248,7 +248,7 @@ private:
if (labels.count(SourceLabel)) publicCounterName = "query.source_input_records";
else if (labels.count(SinkLabel)) publicCounterName = "query.sink_output_records"; // RowsIn == RowsOut for Sinks
isDeriv = true;
- } else if (name == "MultiHop_ThrownEventsCount") {
+ } else if (name == "MultiHop_LateThrownEventsCount") {
publicCounterName = "query.late_events";
isDeriv = true;
}