diff options
author | Alexander Rutkovsky <alexvru@mail.ru> | 2022-03-22 00:10:48 +0300 |
---|---|---|
committer | Alexander Rutkovsky <alexvru@mail.ru> | 2022-03-22 00:10:48 +0300 |
commit | 06da0639afb2fcf3a54ba3a29061c703e2c387e4 (patch) | |
tree | c2d003c41232c9e57e2e6e6e68d454bde56aa022 | |
parent | 74dae97bf191600137c2c4907f5b74fc42799e58 (diff) | |
download | ydb-06da0639afb2fcf3a54ba3a29061c703e2c387e4.tar.gz |
Alternative way of queue pruning in DS proxy KIKIMR-8102
ref:92876c6be03ecd3a97971ac1643444f56e91d002
-rw-r--r-- | ydb/core/blobstorage/backpressure/event.h | 6 | ||||
-rw-r--r-- | ydb/core/blobstorage/backpressure/queue.cpp | 39 | ||||
-rw-r--r-- | ydb/core/blobstorage/backpressure/queue.h | 8 | ||||
-rw-r--r-- | ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp | 6 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy.h | 16 | ||||
-rw-r--r-- | ydb/core/blobstorage/ut_blobstorage/counting_events.cpp | 10 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/common/vdisk_events.h | 61 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp | 3 |
8 files changed, 64 insertions, 85 deletions
diff --git a/ydb/core/blobstorage/backpressure/event.h b/ydb/core/blobstorage/backpressure/event.h index 7b8ea72d4c2..e5f5ff1531b 100644 --- a/ydb/core/blobstorage/backpressure/event.h +++ b/ydb/core/blobstorage/backpressure/event.h @@ -40,6 +40,7 @@ class TEventHolder { TIntrusivePtr<TEventSerializedData> Buffer; TBSProxyContextPtr BSProxyCtx; std::unique_ptr<IEventBase> LocalEvent; + std::optional<std::weak_ptr<TMessageRelevanceTracker>> Tracker; public: TEventHolder() @@ -59,6 +60,7 @@ public: , InterconnectChannel(interconnectChannel) , Orbit(MoveOrbit(ev)) , BSProxyCtx(bspctx) + , Tracker(std::move(ev->Get()->MessageRelevanceTracker)) { // trace the event if constexpr (std::is_same_v<TPtr, TEvBlobStorage::TEvVPut::TPtr>) { @@ -90,6 +92,10 @@ public: Discard(); } + bool Relevant() const { + return !Tracker || !Tracker->expired(); + } + ui32 GetByteSize() const { return ByteSize; } diff --git a/ydb/core/blobstorage/backpressure/queue.cpp b/ydb/core/blobstorage/backpressure/queue.cpp index c573b7b16b4..7e9622334d5 100644 --- a/ydb/core/blobstorage/backpressure/queue.cpp +++ b/ydb/core/blobstorage/backpressure/queue.cpp @@ -131,6 +131,12 @@ void TBlobStorageQueue::SendToVDisk(const TActorContext& ctx, const TActorId& re continue; } + if (!item.Event.Relevant()) { + ++*QueueItemsPruned; + it = EraseItem(Queues.Waiting, it); + continue; + } + // update item parameters item.MsgId = NextMsgId; item.SequenceId = CurrentSequenceId; @@ -231,13 +237,13 @@ bool TBlobStorageQueue::OnResponse(ui64 msgId, ui64 sequenceId, ui64 cookie, TAc Y_VERIFY(InFlightCost >= it->Cost); InFlightCost -= it->Cost; - const bool discard = it->Discarded; + const bool relevant = it->Event.Relevant(); *outSender = it->Event.GetSender(); *outCookie = it->Event.GetCookie(); *processingTime = TDuration::Seconds(it->ProcessingTimer.Passed()); LWTRACK(DSQueueVPutResultRecieved, it->Event.GetOrbit(), processingTime->SecondsFloat() * 1e3, - it->Event.GetByteSize(), discard); + it->Event.GetByteSize(), !relevant); InFlightLookup.erase(lookupIt); EraseItem(Queues.InFlight, it); @@ -248,7 +254,7 @@ bool TBlobStorageQueue::OnResponse(ui64 msgId, ui64 sequenceId, ui64 cookie, TAc } ++*QueueItemsProcessed; - return !discard; + return relevant; } void TBlobStorageQueue::Unwind(ui64 failedMsgId, ui64 failedSequenceId, ui64 expectedMsgId, ui64 expectedSequenceId) { @@ -263,7 +269,7 @@ void TBlobStorageQueue::Unwind(ui64 failedMsgId, ui64 failedSequenceId, ui64 exp const ui32 erased = InFlightLookup.erase(std::make_pair(x->SequenceId, x->MsgId)); Y_VERIFY(erased); cost += x->Cost; // count item's cost - if (x->Discarded) { + if (!x->Event.Relevant()) { if (x == it) { ++it; // advance starting iterator as the item pointed to is being erased } @@ -296,7 +302,7 @@ void TBlobStorageQueue::DrainQueue(NKikimrProto::EReplyStatus status, const TStr auto flushQueue = [&](TItemList& queue) { for (auto it = queue.begin(); it != queue.end(); it = EraseItem(queue, it)) { - if (!it->Discarded) { + if (it->Event.Relevant()) { ReplyWithError(*it, status, errorReason, ctx); } } @@ -328,29 +334,6 @@ TBlobStorageQueue::TItemList::iterator TBlobStorageQueue::EraseItem(TItemList& q return nextIter; } -void TBlobStorageQueue::Prune(const TActorId& sender) { - TSenderMap::TIterator it = SenderToItems.LowerBound(sender); - while (it != SenderToItems.End()) { - TItem& item = static_cast<TItem&>(*it++); - if (item.Event.GetSender() != sender) { - break; - } - switch (item.Queue) { - case EItemQueue::Waiting: - EraseItem(Queues.Waiting, item.Iterator); - break; - - case EItemQueue::InFlight: - item.Discard(); - break; - - default: - Y_FAIL("incorrect item queue state"); - } - ++*QueueItemsPruned; - } -} - TMaybe<TDuration> TBlobStorageQueue::GetWorstRequestProcessingTime() const { if (Queues.InFlight.size()) { return TDuration::Seconds(Queues.InFlight.front().ProcessingTimer.Passed()); diff --git a/ydb/core/blobstorage/backpressure/queue.h b/ydb/core/blobstorage/backpressure/queue.h index 209346cc2db..996975c9c3d 100644 --- a/ydb/core/blobstorage/backpressure/queue.h +++ b/ydb/core/blobstorage/backpressure/queue.h @@ -51,7 +51,6 @@ class TBlobStorageQueue { const ui64 QueueCookie; ui64 Cost; bool DirtyCost; - bool Discarded; THPTimer ProcessingTimer; TTrackableList<TItem>::iterator Iterator; @@ -71,7 +70,6 @@ class TBlobStorageQueue { , QueueCookie(RandomNumber<ui64>()) , Cost(0) , DirtyCost(true) - , Discarded(false) {} ~TItem() { @@ -81,11 +79,6 @@ class TBlobStorageQueue { ui32 GetByteSize() const { return Event.GetByteSize(); } - - void Discard() { - Discarded = true; - Event.Discard(); - } }; using TItemList = TTrackableList<TItem>; @@ -231,7 +224,6 @@ public: } TItemList::iterator EraseItem(TItemList& queue, TItemList::iterator it); - void Prune(const TActorId& sender); TMaybe<TDuration> GetWorstRequestProcessingTime() const; }; diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp index 200b85f616d..d224e047f39 100644 --- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp +++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp @@ -417,10 +417,6 @@ private: //////////////////////////////////////////////////////////////////////// // CONTROL SECTOR //////////////////////////////////////////////////////////////////////// - void Handle(TEvPruneQueue::TPtr& ev, const TActorContext& /*ctx*/) { - Queue.Prune(ev->Sender); - } - void HandleWindow(const TActorContext& ctx, const NKikimrBlobStorage::TWindowFeedback& window) { if (window.HasMaxWindowSize()) { const ui64 maxWindowSize = window.GetMaxWindowSize(); @@ -808,7 +804,6 @@ private: XX(TEvBlobStorage::EvVReadyNotify, EvVReadyNotify) \ XX(TEvBlobStorage::EvVStatus, EvVStatus) \ XX(TEvBlobStorage::EvVStatusResult, EvVStatusResult) \ - XX(TEvBlobStorage::EvPruneQueue, EvPruneQueue) \ XX(TEvBlobStorage::EvRequestProxyQueueState, EvRequestProxyQueueState) \ XX(TEvBlobStorage::EvVWindowChange, EvVWindowChange) \ XX(TEvInterconnect::EvNodeConnected, EvNodeConnected) \ @@ -881,7 +876,6 @@ private: HFunc(TEvBlobStorage::TEvVStatus, Handle) HFunc(TEvBlobStorage::TEvVStatusResult, Handle) - HFunc(TEvPruneQueue, Handle) HFunc(TEvRequestProxyQueueState, Handle) HFunc(TEvBlobStorage::TEvVWindowChange, Handle) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index 27592e971f9..f206d7ad214 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -315,19 +315,16 @@ public: template<typename T> void SendToQueue(std::unique_ptr<T> event, ui64 cookie, NWilson::TTraceId traceId, bool timeStatsEnabled = false) { + if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus>) { + event->MessageRelevanceTracker = MessageRelevanceTracker; + } const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, std::move(traceId), timeStatsEnabled); - ++InvolvedQueues[queueId]; ++RequestsInFlight; } template<typename TPtr> - void ProcessReplyFromQueue(const TPtr& ev) { - auto it = InvolvedQueues.find(ev->Sender); - Y_VERIFY(it != InvolvedQueues.end()); - if (!--it->second) { - InvolvedQueues.erase(it); - } + void ProcessReplyFromQueue(const TPtr& /*ev*/) { Y_VERIFY(RequestsInFlight); --RequestsInFlight; CheckPostponedQueue(); @@ -406,9 +403,6 @@ public: Y_VERIFY(!std::exchange(Dead, true)); TDerived::ActiveCounter(Mon)->Dec(); Derived().Send(GetProxyActorId(), new TEvDeathNote(Responsiveness)); - for (const auto& [queueId, numUnrepliedRequests] : InvolvedQueues) { - Derived().Send(queueId, new TEvPruneQueue); - } TActorBootstrapped<TDerived>::PassAway(); } @@ -508,7 +502,7 @@ protected: private: const TActorId Source; const ui64 Cookie; - THashMap<TActorId, ui32> InvolvedQueues; + std::shared_ptr<TMessageRelevanceTracker> MessageRelevanceTracker = std::make_shared<TMessageRelevanceTracker>(); ui32 RequestsInFlight = 0; std::unique_ptr<IEventBase> Response; const TMaybe<TGroupStat::EKind> LatencyQueueKind; diff --git a/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp b/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp index 1f1b9e27be5..f3397ec388b 100644 --- a/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp @@ -84,7 +84,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) { } - void CountingEventsTest(TString typeOperation, ui32 eventsCount, TBlobStorageGroupType groupType) + void CountingEventsTest(TString typeOperation, ui32 eventsCount, TBlobStorageGroupType groupType, ui32 alternative = 0) { TEnvironmentSetup env(true, groupType); auto& runtime = env.Runtime; @@ -127,7 +127,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) { SendPut(test, originalBlobId3, data, NKikimrProto::OK); finishEventsCount = test.Runtime->GetEventsProcessed(); - UNIT_ASSERT_VALUES_EQUAL(finishEventsCount - startEventsCount, eventsCount); + UNIT_ASSERT_VALUES_EQUAL(finishEventsCount - startEventsCount, alternative ? alternative : eventsCount); } else if (typeOperation == "get") { TLogoBlobID originalBlobId(tabletId, 1, 0, 0, size, 0); NormalizePredictedDelays(queues); @@ -164,7 +164,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) { } Y_UNIT_TEST(Put_Mirror3of4) { - CountingEventsTest("put", 116, TBlobStorageGroupType::ErasureMirror3of4); + CountingEventsTest("put", 115, TBlobStorageGroupType::ErasureMirror3of4, 114); } Y_UNIT_TEST(Put_Mirror3dc) { @@ -180,7 +180,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) { } Y_UNIT_TEST(Get_Mirror3of4) { - CountingEventsTest("get", 38, TBlobStorageGroupType::ErasureMirror3of4); + CountingEventsTest("get", 36, TBlobStorageGroupType::ErasureMirror3of4); } Y_UNIT_TEST(Get_Mirror3dc) { @@ -196,7 +196,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) { } Y_UNIT_TEST(Collect_Mirror3of4) { - CountingEventsTest("collect", 113, TBlobStorageGroupType::ErasureMirror3of4); + CountingEventsTest("collect", 112, TBlobStorageGroupType::ErasureMirror3of4); } Y_UNIT_TEST(Collect_Mirror3dc) { diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index 4aebde7ae0a..bfd260b0040 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -495,8 +495,16 @@ namespace NKikimr { // TEvVPut //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + struct TMessageRelevanceTracker {}; + + struct TEventWithRelevanceTracker { + std::optional<std::weak_ptr<TMessageRelevanceTracker>> MessageRelevanceTracker; + }; + struct TEvBlobStorage::TEvVPut - : public TEventPB<TEvBlobStorage::TEvVPut, NKikimrBlobStorage::TEvVPut, TEvBlobStorage::EvVPut> { + : TEventPB<TEvBlobStorage::TEvVPut, NKikimrBlobStorage::TEvVPut, TEvBlobStorage::EvVPut> + , TEventWithRelevanceTracker + { // In current realization it is intentionaly lost on event serialization since // LWTrace doesn't support distributed shuttels yet mutable NLWTrace::TOrbit Orbit; @@ -762,7 +770,8 @@ namespace NKikimr { }; struct TEvBlobStorage::TEvVMultiPut - : public TEventPB<TEvBlobStorage::TEvVMultiPut, NKikimrBlobStorage::TEvVMultiPut, TEvBlobStorage::EvVMultiPut> + : TEventPB<TEvBlobStorage::TEvVMultiPut, NKikimrBlobStorage::TEvVMultiPut, TEvBlobStorage::EvVMultiPut> + , TEventWithRelevanceTracker { mutable NLWTrace::TOrbit Orbit; @@ -1023,7 +1032,8 @@ namespace NKikimr { ////////////////////////////////////////////////////////////////////////////////////////////// struct TEvBlobStorage::TEvVGet - : public TEventPB<TEvBlobStorage::TEvVGet, NKikimrBlobStorage::TEvVGet, TEvBlobStorage::EvVGet> + : TEventPB<TEvBlobStorage::TEvVGet, NKikimrBlobStorage::TEvVGet, TEvBlobStorage::EvVGet> + , TEventWithRelevanceTracker { TEvVGet() = default; @@ -1392,7 +1402,9 @@ namespace NKikimr { template <typename TEv, typename TRecord, ui32 EventType> struct TEvVSpecialPatchBase - : public TEventPB<TEv, TRecord, EventType> { + : TEventPB<TEv, TRecord, EventType> + , TEventWithRelevanceTracker + { mutable NLWTrace::TOrbit Orbit; TEvVSpecialPatchBase() = default; @@ -1678,7 +1690,9 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct TEvBlobStorage::TEvVBlock - : public TEventPB<TEvBlobStorage::TEvVBlock, NKikimrBlobStorage::TEvVBlock, TEvBlobStorage::EvVBlock> { + : TEventPB<TEvBlobStorage::TEvVBlock, NKikimrBlobStorage::TEvVBlock, TEvBlobStorage::EvVBlock> + , TEventWithRelevanceTracker + { TEvVBlock() {} @@ -1770,9 +1784,8 @@ namespace NKikimr { ////////////////////////////////////////////////////////////////////////////////////////////// struct TEvBlobStorage::TEvVPatchStart - : public TEventPB<TEvBlobStorage::TEvVPatchStart, - NKikimrBlobStorage::TEvVPatchStart, - TEvBlobStorage::EvVPatchStart> + : TEventPB<TEvBlobStorage::TEvVPatchStart, NKikimrBlobStorage::TEvVPatchStart, TEvBlobStorage::EvVPatchStart> + , TEventWithRelevanceTracker { mutable NLWTrace::TOrbit Orbit; @@ -1856,9 +1869,8 @@ namespace NKikimr { }; struct TEvBlobStorage::TEvVPatchDiff - : public TEventPB<TEvBlobStorage::TEvVPatchDiff, - NKikimrBlobStorage::TEvVPatchDiff, - TEvBlobStorage::EvVPatchDiff> + : TEventPB<TEvBlobStorage::TEvVPatchDiff, NKikimrBlobStorage::TEvVPatchDiff, TEvBlobStorage::EvVPatchDiff> + , TEventWithRelevanceTracker { mutable NLWTrace::TOrbit Orbit; @@ -1923,9 +1935,8 @@ namespace NKikimr { struct TEvBlobStorage::TEvVPatchXorDiff - : public TEventPB<TEvBlobStorage::TEvVPatchXorDiff, - NKikimrBlobStorage::TEvVPatchXorDiff, - TEvBlobStorage::EvVPatchXorDiff> + : TEventPB<TEvBlobStorage::TEvVPatchXorDiff, NKikimrBlobStorage::TEvVPatchXorDiff, TEvBlobStorage::EvVPatchXorDiff> + , TEventWithRelevanceTracker { mutable NLWTrace::TOrbit Orbit; @@ -2063,9 +2074,9 @@ namespace NKikimr { }; struct TEvBlobStorage::TEvVGetBlock - : public TEventPB<TEvBlobStorage::TEvVGetBlock, - NKikimrBlobStorage::TEvVGetBlock, - TEvBlobStorage::EvVGetBlock> { + : TEventPB<TEvBlobStorage::TEvVGetBlock, NKikimrBlobStorage::TEvVGetBlock, TEvBlobStorage::EvVGetBlock> + , TEventWithRelevanceTracker + { TEvVGetBlock() {} @@ -2141,9 +2152,9 @@ namespace NKikimr { struct TEvBlobStorage::TEvVCollectGarbage - : public TEventPB<TEvBlobStorage::TEvVCollectGarbage, - NKikimrBlobStorage::TEvVCollectGarbage, - TEvBlobStorage::EvVCollectGarbage> { + : TEventPB<TEvBlobStorage::TEvVCollectGarbage, NKikimrBlobStorage::TEvVCollectGarbage, TEvBlobStorage::EvVCollectGarbage> + , TEventWithRelevanceTracker + { TEvVCollectGarbage() {} @@ -2267,9 +2278,9 @@ namespace NKikimr { }; struct TEvBlobStorage::TEvVGetBarrier - : public TEventPB<TEvBlobStorage::TEvVGetBarrier, - NKikimrBlobStorage::TEvVGetBarrier, - TEvBlobStorage::EvVGetBarrier> { + : TEventPB<TEvBlobStorage::TEvVGetBarrier, NKikimrBlobStorage::TEvVGetBarrier, TEvBlobStorage::EvVGetBarrier> + , TEventWithRelevanceTracker + { TEvVGetBarrier() {} @@ -2332,9 +2343,7 @@ namespace NKikimr { }; struct TEvBlobStorage::TEvVCheckReadiness - : public TEventPB<TEvBlobStorage::TEvVCheckReadiness, - NKikimrBlobStorage::TEvVCheckReadiness, - TEvBlobStorage::EvVCheckReadiness> + : TEventPB<TEvBlobStorage::TEvVCheckReadiness, NKikimrBlobStorage::TEvVCheckReadiness, TEvBlobStorage::EvVCheckReadiness> { TEvVCheckReadiness() = default; diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp index 1f82e552bac..f817732ce9e 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp @@ -128,6 +128,7 @@ namespace NKikimr { ui64 NextSendCookie; ui64 NextReceiveCookie; TResultQueue ResultQueue; + std::shared_ptr<TMessageRelevanceTracker> Tracker = std::make_shared<TMessageRelevanceTracker>(); TQueue<std::unique_ptr<TEvBlobStorage::TEvVGet>> SchedulerRequestQ; THashMap<ui64, TReplMemTokenId> RequestTokens; @@ -158,6 +159,7 @@ namespace NKikimr { auto req = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(VDiskId, deadline, NKikimrBlobStorage::EGetHandleClass::AsyncRead, TEvBlobStorage::TEvVGet::EFlags::None, getCookie); + req->MessageRelevanceTracker = Tracker; ui64 maxResponseSize = ReplCtx->VDiskCfg->ReplMaxResponseSize; if (const auto& quoter = ReplCtx->VCtx->ReplNodeRequestQuoter) { @@ -225,7 +227,6 @@ namespace NKikimr { PrefetchDataSize = 0; RequestFromVDiskProxyPending = false; if (Finished) { - Send(ServiceId, new TEvPruneQueue); Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue); RequestTokens.clear(); return PassAway(); // TODO(alexvru): check correctness of invocations |