diff options
author | svartmetal <svartmetal@yandex-team.com> | 2023-02-20 11:08:02 +0300 |
---|---|---|
committer | svartmetal <svartmetal@yandex-team.com> | 2023-02-20 11:08:02 +0300 |
commit | 2169b4de60762aca596fd4df574ce1ff26c921fb (patch) | |
tree | 8170a4a1b77bcc22103e4a2de692c900a8ee2c41 | |
parent | fa753918f8b35fc12c188a7c0dc2d8420d3c2781 (diff) | |
download | ydb-2169b4de60762aca596fd4df574ce1ff26c921fb.tar.gz |
limit rate of DropBlockStoreVolume operations in SchemeShard
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp | 33 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_bsvolume.cpp | 98 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_bsvolume_reboots.cpp | 4 |
4 files changed, 135 insertions, 3 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp index 9bdd302e09..60482c33d0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp @@ -6,6 +6,9 @@ namespace { +constexpr char RateLimiterRateAttrName[] = "drop_blockstore_volume_rate_limiter_rate"; +constexpr char RateLimiterCapacityAttrName[] = "drop_blockstore_volume_rate_limiter_capacity"; + using namespace NKikimr; using namespace NSchemeShard; @@ -182,6 +185,36 @@ public: return result; } + { + auto& rateLimiter = context.SS->DropBlockStoreVolumeRateLimiter; + + // update rate limiter params + auto domainDir = context.SS->PathsById.at(path.GetPathIdForDomain()); + double rate = 0; + double capacity = 0; + auto& attrs = domainDir->UserAttrs->Attrs; + if (TryFromString(attrs[RateLimiterRateAttrName], rate) && + TryFromString(attrs[RateLimiterCapacityAttrName], capacity)) + { + rateLimiter.SetRate(rate); + rateLimiter.SetCapacity(capacity); + } + + if (rate > 0.0 && capacity > 0.0) { + rateLimiter.Fill(AppData()->TimeProvider->Now()); + + if (rateLimiter.Available() >= 1.0) { + rateLimiter.Take(1.0); + } else { + // TODO: should use separate status? + result->SetError( + NKikimrScheme::StatusNotAvailable, + "Too many requests"); + return result; + } + } + } + TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropBlockStoreVolume, path.Base()->PathId); // Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped" txState.MinStep = TStepId(1); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 757d7018df..9e753a55bd 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -44,6 +44,7 @@ #include <ydb/core/tx/sequenceshard/public/events.h> #include <ydb/core/tx/tx_processing.h> #include <ydb/core/util/pb.h> +#include <ydb/core/util/token_bucket.h> #include <ydb/core/ydb_convert/table_profiles.h> #include <ydb/core/blockstore/core/blockstore.h> @@ -328,6 +329,8 @@ public: bool CleanDroppedPathsDisabled = true; bool CleanDroppedSubDomainsInFly = false; + TTokenBucket DropBlockStoreVolumeRateLimiter; + TActorId DelayedInitTenantDestination; TAutoPtr<TEvSchemeShard::TEvInitTenantSchemeShardResult> DelayedInitTenantReply; diff --git a/ydb/core/tx/schemeshard/ut_bsvolume.cpp b/ydb/core/tx/schemeshard/ut_bsvolume.cpp index c5824e6a64..4bcc51a200 100644 --- a/ydb/core/tx/schemeshard/ut_bsvolume.cpp +++ b/ydb/core/tx/schemeshard/ut_bsvolume.cpp @@ -102,6 +102,104 @@ Y_UNIT_TEST_SUITE(TBSV) { // Good: Value { Struct { Optional { Struct { } Struct { Bool: false } } } } } UNIT_ASSERT_VALUES_EQUAL(result.GetValue().GetStruct(0).GetOptional().GetStruct(0).ListSize(), 0); } + } + + Y_UNIT_TEST(ShouldLimitBlockStoreVolumeDropRate) { + struct TMockTimeProvider : public ITimeProvider + { + TInstant Time; + + TInstant Now() override + { + return Time; + } + }; + + struct TTimeProviderMocker + { + TIntrusivePtr<ITimeProvider> OriginalTimeProvider; + + TTimeProviderMocker(TIntrusivePtr<ITimeProvider> timeProvider) + { + OriginalTimeProvider = NKikimr::TAppData::TimeProvider; + NKikimr::TAppData::TimeProvider = timeProvider; + } + + ~TTimeProviderMocker() + { + NKikimr::TAppData::TimeProvider = OriginalTimeProvider; + } + }; + + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + auto root = "/MyRoot"; + auto name = "BSVolume"; + auto throttled = NKikimrScheme::StatusNotAvailable; + + TestUserAttrs(runtime, ++txId, "", "MyRoot", + AlterUserAttrs( + {{"drop_blockstore_volume_rate_limiter_rate", "1.0"}} + ) + ); + env.TestWaitNotification(runtime, txId); + + TestUserAttrs(runtime, ++txId, "", "MyRoot", + AlterUserAttrs( + {{"drop_blockstore_volume_rate_limiter_capacity", "10.0"}} + ) + ); + env.TestWaitNotification(runtime, txId); + + TIntrusivePtr<TMockTimeProvider> mockTimeProvider = + new TMockTimeProvider(); + TTimeProviderMocker mocker(mockTimeProvider); + NKikimrSchemeOp::TBlockStoreVolumeDescription descr; + descr.SetName(name); + auto& c = *descr.MutableVolumeConfig(); + c.SetBlockSize(4096); + for (int i = 0; i < 4; ++i) { + c.AddExplicitChannelProfiles()->SetPoolKind("pool-kind-1"); + } + c.AddPartitions()->SetBlockCount(16); + + // consume all initial budget + for (int i = 0; i < 10; ++i) { + TestCreateBlockStoreVolume(runtime, ++txId, root, descr.DebugString()); + env.TestWaitNotification(runtime, txId); + TestDropBlockStoreVolume(runtime, ++txId, root, name); + env.TestWaitNotification(runtime, txId); + } + + TestCreateBlockStoreVolume(runtime, ++txId, root, descr.DebugString()); + env.TestWaitNotification(runtime, txId); + // drop should be throttled + TestDropBlockStoreVolume(runtime, ++txId, root, name, {throttled}); + env.TestWaitNotification(runtime, txId); + + mockTimeProvider->Time = TInstant::Seconds(1); + + // after 1 second, we should be able to drop one volume + TestDropBlockStoreVolume(runtime, ++txId, root, name); + env.TestWaitNotification(runtime, txId); + + TestCreateBlockStoreVolume(runtime, ++txId, root, descr.DebugString()); + env.TestWaitNotification(runtime, txId); + // next drop should be throttled + TestDropBlockStoreVolume(runtime, ++txId, root, name, {throttled}); + env.TestWaitNotification(runtime, txId); + + // turn off rate limiter + TestUserAttrs(runtime, ++txId, "", "MyRoot", + AlterUserAttrs( + {{"drop_blockstore_volume_rate_limiter_rate", "0.0"}} + ) + ); + env.TestWaitNotification(runtime, txId); + + TestDropBlockStoreVolume(runtime, ++txId, root, name); + env.TestWaitNotification(runtime, txId); } } diff --git a/ydb/core/tx/schemeshard/ut_bsvolume_reboots.cpp b/ydb/core/tx/schemeshard/ut_bsvolume_reboots.cpp index 5dbdecf8e6..22ac155469 100644 --- a/ydb/core/tx/schemeshard/ut_bsvolume_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_bsvolume_reboots.cpp @@ -342,11 +342,9 @@ Y_UNIT_TEST_SUITE(TBSVWithReboots) { t.TestEnv->TestWaitNotification(runtime, t.TxId-1); TestLs(runtime, "/MyRoot/DirA/BSVolume_4", false, NLs::CheckMountToken("BSVolume_4", "Owner123")); - - TestAssignBlockStoreVolume(runtime, t.TxId++, "/MyRoot/DirA", "BSVolume_4", "Owner124", 1); + TestAssignBlockStoreVolume(runtime, t.TxId++, "/MyRoot/DirA", "BSVolume_4", "Owner124", 1); t.TestEnv->TestWaitNotification(runtime, t.TxId-1); TestLs(runtime, "/MyRoot/DirA/BSVolume_4", false, NLs::CheckMountToken("BSVolume_4", "Owner124")); - }); } } |