aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ildar-khisam@yandex-team.ru>2022-03-23 09:39:51 +0300
committerildar-khisam <ildar-khisam@yandex-team.ru>2022-03-23 09:39:51 +0300
commit12db7f172fd48ffead86b19bff5653a0aaffd0c8 (patch)
treee35df7c478329dff3132250d50f882780e1c1d80
parent52832ac4031554114ea18aeb87f8df1226c3e3e2 (diff)
downloadydb-12db7f172fd48ffead86b19bff5653a0aaffd0c8.tar.gz
Solve KIKIMR-14475
Solve KIKIMR-14475: send TabletActive event only after all partitions initialized, if any; add test for events order ref:e418d78a37443efa65f2ba227a45a5e3aadc95b5
-rw-r--r--ydb/core/persqueue/pq_impl.cpp11
-rw-r--r--ydb/core/persqueue/pq_impl.h2
-rw-r--r--ydb/core/persqueue/pq_ut.cpp46
3 files changed, 58 insertions, 1 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 7aecc34eecd..e3d68aea437 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -789,6 +789,9 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
}
HasDataRequests.clear();
+ if (Partitions.empty()) {
+ SignalTabletActive(ctx);
+ }
}
void TPersQueue::ReadState(const NKikimrClient::TKeyValueResponse::TReadResult& read, const TActorContext& ctx)
@@ -980,11 +983,19 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c
it->second.InitDone = true;
++PartitionsInited;
Y_VERIFY(ConfigInited);//partitions are inited only after config
+
+ if (PartitionsInited == Partitions.size()) {
+ SignalTabletActive(ctx);
+ }
+
if (NewConfigShouldBeApplied && PartitionsInited == Partitions.size()) {
ApplyNewConfigAndReply(ctx);
}
}
+void TPersQueue::DefaultSignalTabletActive(const TActorContext &ctx) {
+ Y_UNUSED(ctx);
+}
void TPersQueue::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx)
{
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index bc3bfb10ba5..d87852edb50 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -46,6 +46,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
//when partition is ready it's sends event to tablet
void Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext&);
+ void DefaultSignalTabletActive(const TActorContext&) override;
+
//partitions will send some times it's counters
void Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorContext&);
diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp
index fd621bc8ecf..07f5edf56a9 100644
--- a/ydb/core/persqueue/pq_ut.cpp
+++ b/ydb/core/persqueue/pq_ut.cpp
@@ -2058,6 +2058,50 @@ Y_UNIT_TEST(TestWriteTimeStampEstimate) {
}
+void CheckEventSequence(TTestContext& tc, std::function<void()> scenario, std::deque<ui32> expectedEvents) {
+ tc.Runtime->SetObserverFunc([&expectedEvents](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (!expectedEvents.empty() && ev->Type == expectedEvents.front()) {
+ expectedEvents.pop_front();
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&expectedEvents](){
+ return expectedEvents.empty();
+ };
+ options.FinalEvents.emplace_back(TEvPQ::EvEnd); // dummy event to prevent early return from DispatchEvents
+
+ scenario();
+
+ UNIT_ASSERT(tc.Runtime->DispatchEvents(options));
+ UNIT_ASSERT(expectedEvents.empty());
+}
+
+Y_UNIT_TEST(TestTabletRestoreEventsOrder) {
+ TTestContext tc;
+ TFinalizer finalizer(tc);
+ tc.Prepare();
+
+ // Scenario 1: expect EvTabletActive after empty tablet reboot
+ CheckEventSequence(tc, /*scenario=*/[&tc]() {
+ ForwardToTablet(*tc.Runtime, tc.TabletId, tc.Edge, new TEvents::TEvPoisonPill());
+ }, /*expectedEvents=*/{
+ TEvTablet::TEvRestored::EventType,
+ TEvTablet::TEvTabletActive::EventType,
+ });
+
+ // Scenario 2: expect EvTabletActive only after partitions init complete
+ CheckEventSequence(tc, /*scenario=*/[&tc]() {
+ PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{"aaa", true}}, tc, /*partitions=*/2);
+ ForwardToTablet(*tc.Runtime, tc.TabletId, tc.Edge, new TEvents::TEvPoisonPill());
+ }, /*expectedEvents=*/{
+ TEvTablet::TEvRestored::EventType,
+ TEvPQ::TEvInitComplete::EventType,
+ TEvPQ::TEvInitComplete::EventType,
+ TEvTablet::TEvTabletActive::EventType,
+ });
+}
-} // TKeyValueTest
+} // TPQTest
} // NKikimr