diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-09-12 21:55:43 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-09-12 22:13:23 +0300 |
commit | 3557093c22e5182620d0f182d0172599ba29771e (patch) | |
tree | 29745b20ae4113d657bc9acf9f554389d462c483 | |
parent | 187c3ed1b10b82f8dd36c5aee43cc4551dc97e3f (diff) | |
download | ydb-3557093c22e5182620d0f182d0172599ba29771e.tar.gz |
Outgoing handshake using readonly lease KIKIMR-18893
-rw-r--r-- | ydb/core/tx/datashard/change_sender_async_index.cpp | 25 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 18 |
3 files changed, 42 insertions, 5 deletions
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 534de7cbea..08f75bc61b 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -2,6 +2,7 @@ #include "change_exchange_impl.h" #include "change_sender_common_ops.h" #include "change_sender_monitoring.h" +#include "datashard_impl.h" #include <ydb/core/base/tablet_pipecache.h> #include <ydb/library/services/services.pb.h> @@ -58,22 +59,34 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS /// Handshake void Handshake() { - auto ev = MakeHolder<TEvChangeExchange::TEvHandshake>(); - ev->Record.SetOrigin(DataShard.TabletId); - ev->Record.SetGeneration(DataShard.Generation); - - Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.Release(), ShardId, true)); + Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvConfirmReadonlyLease, 0, ++LeaseConfirmationCookie); Become(&TThis::StateHandshake); } STATEFN(StateHandshake) { switch (ev->GetTypeRewrite()) { + hFunc(TDataShard::TEvPrivate::TEvReadonlyLeaseConfirmation, Handle); hFunc(TEvChangeExchange::TEvStatus, Handshake); default: return StateBase(ev); } } + void Handle(TDataShard::TEvPrivate::TEvReadonlyLeaseConfirmation::TPtr& ev) { + if (ev->Cookie != LeaseConfirmationCookie) { + LOG_W("Readonly lease confirmation cookie mismatch" + << ": expected# " << LeaseConfirmationCookie + << ", got# " << ev->Cookie); + return; + } + + auto handshake = MakeHolder<TEvChangeExchange::TEvHandshake>(); + handshake->Record.SetOrigin(DataShard.TabletId); + handshake->Record.SetGeneration(DataShard.Generation); + + Send(LeaderPipeCache, new TEvPipeCache::TEvForward(handshake.Release(), ShardId, true)); + } + void Handshake(TEvChangeExchange::TEvStatus::TPtr& ev) { LOG_D("Handshake " << ev->Get()->ToString()); @@ -266,6 +279,7 @@ public: , ShardId(shardId) , IndexTablePathId(indexTablePathId) , TagMap(tagMap) + , LeaseConfirmationCookie(0) , LastRecordOrder(0) { } @@ -292,6 +306,7 @@ private: mutable TMaybe<TString> LogPrefix; TActorId LeaderPipeCache; + ui64 LeaseConfirmationCookie; ui64 LastRecordOrder; // Retry on delivery problem diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index dbe1709fd8..86f2a1ef72 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2224,6 +2224,10 @@ void TDataShard::SendWithConfirmedReadOnlyLease( SendWithConfirmedReadOnlyLease(TMonotonic::Zero(), target, event, cookie, sessionId); } +void TDataShard::Handle(TEvPrivate::TEvConfirmReadonlyLease::TPtr& ev, const TActorContext&) { + SendWithConfirmedReadOnlyLease(ev->Get()->Timestamp, ev->Sender, new TEvPrivate::TEvReadonlyLeaseConfirmation, ev->Cookie); +} + void TDataShard::SendImmediateReadResult( TMonotonic readTime, const TActorId& target, diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 95bb9e1a98..e84589260e 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -301,6 +301,8 @@ class TDataShard friend class TTxStartMvccStateChange; friend class TTxExecuteMvccStateChange; + friend class TAsyncIndexChangeSenderShard; + class TTxPersistSubDomainPathId; class TTxPersistSubDomainOutOfSpace; @@ -347,6 +349,8 @@ class TDataShard EvCdcStreamScanContinue, EvRestartOperation, // used to restart after an aborted scan (e.g. backup) EvChangeExchangeExecuteHandshakes, + EvConfirmReadonlyLease, + EvReadonlyLeaseConfirmation, EvEnd }; @@ -529,6 +533,17 @@ class TDataShard }; struct TEvChangeExchangeExecuteHandshakes : public TEventLocal<TEvChangeExchangeExecuteHandshakes, EvChangeExchangeExecuteHandshakes> {}; + + struct TEvConfirmReadonlyLease : public TEventLocal<TEvConfirmReadonlyLease, EvConfirmReadonlyLease> { + explicit TEvConfirmReadonlyLease(TMonotonic ts = TMonotonic::Zero()) + : Timestamp(ts) + { + } + + const TMonotonic Timestamp; + }; + + struct TEvReadonlyLeaseConfirmation: public TEventLocal<TEvReadonlyLeaseConfirmation, EvReadonlyLeaseConfirmation> {}; }; struct Schema : NIceDb::Schema { @@ -1303,6 +1318,8 @@ class TDataShard void Handle(TEvPrivate::TEvRemoveLockChangeRecords::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvConfirmReadonlyLease::TPtr& ev, const TActorContext& ctx); + void HandleByReplicationSourceOffsetsServer(STATEFN_SIG); void DoPeriodicTasks(const TActorContext &ctx); @@ -2922,6 +2939,7 @@ protected: HFunc(TEvLongTxService::TEvLockStatus, Handle); HFunc(TEvDataShard::TEvGetOpenTxs, Handle); HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle); + HFunc(TEvPrivate::TEvConfirmReadonlyLease, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::TX_DATASHARD, |