diff options
author | d-mokhnatkin <[email protected]> | 2022-11-01 12:52:05 +0300 |
---|---|---|
committer | d-mokhnatkin <[email protected]> | 2022-11-01 12:52:05 +0300 |
commit | 8049e68d98d4a7614ecb8772dd5c63b1c50df4f2 (patch) | |
tree | 957953ade699542412b7a92ebe495c8bcfb9061e | |
parent | 14b4b44f929d300374d993b6344b20a86cb5b4c5 (diff) |
idle partitions implementation
17 files changed, 363 insertions, 112 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 67aadaf04de..38cc8e9eef1 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -469,7 +469,7 @@ private: RateLimiterResourceCreatorId = Register(CreateRateLimiterResourceCreator(SelfId(), Params.Owner, Params.QueryId, Params.Scope, Params.TenantName)); } return; - } + } if (QueryStateUpdateRequest.resources().compilation() == Fq::Private::TaskResources::PREPARE) { if (!ProgramRunnerId) { @@ -477,7 +477,7 @@ private: } return; } - + if (QueryStateUpdateRequest.resources().read_rules() == Fq::Private::TaskResources::PREPARE) { if (!ReadRulesCreatorId) { ReadRulesCreatorId = Register( @@ -1355,6 +1355,8 @@ private: apply("_EnablePrecompute", "1"); apply("WatermarksMode", "disable"); apply("WatermarksGranularityMs", "1000"); + apply("WatermarksLateArrivalDelayMs", "5000"); + apply("WatermarksIdlePartitions", "true"); switch (Params.QueryType) { case YandexQuery::QueryContent::STREAMING: { diff --git a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp index a85cedb647a..c3cb4f6b04e 100644 --- a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp @@ -1609,6 +1609,7 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) { .Interval(self.Interval()) .Delay(self.Delay()) .DataWatermarks(self.DataWatermarks()) + .Version(self.Version()) .Done().Ptr(); }; diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index 8edbdbd78eb..6cd1d45e502 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -585,7 +585,8 @@ {"Index": 2, "Name": "Hop", "Type": "TExprBase"}, {"Index": 3, "Name": "Interval", "Type": "TExprBase"}, {"Index": 4, "Name": "Delay", "Type": "TExprBase"}, - {"Index": 5, "Name": "DataWatermarks", "Type": "TExprBase"} + {"Index": 5, "Name": "DataWatermarks", "Type": "TExprBase"}, + {"Index": 6, "Name": "Version", "Type": "TExprBase"} ] }, { diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp index 0824eda8f9a..c6708b83ca2 100644 --- a/ydb/library/yql/core/type_ann/type_ann_list.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp @@ -6044,7 +6044,7 @@ namespace { IGraphTransformer::TStatus HoppingTraitsWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { Y_UNUSED(output); - if (!EnsureMinArgsCount(*input, 5, ctx.Expr) || !EnsureMaxArgsCount(*input, 6, ctx.Expr)) { + if (!EnsureArgsCount(*input, 7, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } if (auto status = EnsureTypeRewrite(input->HeadRef(), ctx.Expr); status != IGraphTransformer::TStatus::Ok) { @@ -6110,14 +6110,12 @@ namespace { return convertStatus; } - if (input->ChildrenSize() == 6) { - const auto& dataWatermarksNodePtr = input->ChildRef(5); - if (dataWatermarksNodePtr->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Unit) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(dataWatermarksNodePtr->Pos()), TStringBuilder() - << "Expected unit type, but got: " - << *dataWatermarksNodePtr->GetTypeAnn())); - return IGraphTransformer::TStatus::Error; - } + const auto& dataWatermarksNodePtr = input->ChildRef(5); + if (dataWatermarksNodePtr->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Unit) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(dataWatermarksNodePtr->Pos()), TStringBuilder() + << "Expected unit type, but got: " + << *dataWatermarksNodePtr->GetTypeAnn())); + return IGraphTransformer::TStatus::Error; } input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>()); diff --git a/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h b/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h index 01f8f139393..5db02088ecc 100644 --- a/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h +++ b/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h @@ -13,56 +13,118 @@ public: TDqSourceWatermarkTracker( TDuration granularity, TInstant startWatermark, - ui32 expectedPartitionsCount) + bool idlePartitionsEnabled, + TDuration lateArrivalDelay, + TInstant systemTime) : Granularity(granularity) , StartWatermark(ToDiscreteTime(startWatermark)) - , ExpectedPartitionsCount(expectedPartitionsCount) {} + , IdlePartitionsEnabled(idlePartitionsEnabled) + , LateArrivalDelay(lateArrivalDelay) + , LastTimeNotifiedAt(systemTime) {} + + TMaybe<TInstant> NotifyNewPartitionTime( + const TPartitionKey& partitionKey, + TInstant partitionTime, + TInstant systemTime) + { + auto [iter, _] = Data.try_emplace(partitionKey); + if (UpdatePartitionTime(iter->second, partitionTime, systemTime)) { + return RecalcWatermark(); + } - TMaybe<TInstant> NotifyNewPartitionTime(const TPartitionKey& partitionKey, TInstant time) { - auto granularPartitionTime = ToDiscreteTime(time); + return Nothing(); + } - auto iter = Data.find(partitionKey); - if (iter == Data.end()) { - Data[partitionKey] = granularPartitionTime; - return RecalcWatermark(); + TMaybe<TInstant> HandleIdleness(TInstant systemTime) { + if (!Watermark) { + return Watermark = StartWatermark; } - if (granularPartitionTime <= iter->second) { + if (!IdlePartitionsEnabled || !ShouldCheckIdlenessNow(systemTime)) { return Nothing(); } - iter->second = granularPartitionTime; + if (AllPartitionsAreIdle(systemTime)) { + return TryProduceFakeWatermark(systemTime); + } + return RecalcWatermark(); } + TMaybe<TInstant> GetNextIdlenessCheckAt(TInstant systemTime) { + return IdlePartitionsEnabled + ? ToDiscreteTime(systemTime + Granularity) + : TMaybe<TInstant>(); + } +private: + struct TPartitionState { + TInstant Time; // partition time, notified outside + TInstant TimeNotifiedAt; // system time when notification was received + TInstant Watermark; + }; + private: TInstant ToDiscreteTime(TInstant time) const { return TInstant::MicroSeconds(time.MicroSeconds() - time.MicroSeconds() % Granularity.MicroSeconds()); } + bool AllPartitionsAreIdle(TInstant systemTime) const { + return LastTimeNotifiedAt + LateArrivalDelay <= systemTime; + } + + bool ShouldCheckIdlenessNow(TInstant systemTime) { + const auto discreteSystemTime = ToDiscreteTime(systemTime); + if (discreteSystemTime < NextIdlenessCheckAt) { + return false; + } + + NextIdlenessCheckAt = discreteSystemTime + Granularity; + return true; + } + TMaybe<TInstant> RecalcWatermark() { + const auto maxPartitionSeenTimeIter = MaxElementBy( + Data.begin(), + Data.end(), + [](const auto iter){ return iter.second.Time; }); + + if (maxPartitionSeenTimeIter == Data.end()) { + return Nothing(); + } + + const auto newWatermark = ToDiscreteTime(maxPartitionSeenTimeIter->second.Time - LateArrivalDelay); + if (!Watermark) { // We have to inject start watermark before first data item, because some graph nodes can't start // data processing without knowing what the current watermark is. - Watermark = StartWatermark; - return Watermark; + return Watermark = Max(StartWatermark, newWatermark); } - if (Data.size() < ExpectedPartitionsCount) { - // Each partition should notify time at least once before we are able to move watermark - return Nothing(); + if (newWatermark > *Watermark) { + return Watermark = newWatermark; } - auto minTime = Data.begin()->second; - for (const auto& [_, time] : Data) { - if (time < minTime) { - minTime = time; - } + return Nothing(); + } + + bool UpdatePartitionTime(TPartitionState& state, TInstant partitionTime, TInstant sysTime) { + state.Time = partitionTime; + state.TimeNotifiedAt = sysTime; + LastTimeNotifiedAt = sysTime; + + const auto watermark = ToDiscreteTime(partitionTime); + if (watermark >= state.Watermark) { + state.Watermark = watermark; + return true; } - if (minTime > Watermark) { - Watermark = minTime; - return Watermark; + return false; + } + + TMaybe<TInstant> TryProduceFakeWatermark(TInstant systemTime) { + const auto fakeWatermark = ToDiscreteTime(systemTime - LateArrivalDelay); + if (fakeWatermark > Watermark) { + return Watermark = fakeWatermark; } return Nothing(); @@ -71,10 +133,13 @@ private: private: const TDuration Granularity; const TInstant StartWatermark; - const ui32 ExpectedPartitionsCount; + const bool IdlePartitionsEnabled; + const TDuration LateArrivalDelay; - THashMap<TPartitionKey, TInstant> Data; + THashMap<TPartitionKey, TPartitionState> Data; TMaybe<TInstant> Watermark; + TInstant LastTimeNotifiedAt; // last system time when tracker received notification for any partition + TMaybe<TInstant> NextIdlenessCheckAt; }; } diff --git a/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp b/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp index c27347d22a6..f264713910a 100644 --- a/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp +++ b/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp @@ -4,79 +4,139 @@ namespace NYql::NDq { +namespace { + TDqSourceWatermarkTracker<ui32> InitTracker( + TDuration granularitty = TDuration::Seconds(2), + TInstant startWatermark = TInstant::Seconds(10), + TDuration lateArrivalDelay = TDuration::Seconds(4), + bool IdlePartitionsEnabled = false, + TInstant systemTime = TInstant()) + { + return TDqSourceWatermarkTracker<ui32>( + granularitty, + startWatermark, + IdlePartitionsEnabled, + lateArrivalDelay, + systemTime); + } + + TDqSourceWatermarkTracker<ui32> InitTrackerPreset2() { + return InitTracker( + TDuration::Seconds(5), + TInstant::Seconds(10), + TDuration::Seconds(2), + true, + TInstant::Seconds(10)); + } +} + Y_UNIT_TEST_SUITE(TDqSourceWatermarkTrackerTest) { - Y_UNIT_TEST(StartWatermark) { - TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), TInstant::Seconds(11), 2); + Y_UNIT_TEST(StartWatermark1) { + auto tracker = InitTracker(TDuration::Seconds(2), TInstant::Seconds(11)); - const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(11)); + const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(11), TInstant()); UNIT_ASSERT_VALUES_EQUAL(actual1, TInstant::Seconds(10)); - const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(11)); - UNIT_ASSERT_VALUES_EQUAL(actual2.Defined(), false); // Start watermark was returned already, we shouldn't return it 2-nd time + const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(11), TInstant()); + UNIT_ASSERT_VALUES_EQUAL(actual2.Defined(), false); } - Y_UNIT_TEST(WatermarkMovement1) { - const auto startWatermark = TInstant::Seconds(10); + Y_UNIT_TEST(StartWatermark2) { + auto tracker = InitTracker(); + + const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12), TInstant()); + UNIT_ASSERT_VALUES_EQUAL(actual1, TInstant::Seconds(10)); + + const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(14), TInstant()); + UNIT_ASSERT_VALUES_EQUAL(actual2.Defined(), false); + + const auto actual3 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(16), TInstant()); + UNIT_ASSERT_VALUES_EQUAL(actual3, TInstant::Seconds(12)); + } - TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2); + Y_UNIT_TEST(StartWatermark3) { + auto tracker = InitTrackerPreset2(); - tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12)); - const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(12)); + const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(19), TInstant()); + UNIT_ASSERT_VALUES_EQUAL(actual1, TInstant::Seconds(15)); + } + + Y_UNIT_TEST(WatermarkMovement1) { + auto tracker = InitTracker(); + + tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12), TInstant()); + const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(16), TInstant()); UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12)); } Y_UNIT_TEST(WatermarkMovement2) { - const auto startWatermark = TInstant::Seconds(10); - - TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2); + auto tracker = InitTracker(); - tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12)); - const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(13)); + tracker.NotifyNewPartitionTime(0, TInstant::Seconds(13), TInstant()); + const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(16), TInstant()); UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12)); } Y_UNIT_TEST(WatermarkMovement3) { - const auto startWatermark = TInstant::Seconds(10); + auto tracker = InitTracker(); - TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2); - - tracker.NotifyNewPartitionTime(0, TInstant::Seconds(13)); - const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(13)); - UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12)); + tracker.NotifyNewPartitionTime(0, TInstant::Seconds(16), TInstant()); + const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(13), TInstant()); + UNIT_ASSERT_VALUES_EQUAL(actual.Defined(), false); } Y_UNIT_TEST(WatermarkMovement4) { - const auto startWatermark = TInstant::Seconds(10); + auto tracker = InitTracker(); - TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2); + const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12), TInstant()); + UNIT_ASSERT_VALUES_EQUAL(actual1, TInstant::Seconds(10)); - tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12)); - const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(14)); - UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12)); + const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(16), TInstant()); + UNIT_ASSERT_VALUES_EQUAL(actual2, TInstant::Seconds(12)); + + const auto actual3 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(18), TInstant()); + UNIT_ASSERT_VALUES_EQUAL(actual3, TInstant::Seconds(14)); } - Y_UNIT_TEST(WatermarkFarMovement) { - const auto startWatermark = TInstant::Seconds(10); + Y_UNIT_TEST(IdleFirstShouldReturnStartWatermark) { + auto tracker = InitTrackerPreset2(); + + const auto watermark = tracker.HandleIdleness(TInstant::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(watermark, TInstant::Seconds(10)); + const auto watermark2 = tracker.HandleIdleness(TInstant::Seconds(10)); + UNIT_ASSERT(!watermark2.Defined()); + } + + Y_UNIT_TEST(Idle1) { + auto tracker = InitTrackerPreset2(); + + const auto watermark = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(22), TInstant::Seconds(22)); + UNIT_ASSERT_VALUES_EQUAL(watermark, TInstant::Seconds(20)); + + const auto watermark2 = tracker.HandleIdleness(TInstant::Seconds(22)); + UNIT_ASSERT_VALUES_EQUAL(watermark2, TMaybe<TInstant>()); + + const auto watermark3 = tracker.HandleIdleness(TInstant::Seconds(23)); + UNIT_ASSERT_VALUES_EQUAL(watermark3, TMaybe<TInstant>()); - TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2); + const auto watermark4 = tracker.HandleIdleness(TInstant::Seconds(25)); + UNIT_ASSERT_VALUES_EQUAL(watermark4, TMaybe<TInstant>()); - tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12)); - tracker.NotifyNewPartitionTime(1, TInstant::Seconds(30)); - const auto actual = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(30)); - UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(30)); + const auto watermark5 = tracker.HandleIdleness(TInstant::Seconds(30)); + UNIT_ASSERT_VALUES_EQUAL(watermark5, TInstant::Seconds(25)); } - Y_UNIT_TEST(WaitExpectedPartitionsCount) { - const auto startWatermark = TInstant::Seconds(10); + Y_UNIT_TEST(IdleNextCheckAt) { + auto tracker = InitTrackerPreset2(); - TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2); + const auto nextCheckAt1 = tracker.GetNextIdlenessCheckAt(TInstant::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(nextCheckAt1, TInstant::Seconds(15)); - tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12)); - const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(30)); - UNIT_ASSERT_VALUES_EQUAL(actual1.Defined(), false); // Since expectedPartitionsCount is 2, we shouldn't move watermark + const auto nextCheckAt2 = tracker.GetNextIdlenessCheckAt(TInstant::Seconds(11)); + UNIT_ASSERT_VALUES_EQUAL(nextCheckAt2, TInstant::Seconds(15)); - const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(30)); - UNIT_ASSERT_VALUES_EQUAL(actual2, TInstant::Seconds(30)); + const auto nextCheckAt3 = tracker.GetNextIdlenessCheckAt(TInstant::Seconds(15)); + UNIT_ASSERT_VALUES_EQUAL(nextCheckAt3, TInstant::Seconds(20)); } } diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h index a4f6c8ea9e5..90e08b96158 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h @@ -216,8 +216,10 @@ struct TFakeCASetup { const TReadValueParser<T> parser, ui64 size, i64 eachReadFreeSpace = 1000, - TDuration timeout = TDuration::Seconds(30)) + TDuration timeout = TDuration::Seconds(30), + bool onlyData = false) { + ui32 dataItemsCt = 0; std::vector<std::variant<T, TInstant>> result; TInstant startedAt = TInstant::Now(); DoWithRetry([&](){ @@ -225,21 +227,24 @@ struct TFakeCASetup { auto batch = AsyncInputRead<T>(parser, nextDataFuture, eachReadFreeSpace); for (const auto& item : batch) { result.emplace_back(item); + if (!onlyData || std::holds_alternative<T>(item)) { + dataItemsCt++; + } } if (TInstant::Now() > startedAt + timeout) { return; } - if (result.size() < size) { + if (dataItemsCt < size) { nextDataFuture.Wait(timeout); - ythrow yexception() << "Not enough data"; + ythrow yexception() << "Not enough items"; } }, TRetryOptions(std::numeric_limits<ui32>::max()), true); - UNIT_ASSERT_EQUAL_C(result.size(), size, "Waited for " << size << " items but only " << result.size() << " received"); + UNIT_ASSERT_C(dataItemsCt >= size, "Waited for " << size << " items but only " << dataItemsCt << " received"); return result; } diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index 76adaf38a12..2dc53082493 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -51,6 +51,8 @@ TDqConfiguration::TDqConfiguration() { REGISTER_SETTING(*this, EnableDqReplicate); REGISTER_SETTING(*this, WatermarksMode); REGISTER_SETTING(*this, WatermarksGranularityMs); + REGISTER_SETTING(*this, WatermarksLateArrivalDelayMs); + REGISTER_SETTING(*this, WatermarksEnableIdlePartitions); REGISTER_SETTING(*this, UseAggPhases); REGISTER_SETTING(*this, ParallelOperationsLimit).Lower(1).Upper(128); } diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index 635d2dd6492..2bd914ea2de 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -28,6 +28,8 @@ struct TDqSettings { static constexpr ui64 OutputChunkMaxSize = 4_MB; static constexpr ui64 ChunkSizeLimit = 128_MB; static constexpr bool EnableDqReplicate = false; + static constexpr bool WatermarksGranularityMs = 1000; + static constexpr bool WatermarksLateArrivalDelayMs = 5000; static constexpr ui64 ParallelOperationsLimit = 16; }; @@ -74,7 +76,9 @@ struct TDqSettings { NCommon::TConfSetting<bool, false> EnableDqReplicate; NCommon::TConfSetting<bool, false> EnableGraceJoin; NCommon::TConfSetting<TString, false> WatermarksMode; + NCommon::TConfSetting<bool, false> WatermarksEnableIdlePartitions; NCommon::TConfSetting<ui64, false> WatermarksGranularityMs; + NCommon::TConfSetting<ui64, false> WatermarksLateArrivalDelayMs; NCommon::TConfSetting<bool, false> UseAggPhases; NCommon::TConfSetting<ui64, false> ParallelOperationsLimit; @@ -117,8 +121,10 @@ struct TDqSettings { SAVE_SETTING(WorkerFilter); SAVE_SETTING(ComputeActorType); SAVE_SETTING(WatermarksMode); + SAVE_SETTING(WatermarksEnableIdlePartitions); SAVE_SETTING(EnableGraceJoin); SAVE_SETTING(WatermarksGranularityMs); + SAVE_SETTING(WatermarksLateArrivalDelayMs); SAVE_SETTING(UseAggPhases); #undef SAVE_SETTING diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp index 77890ee67e2..8b707c8007a 100644 --- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp @@ -148,10 +148,10 @@ private: if (!maybeHopTraits) { return nullptr; } - const auto [hoppingColumn, hopTraits] = *maybeHopTraits; + const auto hopTraits = *maybeHopTraits; const auto aggregateInputType = GetSeqItemType(node.Ptr()->Head().GetTypeAnn())->Cast<TStructExprType>(); - TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hoppingColumn); + TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column); if (keysDescription.NeedPickle()) { return Build<TCoMap>(ctx, pos) @@ -167,15 +167,15 @@ private: } const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType); - const auto timeExtractorLambda = BuildTimeExtractor(hopTraits, ctx); + const auto timeExtractorLambda = BuildTimeExtractor(hopTraits.Traits, ctx); const auto initLambda = BuildInitHopLambda(aggregate, ctx); const auto updateLambda = BuildUpdateHopLambda(aggregate, ctx); const auto saveLambda = BuildSaveHopLambda(aggregate, ctx); const auto loadLambda = BuildLoadHopLambda(aggregate, ctx); const auto mergeLambda = BuildMergeHopLambda(aggregate, ctx); - const auto finishLambda = BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hoppingColumn, ctx); - const auto watermarkMode = BuildWatermarkMode(aggregate, ctx); - if (!watermarkMode) { + const auto finishLambda = BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx); + const auto enableWatermarks = BuildWatermarkMode(aggregate, hopTraits.Traits, ctx); + if (!enableWatermarks) { return nullptr; } @@ -183,17 +183,29 @@ private: auto multiHoppingCoreBuilder = Build<TCoMultiHoppingCore>(ctx, pos) .KeyExtractor(keyLambda) .TimeExtractor(timeExtractorLambda) - .Hop(hopTraits.Hop()) - .Interval(hopTraits.Interval()) - .Delay(hopTraits.Delay()) - .DataWatermarks(hopTraits.DataWatermarks()) + .Hop(hopTraits.Traits.Hop()) + .Interval(hopTraits.Traits.Interval()) + .DataWatermarks(hopTraits.Traits.DataWatermarks()) .InitHandler(initLambda) .UpdateHandler(updateLambda) .MergeHandler(mergeLambda) .FinishHandler(finishLambda) .SaveHandler(saveLambda) .LoadHandler(loadLambda) - .WatermarkMode(*watermarkMode); + .WatermarkMode<TCoAtom>().Build(ToString(*enableWatermarks)); + + if (*enableWatermarks) { + const auto hop = TDuration::MicroSeconds(hopTraits.Hop); + const auto lateArrivalDelay = TDuration::MilliSeconds(Config->WatermarksLateArrivalDelayMs + .Get() + .GetOrElse(TDqSettings::TDefault::WatermarksLateArrivalDelayMs)); + + multiHoppingCoreBuilder.Delay<TCoInterval>() + .Literal().Build(ToString(Max(hop, lateArrivalDelay).MicroSeconds())) + .Build(); + } else { + multiHoppingCoreBuilder.Delay(hopTraits.Traits.Delay()); + } if (Config->AnalyticsHopping.Get().GetOrElse(false)) { return Build<TCoPartitionsByKeys>(ctx, node.Pos()) @@ -253,7 +265,15 @@ private: } } - TMaybe<std::pair<TString, TCoHoppingTraits>> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx) { + struct THoppingTraits { + TString Column; + TCoHoppingTraits Traits; + ui64 Hop; + ui64 Interval; + ui64 Delay; + }; + + TMaybe<THoppingTraits> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx) { const auto pos = aggregate.Pos(); const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); @@ -323,7 +343,13 @@ private: : traits.DataWatermarks().Ptr()) .Done(); - return std::make_pair(hoppingColumn, newTraits); + return THoppingTraits { + hoppingColumn, + newTraits, + hop, + interval, + delay + }; } struct TKeysDescription { @@ -825,15 +851,28 @@ private: .Ptr(); } - TMaybe<TExprNode::TPtr> BuildWatermarkMode(const TCoAggregate& aggregate, TExprContext& ctx) { + TMaybe<bool> BuildWatermarkMode( + const TCoAggregate& aggregate, + const TCoHoppingTraits& hoppingTraits, + TExprContext& ctx) + { const auto analyticsMode = Config->AnalyticsHopping.Get().GetOrElse(false); - const bool enableWatermarks = !analyticsMode && Config->WatermarksMode.Get() == "default"; + const bool enableWatermarks = !analyticsMode && + Config->WatermarksMode.Get() == "default" && + hoppingTraits.Version().Cast<TCoAtom>().StringValue() == "v2"; if (enableWatermarks && Config->ComputeActorType.Get() != "async") { ctx.AddError(TIssue(ctx.GetPosition(aggregate.Pos()), "Watermarks should be used only with async compute actor")); return Nothing(); } - return ctx.NewAtom(aggregate.Pos(), ToString(enableWatermarks)); + if (hoppingTraits.Version().Cast<TCoAtom>().StringValue() == "v2" && !enableWatermarks) { + ctx.AddError(TIssue( + ctx.GetPosition(aggregate.Pos()), + "HoppingWindow requires watermarks to be enabled. If you don't want to do that, you can use HOP instead.")); + return Nothing(); + } + + return enableWatermarks; } private: diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index 0b25a022da6..1fb54221a00 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -252,9 +252,29 @@ private: TActor<TDqPqReadActor>::PassAway(); } + void MaybeScheduleNextIdleCheck(TInstant systemTime) { + if (!WatermarkTracker) { + return; + } + + const auto nextIdleCheckAt = WatermarkTracker->GetNextIdlenessCheckAt(systemTime); + if (!nextIdleCheckAt) { + return; + } + + if (!NextIdlenesCheckAt.Defined() || nextIdleCheckAt != *NextIdlenesCheckAt) { + NextIdlenesCheckAt = *nextIdleCheckAt; + SRC_LOG_T("Next idleness check scheduled at " << *nextIdleCheckAt); + Schedule(*nextIdleCheckAt, new TEvPrivate::TEvSourceDataReady()); + } + } + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override { SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace); + const auto now = TInstant::Now(); + MaybeScheduleNextIdleCheck(now); + i64 usedSpace = 0; if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) { return usedSpace; @@ -273,6 +293,16 @@ private: std::visit(TPQEventProcessor{*this, batchItemsEstimatedCount, LogPrefix}, event); } + if (WatermarkTracker) { + const auto watermark = WatermarkTracker->HandleIdleness(now); + + if (watermark) { + const auto t = watermark; + SRC_LOG_T("Fake watermark " << t << " was produced"); + PushWatermarkToReady(*watermark); + } + } + if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) { return usedSpace; } @@ -303,11 +333,12 @@ private: return; } - WatermarkTracker = std::make_unique<TDqSourceWatermarkTracker<TPartitionKey>>( + WatermarkTracker.ConstructInPlace( TDuration::MicroSeconds(SourceParams.GetWatermarks().GetGranularityUs()), StartingMessageTimestamp, - GetPartitionsToRead().size() // TODO: for the internal LB there is a problem here. See YQ-1384 - ); + SourceParams.GetWatermarks().GetIdlePartitionsEnabled(), + TDuration::MicroSeconds(SourceParams.GetWatermarks().GetLateArrivalDelayUs()), + TInstant::Now()); } NYdb::NPersQueue::TReadSessionSettings GetReadSessionSettings() const { @@ -386,6 +417,17 @@ private: return true; } + void PushWatermarkToReady(TInstant watermark) { + SRC_LOG_D("New watermark " << watermark << " was generated"); + + if (Y_UNLIKELY(ReadyBuffer.empty() || ReadyBuffer.back().Watermark.Defined())) { + ReadyBuffer.emplace(watermark, 0); + return; + } + + ReadyBuffer.back().Watermark = watermark; + } + struct TPQEventProcessor { void operator()(NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) { const auto partitionKey = MakePartitionKey(event.GetPartitionStream()); @@ -440,7 +482,7 @@ private: void operator()(NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent&) { } TReadyBatch& GetActiveBatch(const TPartitionKey& partitionKey, TInstant time) { - if (Y_UNLIKELY(Self.ReadyBuffer.empty())) { + if (Y_UNLIKELY(Self.ReadyBuffer.empty() || Self.ReadyBuffer.back().Watermark.Defined())) { Self.ReadyBuffer.emplace(Nothing(), BatchCapacity); } @@ -451,15 +493,16 @@ private: return activeBatch; } - const auto maybeNewWatermark = Self.WatermarkTracker->NotifyNewPartitionTime(partitionKey, time); + const auto maybeNewWatermark = Self.WatermarkTracker->NotifyNewPartitionTime( + partitionKey, + time, + TInstant::Now()); if (!maybeNewWatermark) { // Watermark wasn't moved => use current active batch return activeBatch; } - SRC_LOG_D("New watermark " << *maybeNewWatermark << " was generated"); - activeBatch.Watermark = maybeNewWatermark; // Write watermark to current batch - + Self.PushWatermarkToReady(*maybeNewWatermark); return Self.ReadyBuffer.emplace(Nothing(), BatchCapacity); // And open new batch } @@ -514,7 +557,8 @@ private: bool SubscribedOnEvent = false; std::vector<std::tuple<TString, TPqMetaExtractor::TPqMetaExtractorLambda>> MetadataFields; std::queue<TReadyBatch> ReadyBuffer; - std::unique_ptr<TDqSourceWatermarkTracker<TPartitionKey>> WatermarkTracker; + TMaybe<TDqSourceWatermarkTracker<TPartitionKey>> WatermarkTracker; + TMaybe<TInstant> NextIdlenesCheckAt; }; std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor( diff --git a/ydb/library/yql/providers/pq/common/yql_names.h b/ydb/library/yql/providers/pq/common/yql_names.h index 184a5d5eb0e..268944e9bbe 100644 --- a/ydb/library/yql/providers/pq/common/yql_names.h +++ b/ydb/library/yql/providers/pq/common/yql_names.h @@ -9,6 +9,9 @@ constexpr TStringBuf ConsumerSetting = "Consumer"; constexpr TStringBuf EndpointSetting = "Endpoint"; constexpr TStringBuf UseSslSetting = "UseSsl"; constexpr TStringBuf AddBearerToTokenSetting = "AddBearerToToken"; +constexpr TStringBuf WatermarksEnableSetting = "WatermarksEnable"; constexpr TStringBuf WatermarksGranularityUsSetting = "WatermarksGranularityUs"; +constexpr TStringBuf WatermarksLateArrivalDelayUsSetting = "WatermarksLateArrivalDelayUs"; +constexpr TStringBuf WatermarksIdlePartitionsSetting = "WatermarksIdlePartitions"; } // namespace NYql diff --git a/ydb/library/yql/providers/pq/proto/dq_io.proto b/ydb/library/yql/providers/pq/proto/dq_io.proto index 5ade3091b42..09342088851 100644 --- a/ydb/library/yql/providers/pq/proto/dq_io.proto +++ b/ydb/library/yql/providers/pq/proto/dq_io.proto @@ -17,6 +17,8 @@ enum EClusterType { message TWatermarks { bool Enabled = 1; uint64 GranularityUs = 2; + uint64 LateArrivalDelayUs = 3; + bool IdlePartitionsEnabled = 4; } message TDqPqTopicSource { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index 5285e586d09..fbd03b35c7a 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -200,9 +200,14 @@ public: srcDesc.SetUseSsl(FromString<bool>(Value(setting))); } else if (name == AddBearerToTokenSetting) { srcDesc.SetAddBearerToToken(FromString<bool>(Value(setting))); - } else if (name == WatermarksGranularityUsSetting) { + } else if (name == WatermarksEnableSetting) { srcDesc.MutableWatermarks()->SetEnabled(true); + } else if (name == WatermarksGranularityUsSetting) { srcDesc.MutableWatermarks()->SetGranularityUs(FromString<ui64>(Value(setting))); + } else if (name == WatermarksLateArrivalDelayUsSetting) { + srcDesc.MutableWatermarks()->SetLateArrivalDelayUs(FromString<ui64>(Value(setting))); + } else if (name == WatermarksIdlePartitionsSetting) { + srcDesc.MutableWatermarks()->SetIdlePartitionsEnabled(true); } } @@ -292,8 +297,23 @@ public: } if (dqSettings.WatermarksMode.Get().GetOrElse("") == "default") { - const auto granularity = TDuration::MilliSeconds(dqSettings.WatermarksGranularityMs.Get().GetOrElse(1000)); + Add(props, WatermarksEnableSetting, ToString(true), pos, ctx); + + const auto granularity = TDuration::MilliSeconds(dqSettings + .WatermarksGranularityMs + .Get() + .GetOrElse(TDqSettings::TDefault::WatermarksGranularityMs)); Add(props, WatermarksGranularityUsSetting, ToString(granularity.MicroSeconds()), pos, ctx); + + const auto lateArrivalDelay = TDuration::MilliSeconds(dqSettings + .WatermarksLateArrivalDelayMs + .Get() + .GetOrElse(TDqSettings::TDefault::WatermarksLateArrivalDelayMs)); + Add(props, WatermarksLateArrivalDelayUsSetting, ToString(lateArrivalDelay.MicroSeconds()), pos, ctx); + } + + if (dqSettings.WatermarksEnableIdlePartitions.Get().GetOrElse(false)) { + Add(props, WatermarksIdlePartitionsSetting, ToString(true), pos, ctx); } return Build<TCoNameValueTupleList>(ctx, pos) diff --git a/ydb/library/yql/sql/v0/node.cpp b/ydb/library/yql/sql/v0/node.cpp index 57f0c6a561f..9fe816b17c9 100644 --- a/ydb/library/yql/sql/v0/node.cpp +++ b/ydb/library/yql/sql/v0/node.cpp @@ -1460,7 +1460,8 @@ TNodePtr ISource::BuildAggregation(const TString& label) { HoppingWindowSpec->Hop, HoppingWindowSpec->Interval, HoppingWindowSpec->Delay, - Q("False")); + Q("False"), + Q("v1")); return Y("Aggregate", label, Q(keysTuple), Q(aggrArgs), Q(Y(Q(Y(BuildQuotedAtom(Pos, "hopping"), hoppingTraits))))); diff --git a/ydb/library/yql/sql/v1/builtin.cpp b/ydb/library/yql/sql/v1/builtin.cpp index a225305c808..3b8be7b6be0 100644 --- a/ydb/library/yql/sql/v1/builtin.cpp +++ b/ydb/library/yql/sql/v1/builtin.cpp @@ -2089,7 +2089,8 @@ TNodePtr THoppingWindow::BuildTraits(const TString& label) const { Hop, Interval, Interval, - Q("true")); + Q("true"), + Q("v2")); } bool THoppingWindow::DoInit(TContext& ctx, ISource* src) { diff --git a/ydb/library/yql/sql/v1/node.cpp b/ydb/library/yql/sql/v1/node.cpp index c8ad9f85671..79256249a2e 100644 --- a/ydb/library/yql/sql/v1/node.cpp +++ b/ydb/library/yql/sql/v1/node.cpp @@ -1842,7 +1842,8 @@ std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TConte LegacyHoppingWindowSpec->Hop, LegacyHoppingWindowSpec->Interval, LegacyHoppingWindowSpec->Delay, - LegacyHoppingWindowSpec->DataWatermarks ? Q("true") : Q("false")); + LegacyHoppingWindowSpec->DataWatermarks ? Q("true") : Q("false"), + Q("v1")); options = L(options, Q(Y(Q("hopping"), hoppingTraits))); } |