diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-08-28 13:10:53 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-08-28 13:27:46 +0300 |
commit | 868488ffcc2889488f3af89f740e8abded1b8748 (patch) | |
tree | 4fe601f8d072e5b9073f8afcfd731f7a76706b26 | |
parent | da025ccc23fafcc3a3a572d83f82996be433fa89 (diff) | |
download | ydb-868488ffcc2889488f3af89f740e8abded1b8748.tar.gz |
Report persistent LastRecordOrder when shaking hands KIKIMR-18893
-rw-r--r-- | ydb/core/tx/datashard/datashard_change_receiving.cpp | 39 |
1 files changed, 15 insertions, 24 deletions
diff --git a/ydb/core/tx/datashard/datashard_change_receiving.cpp b/ydb/core/tx/datashard/datashard_change_receiving.cpp index af1e9d278d..6bc48aee03 100644 --- a/ydb/core/tx/datashard/datashard_change_receiving.cpp +++ b/ydb/core/tx/datashard/datashard_change_receiving.cpp @@ -1,7 +1,6 @@ #include "datashard_impl.h" -namespace NKikimr { -namespace NDataShard { +namespace NKikimr::NDataShard { using namespace NTabletFlatExecutor; @@ -28,10 +27,7 @@ public: break; } - auto it = Self->InChangeSenders.find(req.GetOrigin()); - if (it == Self->InChangeSenders.end()) { - ok = ok && db.Table<Schema::ChangeSenders>().Key(req.GetOrigin()).Precharge(); - } + ok = ok && db.Table<Schema::ChangeSenders>().Key(req.GetOrigin()).Precharge(); } return ok; @@ -69,29 +65,25 @@ public: return true; } - auto it = Self->InChangeSenders.find(req.GetOrigin()); - if (it == Self->InChangeSenders.end()) { - auto rowset = db.Table<Schema::ChangeSenders>().Key(req.GetOrigin()).Select(); - if (!rowset.IsReady()) { - return false; - } + auto rowset = db.Table<Schema::ChangeSenders>().Key(req.GetOrigin()).Select(); + if (!rowset.IsReady()) { + return false; + } - if (rowset.IsValid()) { - const auto generation = rowset.GetValue<Schema::ChangeSenders::Generation>(); - const auto lastRecordOrder = rowset.GetValueOrDefault<Schema::ChangeSenders::LastRecordOrder>(0); - it = Self->InChangeSenders.emplace(req.GetOrigin(), TInChangeSender(generation, lastRecordOrder)).first; - } else { - it = Self->InChangeSenders.emplace(req.GetOrigin(), TInChangeSender(req.GetGeneration())).first; - } + TInChangeSender info(req.GetGeneration()); + if (rowset.IsValid()) { + info.Generation = rowset.GetValue<Schema::ChangeSenders::Generation>(); + info.LastRecordOrder = rowset.GetValueOrDefault<Schema::ChangeSenders::LastRecordOrder>(0); } - if (it->second.Generation > req.GetGeneration()) { + auto it = Self->InChangeSenders.emplace(req.GetOrigin(), info).first; + if (it->second.Generation > req.GetGeneration()) { // use in-memory Generation resp.SetStatus(NKikimrChangeExchange::TEvStatus::STATUS_REJECT); resp.SetReason(NKikimrChangeExchange::TEvStatus::REASON_STALE_ORIGIN); } else { - it->second.Generation = req.GetGeneration(); + it->second.Generation = req.GetGeneration(); // update in-memory Generation resp.SetStatus(NKikimrChangeExchange::TEvStatus::STATUS_OK); - resp.SetLastRecordOrder(it->second.LastRecordOrder); + resp.SetLastRecordOrder(info.LastRecordOrder); // use persistent LastRecordOrder } return true; @@ -444,5 +436,4 @@ void TDataShard::Handle(TEvChangeExchange::TEvApplyRecords::TPtr& ev, const TAct Execute(new TTxApplyChangeRecords(this, ev), ctx); } -} // NDataShard -} // NKikimr +} |