summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authord-mokhnatkin <[email protected]>2022-11-01 12:52:05 +0300
committerd-mokhnatkin <[email protected]>2022-11-01 12:52:05 +0300
commit8049e68d98d4a7614ecb8772dd5c63b1c50df4f2 (patch)
tree957953ade699542412b7a92ebe495c8bcfb9061e
parent14b4b44f929d300374d993b6344b20a86cb5b4c5 (diff)
idle partitions implementation
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp6
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_flow2.cpp1
-rw-r--r--ydb/library/yql/core/expr_nodes/yql_expr_nodes.json3
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.cpp16
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h115
-rw-r--r--ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp142
-rw-r--r--ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h13
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.cpp2
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h6
-rw-r--r--ydb/library/yql/providers/dq/opt/logical_optimize.cpp71
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp62
-rw-r--r--ydb/library/yql/providers/pq/common/yql_names.h3
-rw-r--r--ydb/library/yql/providers/pq/proto/dq_io.proto2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp24
-rw-r--r--ydb/library/yql/sql/v0/node.cpp3
-rw-r--r--ydb/library/yql/sql/v1/builtin.cpp3
-rw-r--r--ydb/library/yql/sql/v1/node.cpp3
17 files changed, 363 insertions, 112 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 67aadaf04de..38cc8e9eef1 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -469,7 +469,7 @@ private:
RateLimiterResourceCreatorId = Register(CreateRateLimiterResourceCreator(SelfId(), Params.Owner, Params.QueryId, Params.Scope, Params.TenantName));
}
return;
- }
+ }
if (QueryStateUpdateRequest.resources().compilation() == Fq::Private::TaskResources::PREPARE) {
if (!ProgramRunnerId) {
@@ -477,7 +477,7 @@ private:
}
return;
}
-
+
if (QueryStateUpdateRequest.resources().read_rules() == Fq::Private::TaskResources::PREPARE) {
if (!ReadRulesCreatorId) {
ReadRulesCreatorId = Register(
@@ -1355,6 +1355,8 @@ private:
apply("_EnablePrecompute", "1");
apply("WatermarksMode", "disable");
apply("WatermarksGranularityMs", "1000");
+ apply("WatermarksLateArrivalDelayMs", "5000");
+ apply("WatermarksIdlePartitions", "true");
switch (Params.QueryType) {
case YandexQuery::QueryContent::STREAMING: {
diff --git a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
index a85cedb647a..c3cb4f6b04e 100644
--- a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
@@ -1609,6 +1609,7 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {
.Interval(self.Interval())
.Delay(self.Delay())
.DataWatermarks(self.DataWatermarks())
+ .Version(self.Version())
.Done().Ptr();
};
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
index 8edbdbd78eb..6cd1d45e502 100644
--- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
+++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
@@ -585,7 +585,8 @@
{"Index": 2, "Name": "Hop", "Type": "TExprBase"},
{"Index": 3, "Name": "Interval", "Type": "TExprBase"},
{"Index": 4, "Name": "Delay", "Type": "TExprBase"},
- {"Index": 5, "Name": "DataWatermarks", "Type": "TExprBase"}
+ {"Index": 5, "Name": "DataWatermarks", "Type": "TExprBase"},
+ {"Index": 6, "Name": "Version", "Type": "TExprBase"}
]
},
{
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp
index 0824eda8f9a..c6708b83ca2 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp
@@ -6044,7 +6044,7 @@ namespace {
IGraphTransformer::TStatus HoppingTraitsWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Y_UNUSED(output);
- if (!EnsureMinArgsCount(*input, 5, ctx.Expr) || !EnsureMaxArgsCount(*input, 6, ctx.Expr)) {
+ if (!EnsureArgsCount(*input, 7, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
if (auto status = EnsureTypeRewrite(input->HeadRef(), ctx.Expr); status != IGraphTransformer::TStatus::Ok) {
@@ -6110,14 +6110,12 @@ namespace {
return convertStatus;
}
- if (input->ChildrenSize() == 6) {
- const auto& dataWatermarksNodePtr = input->ChildRef(5);
- if (dataWatermarksNodePtr->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Unit) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(dataWatermarksNodePtr->Pos()), TStringBuilder()
- << "Expected unit type, but got: "
- << *dataWatermarksNodePtr->GetTypeAnn()));
- return IGraphTransformer::TStatus::Error;
- }
+ const auto& dataWatermarksNodePtr = input->ChildRef(5);
+ if (dataWatermarksNodePtr->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Unit) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(dataWatermarksNodePtr->Pos()), TStringBuilder()
+ << "Expected unit type, but got: "
+ << *dataWatermarksNodePtr->GetTypeAnn()));
+ return IGraphTransformer::TStatus::Error;
}
input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
diff --git a/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h b/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h
index 01f8f139393..5db02088ecc 100644
--- a/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h
+++ b/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h
@@ -13,56 +13,118 @@ public:
TDqSourceWatermarkTracker(
TDuration granularity,
TInstant startWatermark,
- ui32 expectedPartitionsCount)
+ bool idlePartitionsEnabled,
+ TDuration lateArrivalDelay,
+ TInstant systemTime)
: Granularity(granularity)
, StartWatermark(ToDiscreteTime(startWatermark))
- , ExpectedPartitionsCount(expectedPartitionsCount) {}
+ , IdlePartitionsEnabled(idlePartitionsEnabled)
+ , LateArrivalDelay(lateArrivalDelay)
+ , LastTimeNotifiedAt(systemTime) {}
+
+ TMaybe<TInstant> NotifyNewPartitionTime(
+ const TPartitionKey& partitionKey,
+ TInstant partitionTime,
+ TInstant systemTime)
+ {
+ auto [iter, _] = Data.try_emplace(partitionKey);
+ if (UpdatePartitionTime(iter->second, partitionTime, systemTime)) {
+ return RecalcWatermark();
+ }
- TMaybe<TInstant> NotifyNewPartitionTime(const TPartitionKey& partitionKey, TInstant time) {
- auto granularPartitionTime = ToDiscreteTime(time);
+ return Nothing();
+ }
- auto iter = Data.find(partitionKey);
- if (iter == Data.end()) {
- Data[partitionKey] = granularPartitionTime;
- return RecalcWatermark();
+ TMaybe<TInstant> HandleIdleness(TInstant systemTime) {
+ if (!Watermark) {
+ return Watermark = StartWatermark;
}
- if (granularPartitionTime <= iter->second) {
+ if (!IdlePartitionsEnabled || !ShouldCheckIdlenessNow(systemTime)) {
return Nothing();
}
- iter->second = granularPartitionTime;
+ if (AllPartitionsAreIdle(systemTime)) {
+ return TryProduceFakeWatermark(systemTime);
+ }
+
return RecalcWatermark();
}
+ TMaybe<TInstant> GetNextIdlenessCheckAt(TInstant systemTime) {
+ return IdlePartitionsEnabled
+ ? ToDiscreteTime(systemTime + Granularity)
+ : TMaybe<TInstant>();
+ }
+private:
+ struct TPartitionState {
+ TInstant Time; // partition time, notified outside
+ TInstant TimeNotifiedAt; // system time when notification was received
+ TInstant Watermark;
+ };
+
private:
TInstant ToDiscreteTime(TInstant time) const {
return TInstant::MicroSeconds(time.MicroSeconds() - time.MicroSeconds() % Granularity.MicroSeconds());
}
+ bool AllPartitionsAreIdle(TInstant systemTime) const {
+ return LastTimeNotifiedAt + LateArrivalDelay <= systemTime;
+ }
+
+ bool ShouldCheckIdlenessNow(TInstant systemTime) {
+ const auto discreteSystemTime = ToDiscreteTime(systemTime);
+ if (discreteSystemTime < NextIdlenessCheckAt) {
+ return false;
+ }
+
+ NextIdlenessCheckAt = discreteSystemTime + Granularity;
+ return true;
+ }
+
TMaybe<TInstant> RecalcWatermark() {
+ const auto maxPartitionSeenTimeIter = MaxElementBy(
+ Data.begin(),
+ Data.end(),
+ [](const auto iter){ return iter.second.Time; });
+
+ if (maxPartitionSeenTimeIter == Data.end()) {
+ return Nothing();
+ }
+
+ const auto newWatermark = ToDiscreteTime(maxPartitionSeenTimeIter->second.Time - LateArrivalDelay);
+
if (!Watermark) {
// We have to inject start watermark before first data item, because some graph nodes can't start
// data processing without knowing what the current watermark is.
- Watermark = StartWatermark;
- return Watermark;
+ return Watermark = Max(StartWatermark, newWatermark);
}
- if (Data.size() < ExpectedPartitionsCount) {
- // Each partition should notify time at least once before we are able to move watermark
- return Nothing();
+ if (newWatermark > *Watermark) {
+ return Watermark = newWatermark;
}
- auto minTime = Data.begin()->second;
- for (const auto& [_, time] : Data) {
- if (time < minTime) {
- minTime = time;
- }
+ return Nothing();
+ }
+
+ bool UpdatePartitionTime(TPartitionState& state, TInstant partitionTime, TInstant sysTime) {
+ state.Time = partitionTime;
+ state.TimeNotifiedAt = sysTime;
+ LastTimeNotifiedAt = sysTime;
+
+ const auto watermark = ToDiscreteTime(partitionTime);
+ if (watermark >= state.Watermark) {
+ state.Watermark = watermark;
+ return true;
}
- if (minTime > Watermark) {
- Watermark = minTime;
- return Watermark;
+ return false;
+ }
+
+ TMaybe<TInstant> TryProduceFakeWatermark(TInstant systemTime) {
+ const auto fakeWatermark = ToDiscreteTime(systemTime - LateArrivalDelay);
+ if (fakeWatermark > Watermark) {
+ return Watermark = fakeWatermark;
}
return Nothing();
@@ -71,10 +133,13 @@ private:
private:
const TDuration Granularity;
const TInstant StartWatermark;
- const ui32 ExpectedPartitionsCount;
+ const bool IdlePartitionsEnabled;
+ const TDuration LateArrivalDelay;
- THashMap<TPartitionKey, TInstant> Data;
+ THashMap<TPartitionKey, TPartitionState> Data;
TMaybe<TInstant> Watermark;
+ TInstant LastTimeNotifiedAt; // last system time when tracker received notification for any partition
+ TMaybe<TInstant> NextIdlenessCheckAt;
};
}
diff --git a/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp b/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp
index c27347d22a6..f264713910a 100644
--- a/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp
+++ b/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp
@@ -4,79 +4,139 @@
namespace NYql::NDq {
+namespace {
+ TDqSourceWatermarkTracker<ui32> InitTracker(
+ TDuration granularitty = TDuration::Seconds(2),
+ TInstant startWatermark = TInstant::Seconds(10),
+ TDuration lateArrivalDelay = TDuration::Seconds(4),
+ bool IdlePartitionsEnabled = false,
+ TInstant systemTime = TInstant())
+ {
+ return TDqSourceWatermarkTracker<ui32>(
+ granularitty,
+ startWatermark,
+ IdlePartitionsEnabled,
+ lateArrivalDelay,
+ systemTime);
+ }
+
+ TDqSourceWatermarkTracker<ui32> InitTrackerPreset2() {
+ return InitTracker(
+ TDuration::Seconds(5),
+ TInstant::Seconds(10),
+ TDuration::Seconds(2),
+ true,
+ TInstant::Seconds(10));
+ }
+}
+
Y_UNIT_TEST_SUITE(TDqSourceWatermarkTrackerTest) {
- Y_UNIT_TEST(StartWatermark) {
- TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), TInstant::Seconds(11), 2);
+ Y_UNIT_TEST(StartWatermark1) {
+ auto tracker = InitTracker(TDuration::Seconds(2), TInstant::Seconds(11));
- const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(11));
+ const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(11), TInstant());
UNIT_ASSERT_VALUES_EQUAL(actual1, TInstant::Seconds(10));
- const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(11));
- UNIT_ASSERT_VALUES_EQUAL(actual2.Defined(), false); // Start watermark was returned already, we shouldn't return it 2-nd time
+ const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(11), TInstant());
+ UNIT_ASSERT_VALUES_EQUAL(actual2.Defined(), false);
}
- Y_UNIT_TEST(WatermarkMovement1) {
- const auto startWatermark = TInstant::Seconds(10);
+ Y_UNIT_TEST(StartWatermark2) {
+ auto tracker = InitTracker();
+
+ const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12), TInstant());
+ UNIT_ASSERT_VALUES_EQUAL(actual1, TInstant::Seconds(10));
+
+ const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(14), TInstant());
+ UNIT_ASSERT_VALUES_EQUAL(actual2.Defined(), false);
+
+ const auto actual3 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(16), TInstant());
+ UNIT_ASSERT_VALUES_EQUAL(actual3, TInstant::Seconds(12));
+ }
- TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2);
+ Y_UNIT_TEST(StartWatermark3) {
+ auto tracker = InitTrackerPreset2();
- tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12));
- const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(12));
+ const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(19), TInstant());
+ UNIT_ASSERT_VALUES_EQUAL(actual1, TInstant::Seconds(15));
+ }
+
+ Y_UNIT_TEST(WatermarkMovement1) {
+ auto tracker = InitTracker();
+
+ tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12), TInstant());
+ const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(16), TInstant());
UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12));
}
Y_UNIT_TEST(WatermarkMovement2) {
- const auto startWatermark = TInstant::Seconds(10);
-
- TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2);
+ auto tracker = InitTracker();
- tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12));
- const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(13));
+ tracker.NotifyNewPartitionTime(0, TInstant::Seconds(13), TInstant());
+ const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(16), TInstant());
UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12));
}
Y_UNIT_TEST(WatermarkMovement3) {
- const auto startWatermark = TInstant::Seconds(10);
+ auto tracker = InitTracker();
- TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2);
-
- tracker.NotifyNewPartitionTime(0, TInstant::Seconds(13));
- const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(13));
- UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12));
+ tracker.NotifyNewPartitionTime(0, TInstant::Seconds(16), TInstant());
+ const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(13), TInstant());
+ UNIT_ASSERT_VALUES_EQUAL(actual.Defined(), false);
}
Y_UNIT_TEST(WatermarkMovement4) {
- const auto startWatermark = TInstant::Seconds(10);
+ auto tracker = InitTracker();
- TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2);
+ const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12), TInstant());
+ UNIT_ASSERT_VALUES_EQUAL(actual1, TInstant::Seconds(10));
- tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12));
- const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(14));
- UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12));
+ const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(16), TInstant());
+ UNIT_ASSERT_VALUES_EQUAL(actual2, TInstant::Seconds(12));
+
+ const auto actual3 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(18), TInstant());
+ UNIT_ASSERT_VALUES_EQUAL(actual3, TInstant::Seconds(14));
}
- Y_UNIT_TEST(WatermarkFarMovement) {
- const auto startWatermark = TInstant::Seconds(10);
+ Y_UNIT_TEST(IdleFirstShouldReturnStartWatermark) {
+ auto tracker = InitTrackerPreset2();
+
+ const auto watermark = tracker.HandleIdleness(TInstant::Seconds(10));
+ UNIT_ASSERT_VALUES_EQUAL(watermark, TInstant::Seconds(10));
+ const auto watermark2 = tracker.HandleIdleness(TInstant::Seconds(10));
+ UNIT_ASSERT(!watermark2.Defined());
+ }
+
+ Y_UNIT_TEST(Idle1) {
+ auto tracker = InitTrackerPreset2();
+
+ const auto watermark = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(22), TInstant::Seconds(22));
+ UNIT_ASSERT_VALUES_EQUAL(watermark, TInstant::Seconds(20));
+
+ const auto watermark2 = tracker.HandleIdleness(TInstant::Seconds(22));
+ UNIT_ASSERT_VALUES_EQUAL(watermark2, TMaybe<TInstant>());
+
+ const auto watermark3 = tracker.HandleIdleness(TInstant::Seconds(23));
+ UNIT_ASSERT_VALUES_EQUAL(watermark3, TMaybe<TInstant>());
- TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2);
+ const auto watermark4 = tracker.HandleIdleness(TInstant::Seconds(25));
+ UNIT_ASSERT_VALUES_EQUAL(watermark4, TMaybe<TInstant>());
- tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12));
- tracker.NotifyNewPartitionTime(1, TInstant::Seconds(30));
- const auto actual = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(30));
- UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(30));
+ const auto watermark5 = tracker.HandleIdleness(TInstant::Seconds(30));
+ UNIT_ASSERT_VALUES_EQUAL(watermark5, TInstant::Seconds(25));
}
- Y_UNIT_TEST(WaitExpectedPartitionsCount) {
- const auto startWatermark = TInstant::Seconds(10);
+ Y_UNIT_TEST(IdleNextCheckAt) {
+ auto tracker = InitTrackerPreset2();
- TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2);
+ const auto nextCheckAt1 = tracker.GetNextIdlenessCheckAt(TInstant::Seconds(10));
+ UNIT_ASSERT_VALUES_EQUAL(nextCheckAt1, TInstant::Seconds(15));
- tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12));
- const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(30));
- UNIT_ASSERT_VALUES_EQUAL(actual1.Defined(), false); // Since expectedPartitionsCount is 2, we shouldn't move watermark
+ const auto nextCheckAt2 = tracker.GetNextIdlenessCheckAt(TInstant::Seconds(11));
+ UNIT_ASSERT_VALUES_EQUAL(nextCheckAt2, TInstant::Seconds(15));
- const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(30));
- UNIT_ASSERT_VALUES_EQUAL(actual2, TInstant::Seconds(30));
+ const auto nextCheckAt3 = tracker.GetNextIdlenessCheckAt(TInstant::Seconds(15));
+ UNIT_ASSERT_VALUES_EQUAL(nextCheckAt3, TInstant::Seconds(20));
}
}
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
index a4f6c8ea9e5..90e08b96158 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
@@ -216,8 +216,10 @@ struct TFakeCASetup {
const TReadValueParser<T> parser,
ui64 size,
i64 eachReadFreeSpace = 1000,
- TDuration timeout = TDuration::Seconds(30))
+ TDuration timeout = TDuration::Seconds(30),
+ bool onlyData = false)
{
+ ui32 dataItemsCt = 0;
std::vector<std::variant<T, TInstant>> result;
TInstant startedAt = TInstant::Now();
DoWithRetry([&](){
@@ -225,21 +227,24 @@ struct TFakeCASetup {
auto batch = AsyncInputRead<T>(parser, nextDataFuture, eachReadFreeSpace);
for (const auto& item : batch) {
result.emplace_back(item);
+ if (!onlyData || std::holds_alternative<T>(item)) {
+ dataItemsCt++;
+ }
}
if (TInstant::Now() > startedAt + timeout) {
return;
}
- if (result.size() < size) {
+ if (dataItemsCt < size) {
nextDataFuture.Wait(timeout);
- ythrow yexception() << "Not enough data";
+ ythrow yexception() << "Not enough items";
}
},
TRetryOptions(std::numeric_limits<ui32>::max()),
true);
- UNIT_ASSERT_EQUAL_C(result.size(), size, "Waited for " << size << " items but only " << result.size() << " received");
+ UNIT_ASSERT_C(dataItemsCt >= size, "Waited for " << size << " items but only " << dataItemsCt << " received");
return result;
}
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index 76adaf38a12..2dc53082493 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -51,6 +51,8 @@ TDqConfiguration::TDqConfiguration() {
REGISTER_SETTING(*this, EnableDqReplicate);
REGISTER_SETTING(*this, WatermarksMode);
REGISTER_SETTING(*this, WatermarksGranularityMs);
+ REGISTER_SETTING(*this, WatermarksLateArrivalDelayMs);
+ REGISTER_SETTING(*this, WatermarksEnableIdlePartitions);
REGISTER_SETTING(*this, UseAggPhases);
REGISTER_SETTING(*this, ParallelOperationsLimit).Lower(1).Upper(128);
}
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index 635d2dd6492..2bd914ea2de 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -28,6 +28,8 @@ struct TDqSettings {
static constexpr ui64 OutputChunkMaxSize = 4_MB;
static constexpr ui64 ChunkSizeLimit = 128_MB;
static constexpr bool EnableDqReplicate = false;
+ static constexpr bool WatermarksGranularityMs = 1000;
+ static constexpr bool WatermarksLateArrivalDelayMs = 5000;
static constexpr ui64 ParallelOperationsLimit = 16;
};
@@ -74,7 +76,9 @@ struct TDqSettings {
NCommon::TConfSetting<bool, false> EnableDqReplicate;
NCommon::TConfSetting<bool, false> EnableGraceJoin;
NCommon::TConfSetting<TString, false> WatermarksMode;
+ NCommon::TConfSetting<bool, false> WatermarksEnableIdlePartitions;
NCommon::TConfSetting<ui64, false> WatermarksGranularityMs;
+ NCommon::TConfSetting<ui64, false> WatermarksLateArrivalDelayMs;
NCommon::TConfSetting<bool, false> UseAggPhases;
NCommon::TConfSetting<ui64, false> ParallelOperationsLimit;
@@ -117,8 +121,10 @@ struct TDqSettings {
SAVE_SETTING(WorkerFilter);
SAVE_SETTING(ComputeActorType);
SAVE_SETTING(WatermarksMode);
+ SAVE_SETTING(WatermarksEnableIdlePartitions);
SAVE_SETTING(EnableGraceJoin);
SAVE_SETTING(WatermarksGranularityMs);
+ SAVE_SETTING(WatermarksLateArrivalDelayMs);
SAVE_SETTING(UseAggPhases);
#undef SAVE_SETTING
diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
index 77890ee67e2..8b707c8007a 100644
--- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
@@ -148,10 +148,10 @@ private:
if (!maybeHopTraits) {
return nullptr;
}
- const auto [hoppingColumn, hopTraits] = *maybeHopTraits;
+ const auto hopTraits = *maybeHopTraits;
const auto aggregateInputType = GetSeqItemType(node.Ptr()->Head().GetTypeAnn())->Cast<TStructExprType>();
- TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hoppingColumn);
+ TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column);
if (keysDescription.NeedPickle()) {
return Build<TCoMap>(ctx, pos)
@@ -167,15 +167,15 @@ private:
}
const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType);
- const auto timeExtractorLambda = BuildTimeExtractor(hopTraits, ctx);
+ const auto timeExtractorLambda = BuildTimeExtractor(hopTraits.Traits, ctx);
const auto initLambda = BuildInitHopLambda(aggregate, ctx);
const auto updateLambda = BuildUpdateHopLambda(aggregate, ctx);
const auto saveLambda = BuildSaveHopLambda(aggregate, ctx);
const auto loadLambda = BuildLoadHopLambda(aggregate, ctx);
const auto mergeLambda = BuildMergeHopLambda(aggregate, ctx);
- const auto finishLambda = BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hoppingColumn, ctx);
- const auto watermarkMode = BuildWatermarkMode(aggregate, ctx);
- if (!watermarkMode) {
+ const auto finishLambda = BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx);
+ const auto enableWatermarks = BuildWatermarkMode(aggregate, hopTraits.Traits, ctx);
+ if (!enableWatermarks) {
return nullptr;
}
@@ -183,17 +183,29 @@ private:
auto multiHoppingCoreBuilder = Build<TCoMultiHoppingCore>(ctx, pos)
.KeyExtractor(keyLambda)
.TimeExtractor(timeExtractorLambda)
- .Hop(hopTraits.Hop())
- .Interval(hopTraits.Interval())
- .Delay(hopTraits.Delay())
- .DataWatermarks(hopTraits.DataWatermarks())
+ .Hop(hopTraits.Traits.Hop())
+ .Interval(hopTraits.Traits.Interval())
+ .DataWatermarks(hopTraits.Traits.DataWatermarks())
.InitHandler(initLambda)
.UpdateHandler(updateLambda)
.MergeHandler(mergeLambda)
.FinishHandler(finishLambda)
.SaveHandler(saveLambda)
.LoadHandler(loadLambda)
- .WatermarkMode(*watermarkMode);
+ .WatermarkMode<TCoAtom>().Build(ToString(*enableWatermarks));
+
+ if (*enableWatermarks) {
+ const auto hop = TDuration::MicroSeconds(hopTraits.Hop);
+ const auto lateArrivalDelay = TDuration::MilliSeconds(Config->WatermarksLateArrivalDelayMs
+ .Get()
+ .GetOrElse(TDqSettings::TDefault::WatermarksLateArrivalDelayMs));
+
+ multiHoppingCoreBuilder.Delay<TCoInterval>()
+ .Literal().Build(ToString(Max(hop, lateArrivalDelay).MicroSeconds()))
+ .Build();
+ } else {
+ multiHoppingCoreBuilder.Delay(hopTraits.Traits.Delay());
+ }
if (Config->AnalyticsHopping.Get().GetOrElse(false)) {
return Build<TCoPartitionsByKeys>(ctx, node.Pos())
@@ -253,7 +265,15 @@ private:
}
}
- TMaybe<std::pair<TString, TCoHoppingTraits>> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx) {
+ struct THoppingTraits {
+ TString Column;
+ TCoHoppingTraits Traits;
+ ui64 Hop;
+ ui64 Interval;
+ ui64 Delay;
+ };
+
+ TMaybe<THoppingTraits> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx) {
const auto pos = aggregate.Pos();
const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping");
@@ -323,7 +343,13 @@ private:
: traits.DataWatermarks().Ptr())
.Done();
- return std::make_pair(hoppingColumn, newTraits);
+ return THoppingTraits {
+ hoppingColumn,
+ newTraits,
+ hop,
+ interval,
+ delay
+ };
}
struct TKeysDescription {
@@ -825,15 +851,28 @@ private:
.Ptr();
}
- TMaybe<TExprNode::TPtr> BuildWatermarkMode(const TCoAggregate& aggregate, TExprContext& ctx) {
+ TMaybe<bool> BuildWatermarkMode(
+ const TCoAggregate& aggregate,
+ const TCoHoppingTraits& hoppingTraits,
+ TExprContext& ctx)
+ {
const auto analyticsMode = Config->AnalyticsHopping.Get().GetOrElse(false);
- const bool enableWatermarks = !analyticsMode && Config->WatermarksMode.Get() == "default";
+ const bool enableWatermarks = !analyticsMode &&
+ Config->WatermarksMode.Get() == "default" &&
+ hoppingTraits.Version().Cast<TCoAtom>().StringValue() == "v2";
if (enableWatermarks && Config->ComputeActorType.Get() != "async") {
ctx.AddError(TIssue(ctx.GetPosition(aggregate.Pos()), "Watermarks should be used only with async compute actor"));
return Nothing();
}
- return ctx.NewAtom(aggregate.Pos(), ToString(enableWatermarks));
+ if (hoppingTraits.Version().Cast<TCoAtom>().StringValue() == "v2" && !enableWatermarks) {
+ ctx.AddError(TIssue(
+ ctx.GetPosition(aggregate.Pos()),
+ "HoppingWindow requires watermarks to be enabled. If you don't want to do that, you can use HOP instead."));
+ return Nothing();
+ }
+
+ return enableWatermarks;
}
private:
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
index 0b25a022da6..1fb54221a00 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
@@ -252,9 +252,29 @@ private:
TActor<TDqPqReadActor>::PassAway();
}
+ void MaybeScheduleNextIdleCheck(TInstant systemTime) {
+ if (!WatermarkTracker) {
+ return;
+ }
+
+ const auto nextIdleCheckAt = WatermarkTracker->GetNextIdlenessCheckAt(systemTime);
+ if (!nextIdleCheckAt) {
+ return;
+ }
+
+ if (!NextIdlenesCheckAt.Defined() || nextIdleCheckAt != *NextIdlenesCheckAt) {
+ NextIdlenesCheckAt = *nextIdleCheckAt;
+ SRC_LOG_T("Next idleness check scheduled at " << *nextIdleCheckAt);
+ Schedule(*nextIdleCheckAt, new TEvPrivate::TEvSourceDataReady());
+ }
+ }
+
i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override {
SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace);
+ const auto now = TInstant::Now();
+ MaybeScheduleNextIdleCheck(now);
+
i64 usedSpace = 0;
if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) {
return usedSpace;
@@ -273,6 +293,16 @@ private:
std::visit(TPQEventProcessor{*this, batchItemsEstimatedCount, LogPrefix}, event);
}
+ if (WatermarkTracker) {
+ const auto watermark = WatermarkTracker->HandleIdleness(now);
+
+ if (watermark) {
+ const auto t = watermark;
+ SRC_LOG_T("Fake watermark " << t << " was produced");
+ PushWatermarkToReady(*watermark);
+ }
+ }
+
if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) {
return usedSpace;
}
@@ -303,11 +333,12 @@ private:
return;
}
- WatermarkTracker = std::make_unique<TDqSourceWatermarkTracker<TPartitionKey>>(
+ WatermarkTracker.ConstructInPlace(
TDuration::MicroSeconds(SourceParams.GetWatermarks().GetGranularityUs()),
StartingMessageTimestamp,
- GetPartitionsToRead().size() // TODO: for the internal LB there is a problem here. See YQ-1384
- );
+ SourceParams.GetWatermarks().GetIdlePartitionsEnabled(),
+ TDuration::MicroSeconds(SourceParams.GetWatermarks().GetLateArrivalDelayUs()),
+ TInstant::Now());
}
NYdb::NPersQueue::TReadSessionSettings GetReadSessionSettings() const {
@@ -386,6 +417,17 @@ private:
return true;
}
+ void PushWatermarkToReady(TInstant watermark) {
+ SRC_LOG_D("New watermark " << watermark << " was generated");
+
+ if (Y_UNLIKELY(ReadyBuffer.empty() || ReadyBuffer.back().Watermark.Defined())) {
+ ReadyBuffer.emplace(watermark, 0);
+ return;
+ }
+
+ ReadyBuffer.back().Watermark = watermark;
+ }
+
struct TPQEventProcessor {
void operator()(NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) {
const auto partitionKey = MakePartitionKey(event.GetPartitionStream());
@@ -440,7 +482,7 @@ private:
void operator()(NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent&) { }
TReadyBatch& GetActiveBatch(const TPartitionKey& partitionKey, TInstant time) {
- if (Y_UNLIKELY(Self.ReadyBuffer.empty())) {
+ if (Y_UNLIKELY(Self.ReadyBuffer.empty() || Self.ReadyBuffer.back().Watermark.Defined())) {
Self.ReadyBuffer.emplace(Nothing(), BatchCapacity);
}
@@ -451,15 +493,16 @@ private:
return activeBatch;
}
- const auto maybeNewWatermark = Self.WatermarkTracker->NotifyNewPartitionTime(partitionKey, time);
+ const auto maybeNewWatermark = Self.WatermarkTracker->NotifyNewPartitionTime(
+ partitionKey,
+ time,
+ TInstant::Now());
if (!maybeNewWatermark) {
// Watermark wasn't moved => use current active batch
return activeBatch;
}
- SRC_LOG_D("New watermark " << *maybeNewWatermark << " was generated");
- activeBatch.Watermark = maybeNewWatermark; // Write watermark to current batch
-
+ Self.PushWatermarkToReady(*maybeNewWatermark);
return Self.ReadyBuffer.emplace(Nothing(), BatchCapacity); // And open new batch
}
@@ -514,7 +557,8 @@ private:
bool SubscribedOnEvent = false;
std::vector<std::tuple<TString, TPqMetaExtractor::TPqMetaExtractorLambda>> MetadataFields;
std::queue<TReadyBatch> ReadyBuffer;
- std::unique_ptr<TDqSourceWatermarkTracker<TPartitionKey>> WatermarkTracker;
+ TMaybe<TDqSourceWatermarkTracker<TPartitionKey>> WatermarkTracker;
+ TMaybe<TInstant> NextIdlenesCheckAt;
};
std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
diff --git a/ydb/library/yql/providers/pq/common/yql_names.h b/ydb/library/yql/providers/pq/common/yql_names.h
index 184a5d5eb0e..268944e9bbe 100644
--- a/ydb/library/yql/providers/pq/common/yql_names.h
+++ b/ydb/library/yql/providers/pq/common/yql_names.h
@@ -9,6 +9,9 @@ constexpr TStringBuf ConsumerSetting = "Consumer";
constexpr TStringBuf EndpointSetting = "Endpoint";
constexpr TStringBuf UseSslSetting = "UseSsl";
constexpr TStringBuf AddBearerToTokenSetting = "AddBearerToToken";
+constexpr TStringBuf WatermarksEnableSetting = "WatermarksEnable";
constexpr TStringBuf WatermarksGranularityUsSetting = "WatermarksGranularityUs";
+constexpr TStringBuf WatermarksLateArrivalDelayUsSetting = "WatermarksLateArrivalDelayUs";
+constexpr TStringBuf WatermarksIdlePartitionsSetting = "WatermarksIdlePartitions";
} // namespace NYql
diff --git a/ydb/library/yql/providers/pq/proto/dq_io.proto b/ydb/library/yql/providers/pq/proto/dq_io.proto
index 5ade3091b42..09342088851 100644
--- a/ydb/library/yql/providers/pq/proto/dq_io.proto
+++ b/ydb/library/yql/providers/pq/proto/dq_io.proto
@@ -17,6 +17,8 @@ enum EClusterType {
message TWatermarks {
bool Enabled = 1;
uint64 GranularityUs = 2;
+ uint64 LateArrivalDelayUs = 3;
+ bool IdlePartitionsEnabled = 4;
}
message TDqPqTopicSource {
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
index 5285e586d09..fbd03b35c7a 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
@@ -200,9 +200,14 @@ public:
srcDesc.SetUseSsl(FromString<bool>(Value(setting)));
} else if (name == AddBearerToTokenSetting) {
srcDesc.SetAddBearerToToken(FromString<bool>(Value(setting)));
- } else if (name == WatermarksGranularityUsSetting) {
+ } else if (name == WatermarksEnableSetting) {
srcDesc.MutableWatermarks()->SetEnabled(true);
+ } else if (name == WatermarksGranularityUsSetting) {
srcDesc.MutableWatermarks()->SetGranularityUs(FromString<ui64>(Value(setting)));
+ } else if (name == WatermarksLateArrivalDelayUsSetting) {
+ srcDesc.MutableWatermarks()->SetLateArrivalDelayUs(FromString<ui64>(Value(setting)));
+ } else if (name == WatermarksIdlePartitionsSetting) {
+ srcDesc.MutableWatermarks()->SetIdlePartitionsEnabled(true);
}
}
@@ -292,8 +297,23 @@ public:
}
if (dqSettings.WatermarksMode.Get().GetOrElse("") == "default") {
- const auto granularity = TDuration::MilliSeconds(dqSettings.WatermarksGranularityMs.Get().GetOrElse(1000));
+ Add(props, WatermarksEnableSetting, ToString(true), pos, ctx);
+
+ const auto granularity = TDuration::MilliSeconds(dqSettings
+ .WatermarksGranularityMs
+ .Get()
+ .GetOrElse(TDqSettings::TDefault::WatermarksGranularityMs));
Add(props, WatermarksGranularityUsSetting, ToString(granularity.MicroSeconds()), pos, ctx);
+
+ const auto lateArrivalDelay = TDuration::MilliSeconds(dqSettings
+ .WatermarksLateArrivalDelayMs
+ .Get()
+ .GetOrElse(TDqSettings::TDefault::WatermarksLateArrivalDelayMs));
+ Add(props, WatermarksLateArrivalDelayUsSetting, ToString(lateArrivalDelay.MicroSeconds()), pos, ctx);
+ }
+
+ if (dqSettings.WatermarksEnableIdlePartitions.Get().GetOrElse(false)) {
+ Add(props, WatermarksIdlePartitionsSetting, ToString(true), pos, ctx);
}
return Build<TCoNameValueTupleList>(ctx, pos)
diff --git a/ydb/library/yql/sql/v0/node.cpp b/ydb/library/yql/sql/v0/node.cpp
index 57f0c6a561f..9fe816b17c9 100644
--- a/ydb/library/yql/sql/v0/node.cpp
+++ b/ydb/library/yql/sql/v0/node.cpp
@@ -1460,7 +1460,8 @@ TNodePtr ISource::BuildAggregation(const TString& label) {
HoppingWindowSpec->Hop,
HoppingWindowSpec->Interval,
HoppingWindowSpec->Delay,
- Q("False"));
+ Q("False"),
+ Q("v1"));
return Y("Aggregate", label, Q(keysTuple), Q(aggrArgs),
Q(Y(Q(Y(BuildQuotedAtom(Pos, "hopping"), hoppingTraits)))));
diff --git a/ydb/library/yql/sql/v1/builtin.cpp b/ydb/library/yql/sql/v1/builtin.cpp
index a225305c808..3b8be7b6be0 100644
--- a/ydb/library/yql/sql/v1/builtin.cpp
+++ b/ydb/library/yql/sql/v1/builtin.cpp
@@ -2089,7 +2089,8 @@ TNodePtr THoppingWindow::BuildTraits(const TString& label) const {
Hop,
Interval,
Interval,
- Q("true"));
+ Q("true"),
+ Q("v2"));
}
bool THoppingWindow::DoInit(TContext& ctx, ISource* src) {
diff --git a/ydb/library/yql/sql/v1/node.cpp b/ydb/library/yql/sql/v1/node.cpp
index c8ad9f85671..79256249a2e 100644
--- a/ydb/library/yql/sql/v1/node.cpp
+++ b/ydb/library/yql/sql/v1/node.cpp
@@ -1842,7 +1842,8 @@ std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TConte
LegacyHoppingWindowSpec->Hop,
LegacyHoppingWindowSpec->Interval,
LegacyHoppingWindowSpec->Delay,
- LegacyHoppingWindowSpec->DataWatermarks ? Q("true") : Q("false"));
+ LegacyHoppingWindowSpec->DataWatermarks ? Q("true") : Q("false"),
+ Q("v1"));
options = L(options, Q(Y(Q("hopping"), hoppingTraits)));
}