aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsvartmetal <svartmetal@yandex-team.com>2023-02-20 11:08:02 +0300
committersvartmetal <svartmetal@yandex-team.com>2023-02-20 11:08:02 +0300
commit2169b4de60762aca596fd4df574ce1ff26c921fb (patch)
tree8170a4a1b77bcc22103e4a2de692c900a8ee2c41
parentfa753918f8b35fc12c188a7c0dc2d8420d3c2781 (diff)
downloadydb-2169b4de60762aca596fd4df574ce1ff26c921fb.tar.gz
limit rate of DropBlockStoreVolume operations in SchemeShard
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp33
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h3
-rw-r--r--ydb/core/tx/schemeshard/ut_bsvolume.cpp98
-rw-r--r--ydb/core/tx/schemeshard/ut_bsvolume_reboots.cpp4
4 files changed, 135 insertions, 3 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp
index 9bdd302e09..60482c33d0 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp
@@ -6,6 +6,9 @@
namespace {
+constexpr char RateLimiterRateAttrName[] = "drop_blockstore_volume_rate_limiter_rate";
+constexpr char RateLimiterCapacityAttrName[] = "drop_blockstore_volume_rate_limiter_capacity";
+
using namespace NKikimr;
using namespace NSchemeShard;
@@ -182,6 +185,36 @@ public:
return result;
}
+ {
+ auto& rateLimiter = context.SS->DropBlockStoreVolumeRateLimiter;
+
+ // update rate limiter params
+ auto domainDir = context.SS->PathsById.at(path.GetPathIdForDomain());
+ double rate = 0;
+ double capacity = 0;
+ auto& attrs = domainDir->UserAttrs->Attrs;
+ if (TryFromString(attrs[RateLimiterRateAttrName], rate) &&
+ TryFromString(attrs[RateLimiterCapacityAttrName], capacity))
+ {
+ rateLimiter.SetRate(rate);
+ rateLimiter.SetCapacity(capacity);
+ }
+
+ if (rate > 0.0 && capacity > 0.0) {
+ rateLimiter.Fill(AppData()->TimeProvider->Now());
+
+ if (rateLimiter.Available() >= 1.0) {
+ rateLimiter.Take(1.0);
+ } else {
+ // TODO: should use separate status?
+ result->SetError(
+ NKikimrScheme::StatusNotAvailable,
+ "Too many requests");
+ return result;
+ }
+ }
+ }
+
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropBlockStoreVolume, path.Base()->PathId);
// Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped"
txState.MinStep = TStepId(1);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 757d7018df..9e753a55bd 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -44,6 +44,7 @@
#include <ydb/core/tx/sequenceshard/public/events.h>
#include <ydb/core/tx/tx_processing.h>
#include <ydb/core/util/pb.h>
+#include <ydb/core/util/token_bucket.h>
#include <ydb/core/ydb_convert/table_profiles.h>
#include <ydb/core/blockstore/core/blockstore.h>
@@ -328,6 +329,8 @@ public:
bool CleanDroppedPathsDisabled = true;
bool CleanDroppedSubDomainsInFly = false;
+ TTokenBucket DropBlockStoreVolumeRateLimiter;
+
TActorId DelayedInitTenantDestination;
TAutoPtr<TEvSchemeShard::TEvInitTenantSchemeShardResult> DelayedInitTenantReply;
diff --git a/ydb/core/tx/schemeshard/ut_bsvolume.cpp b/ydb/core/tx/schemeshard/ut_bsvolume.cpp
index c5824e6a64..4bcc51a200 100644
--- a/ydb/core/tx/schemeshard/ut_bsvolume.cpp
+++ b/ydb/core/tx/schemeshard/ut_bsvolume.cpp
@@ -102,6 +102,104 @@ Y_UNIT_TEST_SUITE(TBSV) {
// Good: Value { Struct { Optional { Struct { } Struct { Bool: false } } } } }
UNIT_ASSERT_VALUES_EQUAL(result.GetValue().GetStruct(0).GetOptional().GetStruct(0).ListSize(), 0);
}
+ }
+
+ Y_UNIT_TEST(ShouldLimitBlockStoreVolumeDropRate) {
+ struct TMockTimeProvider : public ITimeProvider
+ {
+ TInstant Time;
+
+ TInstant Now() override
+ {
+ return Time;
+ }
+ };
+
+ struct TTimeProviderMocker
+ {
+ TIntrusivePtr<ITimeProvider> OriginalTimeProvider;
+
+ TTimeProviderMocker(TIntrusivePtr<ITimeProvider> timeProvider)
+ {
+ OriginalTimeProvider = NKikimr::TAppData::TimeProvider;
+ NKikimr::TAppData::TimeProvider = timeProvider;
+ }
+
+ ~TTimeProviderMocker()
+ {
+ NKikimr::TAppData::TimeProvider = OriginalTimeProvider;
+ }
+ };
+
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+ auto root = "/MyRoot";
+ auto name = "BSVolume";
+ auto throttled = NKikimrScheme::StatusNotAvailable;
+
+ TestUserAttrs(runtime, ++txId, "", "MyRoot",
+ AlterUserAttrs(
+ {{"drop_blockstore_volume_rate_limiter_rate", "1.0"}}
+ )
+ );
+ env.TestWaitNotification(runtime, txId);
+
+ TestUserAttrs(runtime, ++txId, "", "MyRoot",
+ AlterUserAttrs(
+ {{"drop_blockstore_volume_rate_limiter_capacity", "10.0"}}
+ )
+ );
+ env.TestWaitNotification(runtime, txId);
+
+ TIntrusivePtr<TMockTimeProvider> mockTimeProvider =
+ new TMockTimeProvider();
+ TTimeProviderMocker mocker(mockTimeProvider);
+ NKikimrSchemeOp::TBlockStoreVolumeDescription descr;
+ descr.SetName(name);
+ auto& c = *descr.MutableVolumeConfig();
+ c.SetBlockSize(4096);
+ for (int i = 0; i < 4; ++i) {
+ c.AddExplicitChannelProfiles()->SetPoolKind("pool-kind-1");
+ }
+ c.AddPartitions()->SetBlockCount(16);
+
+ // consume all initial budget
+ for (int i = 0; i < 10; ++i) {
+ TestCreateBlockStoreVolume(runtime, ++txId, root, descr.DebugString());
+ env.TestWaitNotification(runtime, txId);
+ TestDropBlockStoreVolume(runtime, ++txId, root, name);
+ env.TestWaitNotification(runtime, txId);
+ }
+
+ TestCreateBlockStoreVolume(runtime, ++txId, root, descr.DebugString());
+ env.TestWaitNotification(runtime, txId);
+ // drop should be throttled
+ TestDropBlockStoreVolume(runtime, ++txId, root, name, {throttled});
+ env.TestWaitNotification(runtime, txId);
+
+ mockTimeProvider->Time = TInstant::Seconds(1);
+
+ // after 1 second, we should be able to drop one volume
+ TestDropBlockStoreVolume(runtime, ++txId, root, name);
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateBlockStoreVolume(runtime, ++txId, root, descr.DebugString());
+ env.TestWaitNotification(runtime, txId);
+ // next drop should be throttled
+ TestDropBlockStoreVolume(runtime, ++txId, root, name, {throttled});
+ env.TestWaitNotification(runtime, txId);
+
+ // turn off rate limiter
+ TestUserAttrs(runtime, ++txId, "", "MyRoot",
+ AlterUserAttrs(
+ {{"drop_blockstore_volume_rate_limiter_rate", "0.0"}}
+ )
+ );
+ env.TestWaitNotification(runtime, txId);
+
+ TestDropBlockStoreVolume(runtime, ++txId, root, name);
+ env.TestWaitNotification(runtime, txId);
}
}
diff --git a/ydb/core/tx/schemeshard/ut_bsvolume_reboots.cpp b/ydb/core/tx/schemeshard/ut_bsvolume_reboots.cpp
index 5dbdecf8e6..22ac155469 100644
--- a/ydb/core/tx/schemeshard/ut_bsvolume_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_bsvolume_reboots.cpp
@@ -342,11 +342,9 @@ Y_UNIT_TEST_SUITE(TBSVWithReboots) {
t.TestEnv->TestWaitNotification(runtime, t.TxId-1);
TestLs(runtime, "/MyRoot/DirA/BSVolume_4", false, NLs::CheckMountToken("BSVolume_4", "Owner123"));
-
- TestAssignBlockStoreVolume(runtime, t.TxId++, "/MyRoot/DirA", "BSVolume_4", "Owner124", 1);
+ TestAssignBlockStoreVolume(runtime, t.TxId++, "/MyRoot/DirA", "BSVolume_4", "Owner124", 1);
t.TestEnv->TestWaitNotification(runtime, t.TxId-1);
TestLs(runtime, "/MyRoot/DirA/BSVolume_4", false, NLs::CheckMountToken("BSVolume_4", "Owner124"));
-
});
}
}