aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-01-16 19:04:19 +0300
committereivanov89 <eivanov89@ydb.tech>2023-01-16 19:04:19 +0300
commitf24214276985020c32243e123b73cae38b6ae093 (patch)
tree1f1b3a03ce1f4a9ce9c79bcb2adf9b6461a0929f
parenta5454b0ec53734331656e033e0403926c1ba9c65 (diff)
downloadydb-f24214276985020c32243e123b73cae38b6ae093.tar.gz
PR from branch users/eivanov89/-long-timers
use activity mark up in compaction queue actors create long timer in user pool fix setting multiple short timers when blocked by RPS limit, don't wakeup compaction queue too often
-rw-r--r--ydb/core/protos/config.proto6
-rw-r--r--ydb/core/protos/services.proto2
-rw-r--r--ydb/core/tx/schemeshard/operation_queue_timer.h15
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h6
-rw-r--r--ydb/core/util/operation_queue.h61
-rw-r--r--ydb/core/util/operation_queue_ut.cpp40
7 files changed, 102 insertions, 30 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index fb51130cc50..a8d488a77d1 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1593,6 +1593,9 @@ message TCompactionConfig {
// Compact even if shard has single part and empty memtable
optional bool CompactSinglePartedShards = 10 [default = false];
+
+ // Do not wakeup earlier, than this interval
+ optional uint64 MinWakeupIntervalMs = 11 [default = 10];
}
message TBorrowedCompactionConfig {
@@ -1601,6 +1604,9 @@ message TCompactionConfig {
// After this interval we will try to restart
optional uint64 TimeoutSeconds = 3 [default = 15];
+
+ // Do not wakeup earlier, than this interval
+ optional uint64 MinWakeupIntervalMs = 4 [default = 10];
}
optional TBackgroundCompactionConfig BackgroundCompactionConfig = 1;
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index f35726ad935..3d241f4acb0 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -958,5 +958,7 @@ message TActivity {
SCHEMESHARD_SVP_MIGRATOR = 597;
SS_FETCHING_ACTOR = 598;
METADATA_SCHEME_DESCRIPTION_ACTOR = 599;
+ SCHEMESHARD_BACKGROUND_COMPACTION = 600;
+ SCHEMESHARD_BORROWED_COMPACTION = 601;
};
};
diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h
index 37da4095d78..794c9368669 100644
--- a/ydb/core/tx/schemeshard/operation_queue_timer.h
+++ b/ydb/core/tx/schemeshard/operation_queue_timer.h
@@ -19,14 +19,14 @@ namespace NKikimr {
// TODO: make part of util?
namespace NOperationQueue {
-template <typename T, typename TQueue, int Ev, int LogServiceId>
+template <typename T, typename TQueue, int Ev, int LogServiceId, ui32 ActivityType>
class TOperationQueueWithTimer
- : public TActor<TOperationQueueWithTimer<T, TQueue, Ev, LogServiceId>>
+ : public TActor<TOperationQueueWithTimer<T, TQueue, Ev, LogServiceId, ActivityType>>
, public ITimer
, public TOperationQueue<T, TQueue>
{
- using TThis = ::NKikimr::NOperationQueue::TOperationQueueWithTimer<T, TQueue, Ev, LogServiceId>;
- using TActorBase = TActor<TOperationQueueWithTimer<T, TQueue, Ev, LogServiceId>>;
+ using TThis = ::NKikimr::NOperationQueue::TOperationQueueWithTimer<T, TQueue, Ev, LogServiceId, ActivityType>;
+ using TActorBase = TActor<TOperationQueueWithTimer<T, TQueue, Ev, LogServiceId, ActivityType>>;
using TBase = TOperationQueue<T, TQueue>;
struct TEvWakeupQueue : public TEventLocal<TEvWakeupQueue, Ev> {
@@ -53,6 +53,10 @@ public:
, TBase(config, queueConfig, starter, *this)
{}
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::EType(ActivityType);
+ }
+
void Shutdown(const TActorContext &ctx) {
if (LongTimerId)
ctx.Send(LongTimerId, new TEvents::TEvPoison);
@@ -72,7 +76,8 @@ private:
When = this->Now() + delta;
auto ctx = TActivationContext::ActorContextFor(TActorBase::SelfId());
LongTimerId = CreateLongTimer(ctx, delta,
- new IEventHandle(TActorBase::SelfId(), TActorBase::SelfId(), new TEvWakeupQueue));
+ new IEventHandle(TActorBase::SelfId(), TActorBase::SelfId(), new TEvWakeupQueue),
+ AppData(ctx)->UserPoolId);
LOG_DEBUG_S(ctx, ServiceId,
"Operation queue set wakeup after delta# " << delta.Seconds() << " seconds");
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 503bc848e79..b1898ccfef5 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -6369,6 +6369,7 @@ void TSchemeShard::ConfigureBackgroundCompactionQueue(
compactionConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds());
compactionConfig.WakeupInterval = TDuration::Seconds(config.GetWakeupIntervalSeconds());
+ compactionConfig.MinWakeupInterval = TDuration::MilliSeconds(config.GetMinWakeupIntervalMs());
compactionConfig.InflightLimit = config.GetInflightLimit();
compactionConfig.RoundInterval = TDuration::Seconds(config.GetRoundSeconds());
compactionConfig.MaxRate = config.GetMaxRate();
@@ -6403,6 +6404,7 @@ void TSchemeShard::ConfigureBorrowedCompactionQueue(
compactionConfig.IsCircular = false;
compactionConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds());
+ compactionConfig.MinWakeupInterval = TDuration::MilliSeconds(config.GetMinWakeupIntervalMs());
compactionConfig.InflightLimit = config.GetInflightLimit();
compactionConfig.MaxRate = config.GetMaxRate();
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 59b87142375..70395b2e16b 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -82,7 +82,8 @@ private:
TShardCompactionInfo,
TCompactionQueueImpl,
TEvPrivate::EvRunBackgroundCompaction,
- NKikimrServices::FLAT_TX_SCHEMESHARD>;
+ NKikimrServices::FLAT_TX_SCHEMESHARD,
+ NKikimrServices::TActivity::SCHEMESHARD_BACKGROUND_COMPACTION>;
class TCompactionStarter : public TCompactionQueue::IStarter {
public:
@@ -106,7 +107,8 @@ private:
TShardIdx,
TFifoQueue<TShardIdx>,
TEvPrivate::EvRunBorrowedCompaction,
- NKikimrServices::FLAT_TX_SCHEMESHARD>;
+ NKikimrServices::FLAT_TX_SCHEMESHARD,
+ NKikimrServices::TActivity::SCHEMESHARD_BORROWED_COMPACTION>;
class TBorrowedCompactionStarter : public TBorrowedCompactionQueue::IStarter {
public:
diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h
index 61bed56bc30..43de2cc0f0e 100644
--- a/ydb/core/util/operation_queue.h
+++ b/ydb/core/util/operation_queue.h
@@ -53,6 +53,9 @@ struct TConfig {
// retry after this amount of time
TDuration WakeupInterval = TDuration::Seconds(1);
+ // Do not wakeup too often
+ TDuration MinWakeupInterval = TDuration::Zero();
+
// done and timeouted items are enqueued again
bool IsCircular = false;
@@ -631,43 +634,55 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() {
return;
auto now = Timer.Now();
+ auto wakeup = TMonotonic::Max();
+
if (RunningItems.Empty() && !ReadyQueue.Empty()) {
if (TokenBucket.Available() <= 0) {
// we didn't start anything because of RPS limit
- NextWakeup = now + TokenBucket.NextAvailableDelay();
- Timer.SetWakeupTimer(TokenBucket.NextAvailableDelay());
- return;
+ wakeup = now + TokenBucket.NextAvailableDelay();
} else if (!NextWakeup) {
// special case when we failed to start anything
- NextWakeup = now + Config.WakeupInterval;
- Timer.SetWakeupTimer(Config.WakeupInterval);
- return;
+ wakeup = now + Config.WakeupInterval;
+ }
+ } else {
+ // note, that by design we should have remove timeouted items,
+ // thus assume that timeout is in future
+ if (Config.Timeout && !RunningItems.Empty()) {
+ const auto& item = RunningItems.Front();
+ wakeup = Min(wakeup, item.Timestamp + Config.Timeout);
}
- }
- auto wakeup = TMonotonic::Max();
+ if (!WaitingItems.Empty()) {
+ const auto& item = WaitingItems.Front();
+ wakeup = Min(wakeup, item.Timestamp + Config.MinOperationRepeatDelay);
+ }
- if (Config.Timeout && !RunningItems.Empty()) {
- const auto& item = RunningItems.Front();
- wakeup = Min(wakeup, item.Timestamp + Config.Timeout);
- }
+ // neither timeout will happen or there any waiting items.
+ // in this case, queue will be triggered by enqueue operation.
+ if (wakeup == TMonotonic::Max())
+ return;
- if (!WaitingItems.Empty()) {
- const auto& item = WaitingItems.Front();
- wakeup = Min(wakeup, item.Timestamp + Config.MinOperationRepeatDelay);
+ // no sense to wakeup earlier that rate limit allows
+ if (HasRateLimit) {
+ wakeup = Max(wakeup, now + TokenBucket.NextAvailableDelay());
+ }
}
- if (wakeup == TMonotonic::Max())
- return;
+ // don't wakeup too often (as well as don't wakeup in past)
+ wakeup = Max(wakeup, now + Config.MinWakeupInterval);
- // no sense to wakeup earlier that rate limit allows
- if (HasRateLimit) {
- wakeup = Max(wakeup, now + TokenBucket.NextAvailableDelay());
- }
-
- if (!NextWakeup || NextWakeup > wakeup) {
+ if (!NextWakeup) {
NextWakeup = wakeup;
Timer.SetWakeupTimer(wakeup - now);
+ return;
+ }
+
+ if (NextWakeup > wakeup) {
+ auto delta = NextWakeup - wakeup;
+ if (!Config.MinWakeupInterval || delta > Config.MinWakeupInterval) {
+ NextWakeup = wakeup;
+ Timer.SetWakeupTimer(wakeup - now);
+ }
}
}
diff --git a/ydb/core/util/operation_queue_ut.cpp b/ydb/core/util/operation_queue_ut.cpp
index 7daa22ba675..41615948678 100644
--- a/ydb/core/util/operation_queue_ut.cpp
+++ b/ydb/core/util/operation_queue_ut.cpp
@@ -955,6 +955,46 @@ Y_UNIT_TEST_SUITE(TCircularOperationQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.back(), goldWakeup);
}
+ Y_UNIT_TEST(CheckWakeupWhenRPSExhausted2) {
+ // regression case for the following case:
+ // 1. Enqueue operation 1.
+ // 2. Done operation 1.
+ // 3. Enqueue 2 and 3 - they should not add extra wakeups
+
+ TQueue::TConfig config;
+ config.IsCircular = true;
+ config.InflightLimit = 1;
+ config.MaxRate = 0.5;
+ config.Timeout = Timeout;
+ TOperationStarter starter;
+
+ TQueue queue(config, starter, starter);
+ queue.Start();
+
+ queue.Enqueue(1);
+ queue.Enqueue(2);
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 1UL); // only timeout for 1
+
+ queue.OnDone(1);
+
+ // 2 is running now because token bucket allows
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 1UL); // only first timeout
+
+ queue.Enqueue(3);
+ UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 1UL); // only first timeout
+
+ queue.OnDone(2);
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 0UL); // blocked by RPS
+ UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 2UL); // start new one, when RPS allows
+
+ queue.Enqueue(4);
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 0UL); // no change
+ UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 2UL); // no change
+ }
+
Y_UNIT_TEST(CheckStartAfterStop) {
TQueue::TConfig config;
config.IsCircular = true;