aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-08-28 13:10:53 +0300
committerilnaz <ilnaz@ydb.tech>2023-08-28 13:27:46 +0300
commit868488ffcc2889488f3af89f740e8abded1b8748 (patch)
tree4fe601f8d072e5b9073f8afcfd731f7a76706b26
parentda025ccc23fafcc3a3a572d83f82996be433fa89 (diff)
downloadydb-868488ffcc2889488f3af89f740e8abded1b8748.tar.gz
Report persistent LastRecordOrder when shaking hands KIKIMR-18893
-rw-r--r--ydb/core/tx/datashard/datashard_change_receiving.cpp39
1 files changed, 15 insertions, 24 deletions
diff --git a/ydb/core/tx/datashard/datashard_change_receiving.cpp b/ydb/core/tx/datashard/datashard_change_receiving.cpp
index af1e9d278d..6bc48aee03 100644
--- a/ydb/core/tx/datashard/datashard_change_receiving.cpp
+++ b/ydb/core/tx/datashard/datashard_change_receiving.cpp
@@ -1,7 +1,6 @@
#include "datashard_impl.h"
-namespace NKikimr {
-namespace NDataShard {
+namespace NKikimr::NDataShard {
using namespace NTabletFlatExecutor;
@@ -28,10 +27,7 @@ public:
break;
}
- auto it = Self->InChangeSenders.find(req.GetOrigin());
- if (it == Self->InChangeSenders.end()) {
- ok = ok && db.Table<Schema::ChangeSenders>().Key(req.GetOrigin()).Precharge();
- }
+ ok = ok && db.Table<Schema::ChangeSenders>().Key(req.GetOrigin()).Precharge();
}
return ok;
@@ -69,29 +65,25 @@ public:
return true;
}
- auto it = Self->InChangeSenders.find(req.GetOrigin());
- if (it == Self->InChangeSenders.end()) {
- auto rowset = db.Table<Schema::ChangeSenders>().Key(req.GetOrigin()).Select();
- if (!rowset.IsReady()) {
- return false;
- }
+ auto rowset = db.Table<Schema::ChangeSenders>().Key(req.GetOrigin()).Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
- if (rowset.IsValid()) {
- const auto generation = rowset.GetValue<Schema::ChangeSenders::Generation>();
- const auto lastRecordOrder = rowset.GetValueOrDefault<Schema::ChangeSenders::LastRecordOrder>(0);
- it = Self->InChangeSenders.emplace(req.GetOrigin(), TInChangeSender(generation, lastRecordOrder)).first;
- } else {
- it = Self->InChangeSenders.emplace(req.GetOrigin(), TInChangeSender(req.GetGeneration())).first;
- }
+ TInChangeSender info(req.GetGeneration());
+ if (rowset.IsValid()) {
+ info.Generation = rowset.GetValue<Schema::ChangeSenders::Generation>();
+ info.LastRecordOrder = rowset.GetValueOrDefault<Schema::ChangeSenders::LastRecordOrder>(0);
}
- if (it->second.Generation > req.GetGeneration()) {
+ auto it = Self->InChangeSenders.emplace(req.GetOrigin(), info).first;
+ if (it->second.Generation > req.GetGeneration()) { // use in-memory Generation
resp.SetStatus(NKikimrChangeExchange::TEvStatus::STATUS_REJECT);
resp.SetReason(NKikimrChangeExchange::TEvStatus::REASON_STALE_ORIGIN);
} else {
- it->second.Generation = req.GetGeneration();
+ it->second.Generation = req.GetGeneration(); // update in-memory Generation
resp.SetStatus(NKikimrChangeExchange::TEvStatus::STATUS_OK);
- resp.SetLastRecordOrder(it->second.LastRecordOrder);
+ resp.SetLastRecordOrder(info.LastRecordOrder); // use persistent LastRecordOrder
}
return true;
@@ -444,5 +436,4 @@ void TDataShard::Handle(TEvChangeExchange::TEvApplyRecords::TPtr& ev, const TAct
Execute(new TTxApplyChangeRecords(this, ev), ctx);
}
-} // NDataShard
-} // NKikimr
+}