aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-02-14 23:19:56 +0300
committerIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-02-14 23:19:56 +0300
commitb9e465d0b276d243a328b00aa44293ab8b6d7bf1 (patch)
tree358fd308d2adbc21460a76c6f4c185b8bc52239b
parentf1015a245cd81e95cf35cfa4f50b3b89f12426ec (diff)
downloadydb-b9e465d0b276d243a328b00aa44293ab8b6d7bf1.tar.gz
Re-enqueue records (if any) at CreateSenders() (after resolve) KIKIMR-14359
ref:aea8d351880e5b2418759af320f6d84892efd204
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.cpp16
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.h2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp11
3 files changed, 23 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp
index f7d4456da4..ee8576248b 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp
@@ -21,12 +21,18 @@ void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds) {
}
for (const auto& [_, sender] : Senders) {
+ if (sender.Pending) {
+ Enqueued.insert(sender.Pending.begin(), sender.Pending.end());
+ }
+
ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill());
}
Senders = std::move(senders);
- SendRecords();
+ if (!Enqueued || !RequestRecords()) {
+ SendRecords();
+ }
}
void TBaseChangeSender::KillSenders() {
@@ -48,9 +54,9 @@ void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueReco
RequestRecords();
}
-void TBaseChangeSender::RequestRecords() {
+bool TBaseChangeSender::RequestRecords() {
if (!Enqueued) {
- return;
+ return false;
}
auto it = Enqueued.begin();
@@ -69,10 +75,11 @@ void TBaseChangeSender::RequestRecords() {
}
if (!records) {
- return;
+ return false;
}
ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRequestRecords(std::move(records)));
+ return true;
}
void TBaseChangeSender::ProcessRecords(TVector<TChangeRecord>&& records) {
@@ -194,7 +201,6 @@ void TBaseChangeSender::OnGone(ui64 partitionId) {
const auto& sender = it->second;
if (sender.Pending) {
Enqueued.insert(sender.Pending.begin(), sender.Pending.end());
- RequestRecords();
}
Senders.erase(it);
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h
index 2ba88be3f2..e807a468aa 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.h
+++ b/ydb/core/tx/datashard/change_sender_common_ops.h
@@ -88,7 +88,7 @@ class TBaseChangeSender: public IChangeSender {
TVector<TEnqueuedRecord> Pending;
};
- void RequestRecords();
+ bool RequestRecords();
void SendRecords();
protected:
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 5e312f673c..3af5b4a2e5 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -363,6 +363,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
bool preventEnqueueing = true;
TVector<THolder<IEventHandle>> enqueued;
THashMap<ui64, ui32> splitAcks;
+ ui32 allowedRejects = Max<ui32>();
runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
@@ -374,6 +375,15 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
return TTestActorRuntime::EEventAction::PROCESS;
}
+ case TEvChangeExchange::EvStatus:
+ if (ev->Get<TEvChangeExchange::TEvStatus>()->Record.GetStatus() == NKikimrChangeExchange::TEvStatus::STATUS_REJECT) {
+ if (!allowedRejects) {
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ --allowedRejects;
+ }
+ break;
+
case TEvDataShard::EvSplitAck:
++splitAcks[ev->Get<TEvDataShard::TEvSplitAck>()->Record.GetOperationCookie()];
break;
@@ -468,6 +478,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
// merge of index table
preventEnqueueing = true;
+ allowedRejects = 1; // skip 2nd reject from index shard
ExecSQL(server, sender, R"(
UPSERT INTO `/Root/Table` (pkey, ikey) VALUES
(1, 15),