aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@mail.ru>2022-03-22 00:10:48 +0300
committerAlexander Rutkovsky <alexvru@mail.ru>2022-03-22 00:10:48 +0300
commit06da0639afb2fcf3a54ba3a29061c703e2c387e4 (patch)
treec2d003c41232c9e57e2e6e6e68d454bde56aa022
parent74dae97bf191600137c2c4907f5b74fc42799e58 (diff)
downloadydb-06da0639afb2fcf3a54ba3a29061c703e2c387e4.tar.gz
Alternative way of queue pruning in DS proxy KIKIMR-8102
ref:92876c6be03ecd3a97971ac1643444f56e91d002
-rw-r--r--ydb/core/blobstorage/backpressure/event.h6
-rw-r--r--ydb/core/blobstorage/backpressure/queue.cpp39
-rw-r--r--ydb/core/blobstorage/backpressure/queue.h8
-rw-r--r--ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp6
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy.h16
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/counting_events.cpp10
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_events.h61
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp3
8 files changed, 64 insertions, 85 deletions
diff --git a/ydb/core/blobstorage/backpressure/event.h b/ydb/core/blobstorage/backpressure/event.h
index 7b8ea72d4c2..e5f5ff1531b 100644
--- a/ydb/core/blobstorage/backpressure/event.h
+++ b/ydb/core/blobstorage/backpressure/event.h
@@ -40,6 +40,7 @@ class TEventHolder {
TIntrusivePtr<TEventSerializedData> Buffer;
TBSProxyContextPtr BSProxyCtx;
std::unique_ptr<IEventBase> LocalEvent;
+ std::optional<std::weak_ptr<TMessageRelevanceTracker>> Tracker;
public:
TEventHolder()
@@ -59,6 +60,7 @@ public:
, InterconnectChannel(interconnectChannel)
, Orbit(MoveOrbit(ev))
, BSProxyCtx(bspctx)
+ , Tracker(std::move(ev->Get()->MessageRelevanceTracker))
{
// trace the event
if constexpr (std::is_same_v<TPtr, TEvBlobStorage::TEvVPut::TPtr>) {
@@ -90,6 +92,10 @@ public:
Discard();
}
+ bool Relevant() const {
+ return !Tracker || !Tracker->expired();
+ }
+
ui32 GetByteSize() const {
return ByteSize;
}
diff --git a/ydb/core/blobstorage/backpressure/queue.cpp b/ydb/core/blobstorage/backpressure/queue.cpp
index c573b7b16b4..7e9622334d5 100644
--- a/ydb/core/blobstorage/backpressure/queue.cpp
+++ b/ydb/core/blobstorage/backpressure/queue.cpp
@@ -131,6 +131,12 @@ void TBlobStorageQueue::SendToVDisk(const TActorContext& ctx, const TActorId& re
continue;
}
+ if (!item.Event.Relevant()) {
+ ++*QueueItemsPruned;
+ it = EraseItem(Queues.Waiting, it);
+ continue;
+ }
+
// update item parameters
item.MsgId = NextMsgId;
item.SequenceId = CurrentSequenceId;
@@ -231,13 +237,13 @@ bool TBlobStorageQueue::OnResponse(ui64 msgId, ui64 sequenceId, ui64 cookie, TAc
Y_VERIFY(InFlightCost >= it->Cost);
InFlightCost -= it->Cost;
- const bool discard = it->Discarded;
+ const bool relevant = it->Event.Relevant();
*outSender = it->Event.GetSender();
*outCookie = it->Event.GetCookie();
*processingTime = TDuration::Seconds(it->ProcessingTimer.Passed());
LWTRACK(DSQueueVPutResultRecieved, it->Event.GetOrbit(), processingTime->SecondsFloat() * 1e3,
- it->Event.GetByteSize(), discard);
+ it->Event.GetByteSize(), !relevant);
InFlightLookup.erase(lookupIt);
EraseItem(Queues.InFlight, it);
@@ -248,7 +254,7 @@ bool TBlobStorageQueue::OnResponse(ui64 msgId, ui64 sequenceId, ui64 cookie, TAc
}
++*QueueItemsProcessed;
- return !discard;
+ return relevant;
}
void TBlobStorageQueue::Unwind(ui64 failedMsgId, ui64 failedSequenceId, ui64 expectedMsgId, ui64 expectedSequenceId) {
@@ -263,7 +269,7 @@ void TBlobStorageQueue::Unwind(ui64 failedMsgId, ui64 failedSequenceId, ui64 exp
const ui32 erased = InFlightLookup.erase(std::make_pair(x->SequenceId, x->MsgId));
Y_VERIFY(erased);
cost += x->Cost; // count item's cost
- if (x->Discarded) {
+ if (!x->Event.Relevant()) {
if (x == it) {
++it; // advance starting iterator as the item pointed to is being erased
}
@@ -296,7 +302,7 @@ void TBlobStorageQueue::DrainQueue(NKikimrProto::EReplyStatus status, const TStr
auto flushQueue = [&](TItemList& queue) {
for (auto it = queue.begin(); it != queue.end(); it = EraseItem(queue, it)) {
- if (!it->Discarded) {
+ if (it->Event.Relevant()) {
ReplyWithError(*it, status, errorReason, ctx);
}
}
@@ -328,29 +334,6 @@ TBlobStorageQueue::TItemList::iterator TBlobStorageQueue::EraseItem(TItemList& q
return nextIter;
}
-void TBlobStorageQueue::Prune(const TActorId& sender) {
- TSenderMap::TIterator it = SenderToItems.LowerBound(sender);
- while (it != SenderToItems.End()) {
- TItem& item = static_cast<TItem&>(*it++);
- if (item.Event.GetSender() != sender) {
- break;
- }
- switch (item.Queue) {
- case EItemQueue::Waiting:
- EraseItem(Queues.Waiting, item.Iterator);
- break;
-
- case EItemQueue::InFlight:
- item.Discard();
- break;
-
- default:
- Y_FAIL("incorrect item queue state");
- }
- ++*QueueItemsPruned;
- }
-}
-
TMaybe<TDuration> TBlobStorageQueue::GetWorstRequestProcessingTime() const {
if (Queues.InFlight.size()) {
return TDuration::Seconds(Queues.InFlight.front().ProcessingTimer.Passed());
diff --git a/ydb/core/blobstorage/backpressure/queue.h b/ydb/core/blobstorage/backpressure/queue.h
index 209346cc2db..996975c9c3d 100644
--- a/ydb/core/blobstorage/backpressure/queue.h
+++ b/ydb/core/blobstorage/backpressure/queue.h
@@ -51,7 +51,6 @@ class TBlobStorageQueue {
const ui64 QueueCookie;
ui64 Cost;
bool DirtyCost;
- bool Discarded;
THPTimer ProcessingTimer;
TTrackableList<TItem>::iterator Iterator;
@@ -71,7 +70,6 @@ class TBlobStorageQueue {
, QueueCookie(RandomNumber<ui64>())
, Cost(0)
, DirtyCost(true)
- , Discarded(false)
{}
~TItem() {
@@ -81,11 +79,6 @@ class TBlobStorageQueue {
ui32 GetByteSize() const {
return Event.GetByteSize();
}
-
- void Discard() {
- Discarded = true;
- Event.Discard();
- }
};
using TItemList = TTrackableList<TItem>;
@@ -231,7 +224,6 @@ public:
}
TItemList::iterator EraseItem(TItemList& queue, TItemList::iterator it);
- void Prune(const TActorId& sender);
TMaybe<TDuration> GetWorstRequestProcessingTime() const;
};
diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
index 200b85f616d..d224e047f39 100644
--- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
+++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
@@ -417,10 +417,6 @@ private:
////////////////////////////////////////////////////////////////////////
// CONTROL SECTOR
////////////////////////////////////////////////////////////////////////
- void Handle(TEvPruneQueue::TPtr& ev, const TActorContext& /*ctx*/) {
- Queue.Prune(ev->Sender);
- }
-
void HandleWindow(const TActorContext& ctx, const NKikimrBlobStorage::TWindowFeedback& window) {
if (window.HasMaxWindowSize()) {
const ui64 maxWindowSize = window.GetMaxWindowSize();
@@ -808,7 +804,6 @@ private:
XX(TEvBlobStorage::EvVReadyNotify, EvVReadyNotify) \
XX(TEvBlobStorage::EvVStatus, EvVStatus) \
XX(TEvBlobStorage::EvVStatusResult, EvVStatusResult) \
- XX(TEvBlobStorage::EvPruneQueue, EvPruneQueue) \
XX(TEvBlobStorage::EvRequestProxyQueueState, EvRequestProxyQueueState) \
XX(TEvBlobStorage::EvVWindowChange, EvVWindowChange) \
XX(TEvInterconnect::EvNodeConnected, EvNodeConnected) \
@@ -881,7 +876,6 @@ private:
HFunc(TEvBlobStorage::TEvVStatus, Handle)
HFunc(TEvBlobStorage::TEvVStatusResult, Handle)
- HFunc(TEvPruneQueue, Handle)
HFunc(TEvRequestProxyQueueState, Handle)
HFunc(TEvBlobStorage::TEvVWindowChange, Handle)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h
index 27592e971f9..f206d7ad214 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy.h
@@ -315,19 +315,16 @@ public:
template<typename T>
void SendToQueue(std::unique_ptr<T> event, ui64 cookie, NWilson::TTraceId traceId, bool timeStatsEnabled = false) {
+ if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus>) {
+ event->MessageRelevanceTracker = MessageRelevanceTracker;
+ }
const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, std::move(traceId),
timeStatsEnabled);
- ++InvolvedQueues[queueId];
++RequestsInFlight;
}
template<typename TPtr>
- void ProcessReplyFromQueue(const TPtr& ev) {
- auto it = InvolvedQueues.find(ev->Sender);
- Y_VERIFY(it != InvolvedQueues.end());
- if (!--it->second) {
- InvolvedQueues.erase(it);
- }
+ void ProcessReplyFromQueue(const TPtr& /*ev*/) {
Y_VERIFY(RequestsInFlight);
--RequestsInFlight;
CheckPostponedQueue();
@@ -406,9 +403,6 @@ public:
Y_VERIFY(!std::exchange(Dead, true));
TDerived::ActiveCounter(Mon)->Dec();
Derived().Send(GetProxyActorId(), new TEvDeathNote(Responsiveness));
- for (const auto& [queueId, numUnrepliedRequests] : InvolvedQueues) {
- Derived().Send(queueId, new TEvPruneQueue);
- }
TActorBootstrapped<TDerived>::PassAway();
}
@@ -508,7 +502,7 @@ protected:
private:
const TActorId Source;
const ui64 Cookie;
- THashMap<TActorId, ui32> InvolvedQueues;
+ std::shared_ptr<TMessageRelevanceTracker> MessageRelevanceTracker = std::make_shared<TMessageRelevanceTracker>();
ui32 RequestsInFlight = 0;
std::unique_ptr<IEventBase> Response;
const TMaybe<TGroupStat::EKind> LatencyQueueKind;
diff --git a/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp b/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp
index 1f1b9e27be5..f3397ec388b 100644
--- a/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp
@@ -84,7 +84,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) {
}
- void CountingEventsTest(TString typeOperation, ui32 eventsCount, TBlobStorageGroupType groupType)
+ void CountingEventsTest(TString typeOperation, ui32 eventsCount, TBlobStorageGroupType groupType, ui32 alternative = 0)
{
TEnvironmentSetup env(true, groupType);
auto& runtime = env.Runtime;
@@ -127,7 +127,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) {
SendPut(test, originalBlobId3, data, NKikimrProto::OK);
finishEventsCount = test.Runtime->GetEventsProcessed();
- UNIT_ASSERT_VALUES_EQUAL(finishEventsCount - startEventsCount, eventsCount);
+ UNIT_ASSERT_VALUES_EQUAL(finishEventsCount - startEventsCount, alternative ? alternative : eventsCount);
} else if (typeOperation == "get") {
TLogoBlobID originalBlobId(tabletId, 1, 0, 0, size, 0);
NormalizePredictedDelays(queues);
@@ -164,7 +164,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) {
}
Y_UNIT_TEST(Put_Mirror3of4) {
- CountingEventsTest("put", 116, TBlobStorageGroupType::ErasureMirror3of4);
+ CountingEventsTest("put", 115, TBlobStorageGroupType::ErasureMirror3of4, 114);
}
Y_UNIT_TEST(Put_Mirror3dc) {
@@ -180,7 +180,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) {
}
Y_UNIT_TEST(Get_Mirror3of4) {
- CountingEventsTest("get", 38, TBlobStorageGroupType::ErasureMirror3of4);
+ CountingEventsTest("get", 36, TBlobStorageGroupType::ErasureMirror3of4);
}
Y_UNIT_TEST(Get_Mirror3dc) {
@@ -196,7 +196,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) {
}
Y_UNIT_TEST(Collect_Mirror3of4) {
- CountingEventsTest("collect", 113, TBlobStorageGroupType::ErasureMirror3of4);
+ CountingEventsTest("collect", 112, TBlobStorageGroupType::ErasureMirror3of4);
}
Y_UNIT_TEST(Collect_Mirror3dc) {
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
index 4aebde7ae0a..bfd260b0040 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
@@ -495,8 +495,16 @@ namespace NKikimr {
// TEvVPut
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ struct TMessageRelevanceTracker {};
+
+ struct TEventWithRelevanceTracker {
+ std::optional<std::weak_ptr<TMessageRelevanceTracker>> MessageRelevanceTracker;
+ };
+
struct TEvBlobStorage::TEvVPut
- : public TEventPB<TEvBlobStorage::TEvVPut, NKikimrBlobStorage::TEvVPut, TEvBlobStorage::EvVPut> {
+ : TEventPB<TEvBlobStorage::TEvVPut, NKikimrBlobStorage::TEvVPut, TEvBlobStorage::EvVPut>
+ , TEventWithRelevanceTracker
+ {
// In current realization it is intentionaly lost on event serialization since
// LWTrace doesn't support distributed shuttels yet
mutable NLWTrace::TOrbit Orbit;
@@ -762,7 +770,8 @@ namespace NKikimr {
};
struct TEvBlobStorage::TEvVMultiPut
- : public TEventPB<TEvBlobStorage::TEvVMultiPut, NKikimrBlobStorage::TEvVMultiPut, TEvBlobStorage::EvVMultiPut>
+ : TEventPB<TEvBlobStorage::TEvVMultiPut, NKikimrBlobStorage::TEvVMultiPut, TEvBlobStorage::EvVMultiPut>
+ , TEventWithRelevanceTracker
{
mutable NLWTrace::TOrbit Orbit;
@@ -1023,7 +1032,8 @@ namespace NKikimr {
//////////////////////////////////////////////////////////////////////////////////////////////
struct TEvBlobStorage::TEvVGet
- : public TEventPB<TEvBlobStorage::TEvVGet, NKikimrBlobStorage::TEvVGet, TEvBlobStorage::EvVGet>
+ : TEventPB<TEvBlobStorage::TEvVGet, NKikimrBlobStorage::TEvVGet, TEvBlobStorage::EvVGet>
+ , TEventWithRelevanceTracker
{
TEvVGet() = default;
@@ -1392,7 +1402,9 @@ namespace NKikimr {
template <typename TEv, typename TRecord, ui32 EventType>
struct TEvVSpecialPatchBase
- : public TEventPB<TEv, TRecord, EventType> {
+ : TEventPB<TEv, TRecord, EventType>
+ , TEventWithRelevanceTracker
+ {
mutable NLWTrace::TOrbit Orbit;
TEvVSpecialPatchBase() = default;
@@ -1678,7 +1690,9 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
struct TEvBlobStorage::TEvVBlock
- : public TEventPB<TEvBlobStorage::TEvVBlock, NKikimrBlobStorage::TEvVBlock, TEvBlobStorage::EvVBlock> {
+ : TEventPB<TEvBlobStorage::TEvVBlock, NKikimrBlobStorage::TEvVBlock, TEvBlobStorage::EvVBlock>
+ , TEventWithRelevanceTracker
+ {
TEvVBlock()
{}
@@ -1770,9 +1784,8 @@ namespace NKikimr {
//////////////////////////////////////////////////////////////////////////////////////////////
struct TEvBlobStorage::TEvVPatchStart
- : public TEventPB<TEvBlobStorage::TEvVPatchStart,
- NKikimrBlobStorage::TEvVPatchStart,
- TEvBlobStorage::EvVPatchStart>
+ : TEventPB<TEvBlobStorage::TEvVPatchStart, NKikimrBlobStorage::TEvVPatchStart, TEvBlobStorage::EvVPatchStart>
+ , TEventWithRelevanceTracker
{
mutable NLWTrace::TOrbit Orbit;
@@ -1856,9 +1869,8 @@ namespace NKikimr {
};
struct TEvBlobStorage::TEvVPatchDiff
- : public TEventPB<TEvBlobStorage::TEvVPatchDiff,
- NKikimrBlobStorage::TEvVPatchDiff,
- TEvBlobStorage::EvVPatchDiff>
+ : TEventPB<TEvBlobStorage::TEvVPatchDiff, NKikimrBlobStorage::TEvVPatchDiff, TEvBlobStorage::EvVPatchDiff>
+ , TEventWithRelevanceTracker
{
mutable NLWTrace::TOrbit Orbit;
@@ -1923,9 +1935,8 @@ namespace NKikimr {
struct TEvBlobStorage::TEvVPatchXorDiff
- : public TEventPB<TEvBlobStorage::TEvVPatchXorDiff,
- NKikimrBlobStorage::TEvVPatchXorDiff,
- TEvBlobStorage::EvVPatchXorDiff>
+ : TEventPB<TEvBlobStorage::TEvVPatchXorDiff, NKikimrBlobStorage::TEvVPatchXorDiff, TEvBlobStorage::EvVPatchXorDiff>
+ , TEventWithRelevanceTracker
{
mutable NLWTrace::TOrbit Orbit;
@@ -2063,9 +2074,9 @@ namespace NKikimr {
};
struct TEvBlobStorage::TEvVGetBlock
- : public TEventPB<TEvBlobStorage::TEvVGetBlock,
- NKikimrBlobStorage::TEvVGetBlock,
- TEvBlobStorage::EvVGetBlock> {
+ : TEventPB<TEvBlobStorage::TEvVGetBlock, NKikimrBlobStorage::TEvVGetBlock, TEvBlobStorage::EvVGetBlock>
+ , TEventWithRelevanceTracker
+ {
TEvVGetBlock()
{}
@@ -2141,9 +2152,9 @@ namespace NKikimr {
struct TEvBlobStorage::TEvVCollectGarbage
- : public TEventPB<TEvBlobStorage::TEvVCollectGarbage,
- NKikimrBlobStorage::TEvVCollectGarbage,
- TEvBlobStorage::EvVCollectGarbage> {
+ : TEventPB<TEvBlobStorage::TEvVCollectGarbage, NKikimrBlobStorage::TEvVCollectGarbage, TEvBlobStorage::EvVCollectGarbage>
+ , TEventWithRelevanceTracker
+ {
TEvVCollectGarbage()
{}
@@ -2267,9 +2278,9 @@ namespace NKikimr {
};
struct TEvBlobStorage::TEvVGetBarrier
- : public TEventPB<TEvBlobStorage::TEvVGetBarrier,
- NKikimrBlobStorage::TEvVGetBarrier,
- TEvBlobStorage::EvVGetBarrier> {
+ : TEventPB<TEvBlobStorage::TEvVGetBarrier, NKikimrBlobStorage::TEvVGetBarrier, TEvBlobStorage::EvVGetBarrier>
+ , TEventWithRelevanceTracker
+ {
TEvVGetBarrier()
{}
@@ -2332,9 +2343,7 @@ namespace NKikimr {
};
struct TEvBlobStorage::TEvVCheckReadiness
- : public TEventPB<TEvBlobStorage::TEvVCheckReadiness,
- NKikimrBlobStorage::TEvVCheckReadiness,
- TEvBlobStorage::EvVCheckReadiness>
+ : TEventPB<TEvBlobStorage::TEvVCheckReadiness, NKikimrBlobStorage::TEvVCheckReadiness, TEvBlobStorage::EvVCheckReadiness>
{
TEvVCheckReadiness() = default;
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp
index 1f82e552bac..f817732ce9e 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp
@@ -128,6 +128,7 @@ namespace NKikimr {
ui64 NextSendCookie;
ui64 NextReceiveCookie;
TResultQueue ResultQueue;
+ std::shared_ptr<TMessageRelevanceTracker> Tracker = std::make_shared<TMessageRelevanceTracker>();
TQueue<std::unique_ptr<TEvBlobStorage::TEvVGet>> SchedulerRequestQ;
THashMap<ui64, TReplMemTokenId> RequestTokens;
@@ -158,6 +159,7 @@ namespace NKikimr {
auto req = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(VDiskId, deadline,
NKikimrBlobStorage::EGetHandleClass::AsyncRead, TEvBlobStorage::TEvVGet::EFlags::None,
getCookie);
+ req->MessageRelevanceTracker = Tracker;
ui64 maxResponseSize = ReplCtx->VDiskCfg->ReplMaxResponseSize;
if (const auto& quoter = ReplCtx->VCtx->ReplNodeRequestQuoter) {
@@ -225,7 +227,6 @@ namespace NKikimr {
PrefetchDataSize = 0;
RequestFromVDiskProxyPending = false;
if (Finished) {
- Send(ServiceId, new TEvPruneQueue);
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
RequestTokens.clear();
return PassAway(); // TODO(alexvru): check correctness of invocations