diff options
author | Aleksei Borzenkov <snaury@gmail.com> | 2022-07-01 16:14:44 +0300 |
---|---|---|
committer | Aleksei Borzenkov <snaury@gmail.com> | 2022-07-01 16:14:44 +0300 |
commit | ad7b9a14241e52ad1aa3be31c56198921620296e (patch) | |
tree | afa61dd8eb91f04b7f5665ee7f971fa5ad270468 | |
parent | b0671af02d9caea8d8752c91d177098ee99e8af1 (diff) | |
download | ydb-ad7b9a14241e52ad1aa3be31c56198921620296e.tar.gz |
Some cleanup in the way datashard works with pipes, KIKIMR-15240
ref:4fb13d00076db089a12749f2d787f3784cb41c68
-rw-r--r-- | ydb/core/tablet/tablet_pipe_client_cache.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 24 |
2 files changed, 21 insertions, 9 deletions
diff --git a/ydb/core/tablet/tablet_pipe_client_cache.cpp b/ydb/core/tablet/tablet_pipe_client_cache.cpp index 47b81e45681..2f8cdedaf25 100644 --- a/ydb/core/tablet/tablet_pipe_client_cache.cpp +++ b/ydb/core/tablet/tablet_pipe_client_cache.cpp @@ -119,13 +119,15 @@ namespace NTabletPipe { } bool OnConnect(TEvTabletPipe::TEvClientConnected::TPtr& ev) override { - if (!ev->Get()->ServerId) { + if (ev->Get()->Status != NKikimrProto::OK) { Erase(ev->Get()->TabletId, ev->Get()->ClientId); return false; } TClientCacheEntry* currentClient; - if (Container->Find(ev->Get()->TabletId, currentClient)) { + if (Container->Find(ev->Get()->TabletId, currentClient) && + currentClient->Client == ev->Get()->ClientId) + { currentClient->Flags |= TClientCacheEntry::Opened; if (currentClient->Flags & TClientCacheEntry::ShutdownRequested) { currentClient->Flags &= ~TClientCacheEntry::ShutdownRequested; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 2e64b818bc0..f642a47d8c8 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2254,23 +2254,27 @@ void TDataShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActo return; } - if (ev->Get()->Status != NKikimrProto::OK) { - if (ev->Get()->ClientId == StateReportPipe) { + if (ev->Get()->ClientId == StateReportPipe) { + if (ev->Get()->Status != NKikimrProto::OK) { StateReportPipe = TActorId(); ReportState(ctx, State); - return; } + return; + } - if (ev->Get()->ClientId == DbStatsReportPipe) { + if (ev->Get()->ClientId == DbStatsReportPipe) { + if (ev->Get()->Status != NKikimrProto::OK) { DbStatsReportPipe = TActorId(); - return; } + return; + } - if (ev->Get()->ClientId == TableResolvePipe) { + if (ev->Get()->ClientId == TableResolvePipe) { + if (ev->Get()->Status != NKikimrProto::OK) { TableResolvePipe = TActorId(); ResolveTablePath(ctx); - return; } + return; } if (LoanReturnTracker.Has(ev->Get()->TabletId, ev->Get()->ClientId)) { @@ -2285,6 +2289,7 @@ void TDataShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActo LoanReturnTracker.AutoAckLoans(ev->Get()->TabletId, ctx); } } + return; } // Resend split-related messages in needed @@ -2292,6 +2297,7 @@ void TDataShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActo if (ev->Get()->Status != NKikimrProto::OK) { SplitSrcSnapshotSender.DoSend(ev->Get()->TabletId, ctx); } + return; } if (ChangeSenderActivator.Has(ev->Get()->TabletId, ev->Get()->ClientId)) { @@ -2302,6 +2308,7 @@ void TDataShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActo ChangeSenderActivator.AutoAck(ev->Get()->TabletId, ctx); } } + return; } if (!PipeClientCache->OnConnect(ev)) { @@ -2343,15 +2350,18 @@ void TDataShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActo LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Resending loan returns from " << TabletID() << " to " << ev->Get()->TabletId); LoanReturnTracker.ResendLoans(ev->Get()->TabletId, ctx); + return; } // Resend split-related messages in needed if (SplitSrcSnapshotSender.Has(ev->Get()->TabletId, ev->Get()->ClientId)) { SplitSrcSnapshotSender.DoSend(ev->Get()->TabletId, ctx); + return; } if (ChangeSenderActivator.Has(ev->Get()->TabletId, ev->Get()->ClientId)) { ChangeSenderActivator.DoSend(ev->Get()->TabletId, ctx); + return; } LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Client pipe to tablet %" PRIu64 " from %" PRIu64 " is reset", ev->Get()->TabletId, TabletID()); |