diff options
author | snaury <snaury@ydb.tech> | 2023-04-05 12:07:13 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-04-05 12:07:13 +0300 |
commit | 62313317a2f104c74ea46ba84bfb6c8c9dac24f6 (patch) | |
tree | 376cb03c5cb9676c274fa504993000f2106d314d | |
parent | 0a18b03b7d23f8efa1f404296ddda081d2e6693a (diff) | |
download | ydb-62313317a2f104c74ea46ba84bfb6c8c9dac24f6.tar.gz |
Prefer using the same pipe after subscription in pipe cache
-rw-r--r-- | ydb/core/base/tablet_pipecache.h | 4 | ||||
-rw-r--r-- | ydb/core/tablet/tablet_pipecache.cpp | 35 |
2 files changed, 31 insertions, 8 deletions
diff --git a/ydb/core/base/tablet_pipecache.h b/ydb/core/base/tablet_pipecache.h index ce56815fcff..34f48cf2ccb 100644 --- a/ydb/core/base/tablet_pipecache.h +++ b/ydb/core/base/tablet_pipecache.h @@ -29,11 +29,13 @@ struct TEvPipeCache { THolder<IEventBase> Ev; const ui64 TabletId; const bool Subscribe; + const ui64 SubscribeCookie; - TEvForward(IEventBase *ev, ui64 tabletId, bool subscribe = true) + TEvForward(IEventBase *ev, ui64 tabletId, bool subscribe = true, ui64 subscribeCookie = 0) : Ev(ev) , TabletId(tabletId) , Subscribe(subscribe) + , SubscribeCookie(subscribeCookie) {} }; diff --git a/ydb/core/tablet/tablet_pipecache.cpp b/ydb/core/tablet/tablet_pipecache.cpp index b21c726d22a..8c5953f9db2 100644 --- a/ydb/core/tablet/tablet_pipecache.cpp +++ b/ydb/core/tablet/tablet_pipecache.cpp @@ -57,9 +57,14 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> { { } }; + struct TClientSubscription { + ui64 SeqNo; + ui64 Cookie; + }; + struct TClientState { TActorId Client; - THashMap<TActorId, ui64> Peers; + THashMap<TActorId, TClientSubscription> Peers; ui64 LastSentSeqNo = 0; ui64 MaxForwardedSeqNo = Max<ui64>(); TVector<TNodeRequest> NodeRequests; @@ -165,9 +170,10 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> { for (auto &kv : clientState->Peers) { const auto &peer = kv.first; - const ui64 seqNo = kv.second; + const ui64 seqNo = kv.second.SeqNo; + const ui64 cookie = kv.second.Cookie; const bool msgNotDelivered = notDelivered || seqNo > clientState->MaxForwardedSeqNo; - Send(peer, new TEvPipeCache::TEvDeliveryProblem(tablet, msgNotDelivered)); + Send(peer, new TEvPipeCache::TEvDeliveryProblem(tablet, msgNotDelivered), 0, cookie); tabletState->ByPeer.erase(peer); @@ -286,12 +292,27 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> { TEvPipeCache::TEvForward *msg = ev->Get(); const ui64 tablet = msg->TabletId; const bool subscribe = msg->Subscribe; + const ui64 subscribeCookie = msg->SubscribeCookie; const TActorId peer = ev->Sender; const ui64 cookie = ev->Cookie; NWilson::TTraceId traceId = std::move(ev->TraceId); auto *tabletState = EnsureTablet(tablet); - auto *clientState = EnsureClient(tabletState, tablet); + + TClientState *clientState = nullptr; + + // Prefer using the same pipe after subscription + if (!subscribe) { + auto it = tabletState->ByPeer.find(peer); + if (it != tabletState->ByPeer.end()) { + clientState = it->second; + } + } + + // Ensure there's a valid pipe for sending messages + if (!clientState) { + clientState = EnsureClient(tabletState, tablet); + } if (subscribe) { TClientState *&link = tabletState->ByPeer[peer]; @@ -319,10 +340,10 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> { link = clientState; } const ui64 seqNo = ++clientState->LastSentSeqNo; - clientState->Peers[peer] = seqNo; - NTabletPipe::SendDataWithSeqNo(peer, tabletState->LastClient, msg->Ev.Release(), seqNo, cookie, std::move(traceId)); + clientState->Peers[peer] = { seqNo, subscribeCookie }; + NTabletPipe::SendDataWithSeqNo(peer, clientState->Client, msg->Ev.Release(), seqNo, cookie, std::move(traceId)); } else { - NTabletPipe::SendData(peer, tabletState->LastClient, msg->Ev.Release(), cookie, std::move(traceId)); + NTabletPipe::SendData(peer, clientState->Client, msg->Ev.Release(), cookie, std::move(traceId)); } } |