aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-09-12 21:55:43 +0300
committerilnaz <ilnaz@ydb.tech>2023-09-12 22:13:23 +0300
commit3557093c22e5182620d0f182d0172599ba29771e (patch)
tree29745b20ae4113d657bc9acf9f554389d462c483
parent187c3ed1b10b82f8dd36c5aee43cc4551dc97e3f (diff)
downloadydb-3557093c22e5182620d0f182d0172599ba29771e.tar.gz
Outgoing handshake using readonly lease KIKIMR-18893
-rw-r--r--ydb/core/tx/datashard/change_sender_async_index.cpp25
-rw-r--r--ydb/core/tx/datashard/datashard.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h18
3 files changed, 42 insertions, 5 deletions
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp
index 534de7cbea..08f75bc61b 100644
--- a/ydb/core/tx/datashard/change_sender_async_index.cpp
+++ b/ydb/core/tx/datashard/change_sender_async_index.cpp
@@ -2,6 +2,7 @@
#include "change_exchange_impl.h"
#include "change_sender_common_ops.h"
#include "change_sender_monitoring.h"
+#include "datashard_impl.h"
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/library/services/services.pb.h>
@@ -58,22 +59,34 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS
/// Handshake
void Handshake() {
- auto ev = MakeHolder<TEvChangeExchange::TEvHandshake>();
- ev->Record.SetOrigin(DataShard.TabletId);
- ev->Record.SetGeneration(DataShard.Generation);
-
- Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.Release(), ShardId, true));
+ Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvConfirmReadonlyLease, 0, ++LeaseConfirmationCookie);
Become(&TThis::StateHandshake);
}
STATEFN(StateHandshake) {
switch (ev->GetTypeRewrite()) {
+ hFunc(TDataShard::TEvPrivate::TEvReadonlyLeaseConfirmation, Handle);
hFunc(TEvChangeExchange::TEvStatus, Handshake);
default:
return StateBase(ev);
}
}
+ void Handle(TDataShard::TEvPrivate::TEvReadonlyLeaseConfirmation::TPtr& ev) {
+ if (ev->Cookie != LeaseConfirmationCookie) {
+ LOG_W("Readonly lease confirmation cookie mismatch"
+ << ": expected# " << LeaseConfirmationCookie
+ << ", got# " << ev->Cookie);
+ return;
+ }
+
+ auto handshake = MakeHolder<TEvChangeExchange::TEvHandshake>();
+ handshake->Record.SetOrigin(DataShard.TabletId);
+ handshake->Record.SetGeneration(DataShard.Generation);
+
+ Send(LeaderPipeCache, new TEvPipeCache::TEvForward(handshake.Release(), ShardId, true));
+ }
+
void Handshake(TEvChangeExchange::TEvStatus::TPtr& ev) {
LOG_D("Handshake " << ev->Get()->ToString());
@@ -266,6 +279,7 @@ public:
, ShardId(shardId)
, IndexTablePathId(indexTablePathId)
, TagMap(tagMap)
+ , LeaseConfirmationCookie(0)
, LastRecordOrder(0)
{
}
@@ -292,6 +306,7 @@ private:
mutable TMaybe<TString> LogPrefix;
TActorId LeaderPipeCache;
+ ui64 LeaseConfirmationCookie;
ui64 LastRecordOrder;
// Retry on delivery problem
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index dbe1709fd8..86f2a1ef72 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2224,6 +2224,10 @@ void TDataShard::SendWithConfirmedReadOnlyLease(
SendWithConfirmedReadOnlyLease(TMonotonic::Zero(), target, event, cookie, sessionId);
}
+void TDataShard::Handle(TEvPrivate::TEvConfirmReadonlyLease::TPtr& ev, const TActorContext&) {
+ SendWithConfirmedReadOnlyLease(ev->Get()->Timestamp, ev->Sender, new TEvPrivate::TEvReadonlyLeaseConfirmation, ev->Cookie);
+}
+
void TDataShard::SendImmediateReadResult(
TMonotonic readTime,
const TActorId& target,
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 95bb9e1a98..e84589260e 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -301,6 +301,8 @@ class TDataShard
friend class TTxStartMvccStateChange;
friend class TTxExecuteMvccStateChange;
+ friend class TAsyncIndexChangeSenderShard;
+
class TTxPersistSubDomainPathId;
class TTxPersistSubDomainOutOfSpace;
@@ -347,6 +349,8 @@ class TDataShard
EvCdcStreamScanContinue,
EvRestartOperation, // used to restart after an aborted scan (e.g. backup)
EvChangeExchangeExecuteHandshakes,
+ EvConfirmReadonlyLease,
+ EvReadonlyLeaseConfirmation,
EvEnd
};
@@ -529,6 +533,17 @@ class TDataShard
};
struct TEvChangeExchangeExecuteHandshakes : public TEventLocal<TEvChangeExchangeExecuteHandshakes, EvChangeExchangeExecuteHandshakes> {};
+
+ struct TEvConfirmReadonlyLease : public TEventLocal<TEvConfirmReadonlyLease, EvConfirmReadonlyLease> {
+ explicit TEvConfirmReadonlyLease(TMonotonic ts = TMonotonic::Zero())
+ : Timestamp(ts)
+ {
+ }
+
+ const TMonotonic Timestamp;
+ };
+
+ struct TEvReadonlyLeaseConfirmation: public TEventLocal<TEvReadonlyLeaseConfirmation, EvReadonlyLeaseConfirmation> {};
};
struct Schema : NIceDb::Schema {
@@ -1303,6 +1318,8 @@ class TDataShard
void Handle(TEvPrivate::TEvRemoveLockChangeRecords::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvConfirmReadonlyLease::TPtr& ev, const TActorContext& ctx);
+
void HandleByReplicationSourceOffsetsServer(STATEFN_SIG);
void DoPeriodicTasks(const TActorContext &ctx);
@@ -2922,6 +2939,7 @@ protected:
HFunc(TEvLongTxService::TEvLockStatus, Handle);
HFunc(TEvDataShard::TEvGetOpenTxs, Handle);
HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle);
+ HFunc(TEvPrivate::TEvConfirmReadonlyLease, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
ALOG_WARN(NKikimrServices::TX_DATASHARD,