aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-04-05 12:07:13 +0300
committersnaury <snaury@ydb.tech>2023-04-05 12:07:13 +0300
commit62313317a2f104c74ea46ba84bfb6c8c9dac24f6 (patch)
tree376cb03c5cb9676c274fa504993000f2106d314d
parent0a18b03b7d23f8efa1f404296ddda081d2e6693a (diff)
downloadydb-62313317a2f104c74ea46ba84bfb6c8c9dac24f6.tar.gz
Prefer using the same pipe after subscription in pipe cache
-rw-r--r--ydb/core/base/tablet_pipecache.h4
-rw-r--r--ydb/core/tablet/tablet_pipecache.cpp35
2 files changed, 31 insertions, 8 deletions
diff --git a/ydb/core/base/tablet_pipecache.h b/ydb/core/base/tablet_pipecache.h
index ce56815fcff..34f48cf2ccb 100644
--- a/ydb/core/base/tablet_pipecache.h
+++ b/ydb/core/base/tablet_pipecache.h
@@ -29,11 +29,13 @@ struct TEvPipeCache {
THolder<IEventBase> Ev;
const ui64 TabletId;
const bool Subscribe;
+ const ui64 SubscribeCookie;
- TEvForward(IEventBase *ev, ui64 tabletId, bool subscribe = true)
+ TEvForward(IEventBase *ev, ui64 tabletId, bool subscribe = true, ui64 subscribeCookie = 0)
: Ev(ev)
, TabletId(tabletId)
, Subscribe(subscribe)
+ , SubscribeCookie(subscribeCookie)
{}
};
diff --git a/ydb/core/tablet/tablet_pipecache.cpp b/ydb/core/tablet/tablet_pipecache.cpp
index b21c726d22a..8c5953f9db2 100644
--- a/ydb/core/tablet/tablet_pipecache.cpp
+++ b/ydb/core/tablet/tablet_pipecache.cpp
@@ -57,9 +57,14 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> {
{ }
};
+ struct TClientSubscription {
+ ui64 SeqNo;
+ ui64 Cookie;
+ };
+
struct TClientState {
TActorId Client;
- THashMap<TActorId, ui64> Peers;
+ THashMap<TActorId, TClientSubscription> Peers;
ui64 LastSentSeqNo = 0;
ui64 MaxForwardedSeqNo = Max<ui64>();
TVector<TNodeRequest> NodeRequests;
@@ -165,9 +170,10 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> {
for (auto &kv : clientState->Peers) {
const auto &peer = kv.first;
- const ui64 seqNo = kv.second;
+ const ui64 seqNo = kv.second.SeqNo;
+ const ui64 cookie = kv.second.Cookie;
const bool msgNotDelivered = notDelivered || seqNo > clientState->MaxForwardedSeqNo;
- Send(peer, new TEvPipeCache::TEvDeliveryProblem(tablet, msgNotDelivered));
+ Send(peer, new TEvPipeCache::TEvDeliveryProblem(tablet, msgNotDelivered), 0, cookie);
tabletState->ByPeer.erase(peer);
@@ -286,12 +292,27 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> {
TEvPipeCache::TEvForward *msg = ev->Get();
const ui64 tablet = msg->TabletId;
const bool subscribe = msg->Subscribe;
+ const ui64 subscribeCookie = msg->SubscribeCookie;
const TActorId peer = ev->Sender;
const ui64 cookie = ev->Cookie;
NWilson::TTraceId traceId = std::move(ev->TraceId);
auto *tabletState = EnsureTablet(tablet);
- auto *clientState = EnsureClient(tabletState, tablet);
+
+ TClientState *clientState = nullptr;
+
+ // Prefer using the same pipe after subscription
+ if (!subscribe) {
+ auto it = tabletState->ByPeer.find(peer);
+ if (it != tabletState->ByPeer.end()) {
+ clientState = it->second;
+ }
+ }
+
+ // Ensure there's a valid pipe for sending messages
+ if (!clientState) {
+ clientState = EnsureClient(tabletState, tablet);
+ }
if (subscribe) {
TClientState *&link = tabletState->ByPeer[peer];
@@ -319,10 +340,10 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> {
link = clientState;
}
const ui64 seqNo = ++clientState->LastSentSeqNo;
- clientState->Peers[peer] = seqNo;
- NTabletPipe::SendDataWithSeqNo(peer, tabletState->LastClient, msg->Ev.Release(), seqNo, cookie, std::move(traceId));
+ clientState->Peers[peer] = { seqNo, subscribeCookie };
+ NTabletPipe::SendDataWithSeqNo(peer, clientState->Client, msg->Ev.Release(), seqNo, cookie, std::move(traceId));
} else {
- NTabletPipe::SendData(peer, tabletState->LastClient, msg->Ev.Release(), cookie, std::move(traceId));
+ NTabletPipe::SendData(peer, clientState->Client, msg->Ev.Release(), cookie, std::move(traceId));
}
}