aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <snaury@gmail.com>2022-07-01 16:14:44 +0300
committerAleksei Borzenkov <snaury@gmail.com>2022-07-01 16:14:44 +0300
commitad7b9a14241e52ad1aa3be31c56198921620296e (patch)
treeafa61dd8eb91f04b7f5665ee7f971fa5ad270468
parentb0671af02d9caea8d8752c91d177098ee99e8af1 (diff)
downloadydb-ad7b9a14241e52ad1aa3be31c56198921620296e.tar.gz
Some cleanup in the way datashard works with pipes, KIKIMR-15240
ref:4fb13d00076db089a12749f2d787f3784cb41c68
-rw-r--r--ydb/core/tablet/tablet_pipe_client_cache.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard.cpp24
2 files changed, 21 insertions, 9 deletions
diff --git a/ydb/core/tablet/tablet_pipe_client_cache.cpp b/ydb/core/tablet/tablet_pipe_client_cache.cpp
index 47b81e45681..2f8cdedaf25 100644
--- a/ydb/core/tablet/tablet_pipe_client_cache.cpp
+++ b/ydb/core/tablet/tablet_pipe_client_cache.cpp
@@ -119,13 +119,15 @@ namespace NTabletPipe {
}
bool OnConnect(TEvTabletPipe::TEvClientConnected::TPtr& ev) override {
- if (!ev->Get()->ServerId) {
+ if (ev->Get()->Status != NKikimrProto::OK) {
Erase(ev->Get()->TabletId, ev->Get()->ClientId);
return false;
}
TClientCacheEntry* currentClient;
- if (Container->Find(ev->Get()->TabletId, currentClient)) {
+ if (Container->Find(ev->Get()->TabletId, currentClient) &&
+ currentClient->Client == ev->Get()->ClientId)
+ {
currentClient->Flags |= TClientCacheEntry::Opened;
if (currentClient->Flags & TClientCacheEntry::ShutdownRequested) {
currentClient->Flags &= ~TClientCacheEntry::ShutdownRequested;
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 2e64b818bc0..f642a47d8c8 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2254,23 +2254,27 @@ void TDataShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActo
return;
}
- if (ev->Get()->Status != NKikimrProto::OK) {
- if (ev->Get()->ClientId == StateReportPipe) {
+ if (ev->Get()->ClientId == StateReportPipe) {
+ if (ev->Get()->Status != NKikimrProto::OK) {
StateReportPipe = TActorId();
ReportState(ctx, State);
- return;
}
+ return;
+ }
- if (ev->Get()->ClientId == DbStatsReportPipe) {
+ if (ev->Get()->ClientId == DbStatsReportPipe) {
+ if (ev->Get()->Status != NKikimrProto::OK) {
DbStatsReportPipe = TActorId();
- return;
}
+ return;
+ }
- if (ev->Get()->ClientId == TableResolvePipe) {
+ if (ev->Get()->ClientId == TableResolvePipe) {
+ if (ev->Get()->Status != NKikimrProto::OK) {
TableResolvePipe = TActorId();
ResolveTablePath(ctx);
- return;
}
+ return;
}
if (LoanReturnTracker.Has(ev->Get()->TabletId, ev->Get()->ClientId)) {
@@ -2285,6 +2289,7 @@ void TDataShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActo
LoanReturnTracker.AutoAckLoans(ev->Get()->TabletId, ctx);
}
}
+ return;
}
// Resend split-related messages in needed
@@ -2292,6 +2297,7 @@ void TDataShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActo
if (ev->Get()->Status != NKikimrProto::OK) {
SplitSrcSnapshotSender.DoSend(ev->Get()->TabletId, ctx);
}
+ return;
}
if (ChangeSenderActivator.Has(ev->Get()->TabletId, ev->Get()->ClientId)) {
@@ -2302,6 +2308,7 @@ void TDataShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActo
ChangeSenderActivator.AutoAck(ev->Get()->TabletId, ctx);
}
}
+ return;
}
if (!PipeClientCache->OnConnect(ev)) {
@@ -2343,15 +2350,18 @@ void TDataShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActo
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Resending loan returns from " << TabletID() << " to " << ev->Get()->TabletId);
LoanReturnTracker.ResendLoans(ev->Get()->TabletId, ctx);
+ return;
}
// Resend split-related messages in needed
if (SplitSrcSnapshotSender.Has(ev->Get()->TabletId, ev->Get()->ClientId)) {
SplitSrcSnapshotSender.DoSend(ev->Get()->TabletId, ctx);
+ return;
}
if (ChangeSenderActivator.Has(ev->Get()->TabletId, ev->Get()->ClientId)) {
ChangeSenderActivator.DoSend(ev->Get()->TabletId, ctx);
+ return;
}
LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Client pipe to tablet %" PRIu64 " from %" PRIu64 " is reset", ev->Get()->TabletId, TabletID());