diff options
author | eivanov89 <eivanov89@ydb.tech> | 2023-02-28 17:00:51 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2023-02-28 17:00:51 +0300 |
commit | 429e4a530104658268d005e9d75f32e5537d3668 (patch) | |
tree | 9438ac250af41fe59722418936e97e15c264e8ef | |
parent | cca97a5c32063d999553902f29fdac1f520ddd60 (diff) | |
download | ydb-429e4a530104658268d005e9d75f32e5537d3668.tar.gz |
rework read iterator test for node disconnection
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 112 |
2 files changed, 110 insertions, 16 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index a509ac938e..69b124f648 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2930,12 +2930,22 @@ void TDataShard::AbortExpectationsFromDeletedTablet(ui64 tabletId, THashMap<ui64 void TDataShard::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx) { Y_UNUSED(ev); Y_UNUSED(ctx); - LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Server connected at tablet %s %" PRIu64 , - Executor()->GetStats().IsFollower ? "follower" : "leader", ev->Get()->TabletId); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Server connected at " + << (Executor()->GetStats().IsFollower ? "follower " : "leader ") + << "tablet# " << ev->Get()->TabletId + << ", clientId# " << ev->Get()->ClientId + << ", serverId# " << ev->Get()->ServerId + << ", sessionId# " << ev->InterconnectSession); } void TDataShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx) { Y_UNUSED(ev); Y_UNUSED(ctx); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Server disconnected at " + << (Executor()->GetStats().IsFollower ? "follower " : "leader ") + << "tablet# " << ev->Get()->TabletId + << ", clientId# " << ev->Get()->ClientId + << ", serverId# " << ev->Get()->ServerId + << ", sessionId# " << ev->InterconnectSession); } void TDataShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index d1cfddce1f..208e339307 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -762,6 +762,49 @@ struct TTestHelper { return config; } + NKikimrTabletBase::TEvGetCountersResponse GetCounters( + const TString& tableName, + ui32 node = 0, + TActorId sender = {}) + { + if (!sender) { + sender = Sender; + } + + const auto& table = Tables[tableName]; + auto &runtime = *Server->GetRuntime(); + runtime.SendToPipe( + table.TabletId, + sender, + new TEvTablet::TEvGetCounters, + node, + GetTestPipeConfig(), + table.ClientId); + + auto ev = runtime.GrabEdgeEvent<TEvTablet::TEvGetCountersResponse>(sender); + + UNIT_ASSERT(ev); + return ev->Get()->Record; + } + + ui64 GetSimpleCounter( + const TString& tableName, + const TString& name, + ui32 node = 0) + { + const auto counters = GetCounters(tableName, node); + for (const auto& counter : counters.GetTabletCounters().GetAppCounters().GetSimpleCounters()) { + if (name != counter.GetName()) { + continue; + } + + return counter.GetValue(); + } + + UNIT_ASSERT_C(false, "Counter not found: " << name); + return 0; // unreachable + } + public: bool WithFollower = false; ui64 ShardCount = 1; @@ -1908,36 +1951,70 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::UNSUPPORTED); } - Y_UNIT_TEST(ShouldStopWhenDisconnected) { + Y_UNIT_TEST(ShouldStopWhenNodeDisconnected) { + const ui32 nodeCount = 2; + TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetUseRealThreads(false) - .SetNodeCount(20); + .SetNodeCount(nodeCount); - const ui32 node = 13; TTestHelper helper(serverSettings); + auto &runtime = *helper.Server->GetRuntime(); + + ui32 node = 0; ui32 continueCounter = 0; - helper.Server->GetRuntime()->SetObserverFunc([&continueCounter](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { - if (ev->GetTypeRewrite() == TEvDataShard::EvReadContinue) { + bool connectedFromDifferentNode = false; + ui32 serverConnectedCount = 0; + runtime.SetObserverFunc([&continueCounter, &connectedFromDifferentNode, &serverConnectedCount](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::EvReadContinue: ++continueCounter; + break; + case TEvTabletPipe::EvServerConnected: { + auto* typedEvent = dynamic_cast<TEvTabletPipe::TEvServerConnected*>(ev->GetBase()); + ++serverConnectedCount; + if (typedEvent->ClientId.NodeId() != typedEvent->ServerId.NodeId()) { + connectedFromDifferentNode = true; + } + break; + } } return TTestActorRuntime::EEventAction::PROCESS; }); + auto waitFor = [&](const auto& condition, const TString& description) { + if (!condition()) { + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + helper.Server->GetRuntime()->DispatchEvents(options); + UNIT_ASSERT_C(condition(), "... failed to wait for " << description); + } + }; + auto& table = helper.Tables["table-1"]; - auto prevClient = table.ClientId; - auto &runtime = *helper.Server->GetRuntime(); auto sender = runtime.AllocateEdgeActor(node); // we need to connect from another node table.ClientId = runtime.ConnectToPipe(table.TabletId, sender, node, GetPipeConfigWithRetries()); UNIT_ASSERT(table.ClientId); + waitFor([&]{ return serverConnectedCount != 0; }, "intercepted EvServerConnected"); + if (!connectedFromDifferentNode) { + ++node; + table.ClientId = runtime.ConnectToPipe(table.TabletId, sender, node, GetPipeConfigWithRetries()); + UNIT_ASSERT(table.ClientId); + } + UNIT_ASSERT(connectedFromDifferentNode); + auto request1 = helper.GetBaseReadRequest("table-1", 1); AddKeyQuery(*request1, {3, 3, 3}); AddKeyQuery(*request1, {1, 1, 1}); @@ -1946,15 +2023,22 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { auto readResult1 = helper.SendRead("table-1", request1.release(), node, sender); - runtime.DisconnectNodes(node, node + 1, false); + auto exhaustedCount = helper.GetSimpleCounter("table-1", "DataShard/ReadIteratorsExhaustedCount", node); + auto iteratorsCount = helper.GetSimpleCounter("table-1", "DataShard/ReadIteratorsCount", node); + UNIT_ASSERT_VALUES_EQUAL(exhaustedCount, 1UL); + UNIT_ASSERT_VALUES_EQUAL(iteratorsCount, 1UL); - // restore our nodeId=0 client - table.ClientId = prevClient; - helper.SendReadAck("table-1", readResult1->Record, 3, 10000); // DS must ignore it + runtime.DisconnectNodes(0, 1); + table.ClientId = runtime.ConnectToPipe(table.TabletId, sender, node, GetPipeConfigWithRetries()); - auto readResult2 = helper.WaitReadResult(TDuration::MilliSeconds(10)); - UNIT_ASSERT(!readResult2); - UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0); + exhaustedCount = helper.GetSimpleCounter("table-1", "DataShard/ReadIteratorsExhaustedCount", 0); + while (exhaustedCount != 0) { + SimulateSleep(helper.Server, TDuration::Seconds(1)); + exhaustedCount = helper.GetSimpleCounter("table-1", "DataShard/ReadIteratorsExhaustedCount", 0); + } + + iteratorsCount = helper.GetSimpleCounter("table-1", "DataShard/ReadIteratorsCount", node); + UNIT_ASSERT_VALUES_EQUAL(iteratorsCount, 0UL); } Y_UNIT_TEST(ShouldReadFromHead) { |