diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-02-14 23:19:56 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-02-14 23:19:56 +0300 |
commit | b9e465d0b276d243a328b00aa44293ab8b6d7bf1 (patch) | |
tree | 358fd308d2adbc21460a76c6f4c185b8bc52239b | |
parent | f1015a245cd81e95cf35cfa4f50b3b89f12426ec (diff) | |
download | ydb-b9e465d0b276d243a328b00aa44293ab8b6d7bf1.tar.gz |
Re-enqueue records (if any) at CreateSenders() (after resolve) KIKIMR-14359
ref:aea8d351880e5b2418759af320f6d84892efd204
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 11 |
3 files changed, 23 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index f7d4456da4..ee8576248b 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -21,12 +21,18 @@ void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds) { } for (const auto& [_, sender] : Senders) { + if (sender.Pending) { + Enqueued.insert(sender.Pending.begin(), sender.Pending.end()); + } + ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill()); } Senders = std::move(senders); - SendRecords(); + if (!Enqueued || !RequestRecords()) { + SendRecords(); + } } void TBaseChangeSender::KillSenders() { @@ -48,9 +54,9 @@ void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueReco RequestRecords(); } -void TBaseChangeSender::RequestRecords() { +bool TBaseChangeSender::RequestRecords() { if (!Enqueued) { - return; + return false; } auto it = Enqueued.begin(); @@ -69,10 +75,11 @@ void TBaseChangeSender::RequestRecords() { } if (!records) { - return; + return false; } ActorOps->Send(DataShard.ActorId, new TEvChangeExchange::TEvRequestRecords(std::move(records))); + return true; } void TBaseChangeSender::ProcessRecords(TVector<TChangeRecord>&& records) { @@ -194,7 +201,6 @@ void TBaseChangeSender::OnGone(ui64 partitionId) { const auto& sender = it->second; if (sender.Pending) { Enqueued.insert(sender.Pending.begin(), sender.Pending.end()); - RequestRecords(); } Senders.erase(it); diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index 2ba88be3f2..e807a468aa 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -88,7 +88,7 @@ class TBaseChangeSender: public IChangeSender { TVector<TEnqueuedRecord> Pending; }; - void RequestRecords(); + bool RequestRecords(); void SendRecords(); protected: diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 5e312f673c..3af5b4a2e5 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -363,6 +363,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { bool preventEnqueueing = true; TVector<THolder<IEventHandle>> enqueued; THashMap<ui64, ui32> splitAcks; + ui32 allowedRejects = Max<ui32>(); runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { @@ -374,6 +375,15 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { return TTestActorRuntime::EEventAction::PROCESS; } + case TEvChangeExchange::EvStatus: + if (ev->Get<TEvChangeExchange::TEvStatus>()->Record.GetStatus() == NKikimrChangeExchange::TEvStatus::STATUS_REJECT) { + if (!allowedRejects) { + return TTestActorRuntime::EEventAction::DROP; + } + --allowedRejects; + } + break; + case TEvDataShard::EvSplitAck: ++splitAcks[ev->Get<TEvDataShard::TEvSplitAck>()->Record.GetOperationCookie()]; break; @@ -468,6 +478,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { // merge of index table preventEnqueueing = true; + allowedRejects = 1; // skip 2nd reject from index shard ExecSQL(server, sender, R"( UPSERT INTO `/Root/Table` (pkey, ikey) VALUES (1, 15), |