aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author eivanov89 <eivanov89@ydb.tech> 2023-02-28 17:00:51 +0300
committer eivanov89 <eivanov89@ydb.tech> 2023-02-28 17:00:51 +0300
commit 429e4a530104658268d005e9d75f32e5537d3668 (patch)
tree 9438ac250af41fe59722418936e97e15c264e8ef
parent cca97a5c32063d999553902f29fdac1f520ddd60 (diff)
download ydb-429e4a530104658268d005e9d75f32e5537d3668.tar.gz
rework read iterator test for node disconnection
-rw-r--r-- ydb/core/tx/datashard/datashard.cpp 14
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_read_iterator.cpp 112
2 files changed, 110 insertions, 16 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index a509ac938e..69b124f648 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2930,12 +2930,22 @@ void TDataShard::AbortExpectationsFromDeletedTablet(ui64 tabletId, THashMap<ui64
void TDataShard::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx) {
Y_UNUSED(ev); Y_UNUSED(ctx);
- LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Server connected at tablet %s %" PRIu64 ,
- Executor()->GetStats().IsFollower ? "follower" : "leader", ev->Get()->TabletId);
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Server connected at " // Pipe-server-connected handler: its only visible action is this debug log (stream-style macro replaces the printf-style one).
+ << (Executor()->GetStats().IsFollower ? "follower " : "leader ") // role is taken from the executor stats
+ << "tablet# " << ev->Get()->TabletId
+ << ", clientId# " << ev->Get()->ClientId // newly logged: the connecting client's id
+ << ", serverId# " << ev->Get()->ServerId // newly logged: the pipe server's id
+ << ", sessionId# " << ev->InterconnectSession); // newly logged: read from the event handle itself, not from the payload
}
void TDataShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx) {
Y_UNUSED(ev); Y_UNUSED(ctx);
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Server disconnected at " // Pipe-server-disconnected handler: previously silent, now emits a debug log with the same fields as the connected handler.
+ << (Executor()->GetStats().IsFollower ? "follower " : "leader ") // role is taken from the executor stats
+ << "tablet# " << ev->Get()->TabletId
+ << ", clientId# " << ev->Get()->ClientId
+ << ", serverId# " << ev->Get()->ServerId
+ << ", sessionId# " << ev->InterconnectSession); // read from the event handle itself, not from the payload
}
void TDataShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx) {
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index d1cfddce1f..208e339307 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -762,6 +762,49 @@ struct TTestHelper {
return config;
}
+ NKikimrTabletBase::TEvGetCountersResponse GetCounters( // Sends TEvGetCounters to the tablet behind tableName through a pipe and returns the response record.
+ const TString& tableName, // key into Tables
+ ui32 node = 0, // node index forwarded to SendToPipe
+ TActorId sender = {}) // a default-constructed id means: use the helper's own Sender
+ {
+ if (!sender) {
+ sender = Sender; // fall back to the helper-wide sender
+ }
+
+ const auto& table = Tables[tableName]; // NOTE(review): operator[] lookup; presumably inserts a default entry for an unknown name - confirm against the type of Tables
+ auto &runtime = *Server->GetRuntime();
+ runtime.SendToPipe(
+ table.TabletId,
+ sender,
+ new TEvTablet::TEvGetCounters,
+ node,
+ GetTestPipeConfig(),
+ table.ClientId); // goes through the pipe client currently stored for this table
+
+ auto ev = runtime.GrabEdgeEvent<TEvTablet::TEvGetCountersResponse>(sender); // take the reply addressed to sender
+
+ UNIT_ASSERT(ev); // fail the test when no response event was obtained
+ return ev->Get()->Record; // returned by value, so the result does not depend on ev staying alive
+ }
+
+ ui64 GetSimpleCounter( // Returns the value of the app-level simple counter called name for the tablet behind tableName; fails the test if it is absent.
+ const TString& tableName, // forwarded to GetCounters
+ const TString& name, // exact counter name to match
+ ui32 node = 0) // forwarded to GetCounters
+ {
+ const auto counters = GetCounters(tableName, node); // one counters request per call; sender is left at its default
+ for (const auto& counter : counters.GetTabletCounters().GetAppCounters().GetSimpleCounters()) { // linear scan over the app simple counters
+ if (name != counter.GetName()) {
+ continue;
+ }
+
+ return counter.GetValue(); // first counter with a matching name wins
+ }
+
+ UNIT_ASSERT_C(false, "Counter not found: " << name); // no counter carried the requested name
+ return 0; // unreachable
+ }
+
public:
bool WithFollower = false;
ui64 ShardCount = 1;
@@ -1908,36 +1951,70 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::UNSUPPORTED);
}
- Y_UNIT_TEST(ShouldStopWhenDisconnected) {
+ Y_UNIT_TEST(ShouldStopWhenNodeDisconnected) { // Starts a read over a pipe, disconnects nodes 0 and 1, then expects the read-iterator counters to fall to zero.
+ const ui32 nodeCount = 2; // matches the pair passed to DisconnectNodes(0, 1) further down
+
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
- .SetNodeCount(20);
+ .SetNodeCount(nodeCount);
- const ui32 node = 13;
TTestHelper helper(serverSettings);
+ auto &runtime = *helper.Server->GetRuntime();
+
+ ui32 node = 0; // first connection attempt is made from node 0; bumped to 1 below if that turns out to be a same-node connection
ui32 continueCounter = 0;
- helper.Server->GetRuntime()->SetObserverFunc([&continueCounter](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
- if (ev->GetTypeRewrite() == TEvDataShard::EvReadContinue) {
+ bool connectedFromDifferentNode = false; // set by the observer when a pipe's client and server ids report different node ids
+ ui32 serverConnectedCount = 0; // number of EvServerConnected events seen by the observer
+ runtime.SetObserverFunc([&continueCounter, &connectedFromDifferentNode, &serverConnectedCount](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvReadContinue:
++continueCounter;
+ break; // NOTE(review): continueCounter is still counted, but no assertion on it is visible in this diff after the change
+ case TEvTabletPipe::EvServerConnected: {
+ auto* typedEvent = dynamic_cast<TEvTabletPipe::TEvServerConnected*>(ev->GetBase()); // NOTE(review): dereferenced below without a null check - confirm GetBase() always yields this type here
+ ++serverConnectedCount;
+ if (typedEvent->ClientId.NodeId() != typedEvent->ServerId.NodeId()) {
+ connectedFromDifferentNode = true;
+ }
+ break;
+ }
}
return TTestActorRuntime::EEventAction::PROCESS;
});
+ auto waitFor = [&](const auto& condition, const TString& description) { // dispatches events until condition() holds, then asserts that it does
+ if (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ helper.Server->GetRuntime()->DispatchEvents(options);
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+ }
+ };
+
auto& table = helper.Tables["table-1"];
- auto prevClient = table.ClientId;
- auto &runtime = *helper.Server->GetRuntime();
auto sender = runtime.AllocateEdgeActor(node);
// we need to connect from another node
table.ClientId = runtime.ConnectToPipe(table.TabletId, sender, node, GetPipeConfigWithRetries());
UNIT_ASSERT(table.ClientId);
+ waitFor([&]{ return serverConnectedCount != 0; }, "intercepted EvServerConnected"); // make sure the observer has classified the first connection before deciding to retry
+ if (!connectedFromDifferentNode) { // first attempt was same-node: reconnect from the next node; NOTE(review): the context comment above ("another node") is stale now that node starts at 0
+ ++node;
+ table.ClientId = runtime.ConnectToPipe(table.TabletId, sender, node, GetPipeConfigWithRetries()); // NOTE(review): sender was allocated with the old node value - confirm that is intended
+ UNIT_ASSERT(table.ClientId);
+ }
+ UNIT_ASSERT(connectedFromDifferentNode); // NOTE(review): no waitFor precedes this after the retry; presumably ConnectToPipe has already delivered EvServerConnected - confirm
+
auto request1 = helper.GetBaseReadRequest("table-1", 1);
AddKeyQuery(*request1, {3, 3, 3});
AddKeyQuery(*request1, {1, 1, 1});
@@ -1946,15 +2023,22 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
auto readResult1 = helper.SendRead("table-1", request1.release(), node, sender);
- runtime.DisconnectNodes(node, node + 1, false);
+ auto exhaustedCount = helper.GetSimpleCounter("table-1", "DataShard/ReadIteratorsExhaustedCount", node); // baseline before the disconnect
+ auto iteratorsCount = helper.GetSimpleCounter("table-1", "DataShard/ReadIteratorsCount", node);
+ UNIT_ASSERT_VALUES_EQUAL(exhaustedCount, 1UL); // exactly one exhausted iterator is expected after the first read result
+ UNIT_ASSERT_VALUES_EQUAL(iteratorsCount, 1UL); // and exactly one iterator overall
- // restore our nodeId=0 client
- table.ClientId = prevClient;
- helper.SendReadAck("table-1", readResult1->Record, 3, 10000); // DS must ignore it
+ runtime.DisconnectNodes(0, 1); // hard-coded pair; covers both nodes of the nodeCount = 2 cluster
+ table.ClientId = runtime.ConnectToPipe(table.TabletId, sender, node, GetPipeConfigWithRetries()); // new pipe client; GetSimpleCounter below sends through table.ClientId
- auto readResult2 = helper.WaitReadResult(TDuration::MilliSeconds(10));
- UNIT_ASSERT(!readResult2);
- UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0);
+ exhaustedCount = helper.GetSimpleCounter("table-1", "DataShard/ReadIteratorsExhaustedCount", 0); // NOTE(review): node argument is 0 here but `node` in the final query - confirm which is intended
+ while (exhaustedCount != 0) { // poll until the counter reaches zero; NOTE(review): no iteration cap, so a regression shows up as a hang/timeout rather than an assertion failure
+ SimulateSleep(helper.Server, TDuration::Seconds(1));
+ exhaustedCount = helper.GetSimpleCounter("table-1", "DataShard/ReadIteratorsExhaustedCount", 0);
+ }
+
+ iteratorsCount = helper.GetSimpleCounter("table-1", "DataShard/ReadIteratorsCount", node);
+ UNIT_ASSERT_VALUES_EQUAL(iteratorsCount, 0UL); // no read iterators may remain once the exhausted count has reached zero
}
Y_UNIT_TEST(ShouldReadFromHead) {