aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <i@eivanov.com>2022-06-24 21:04:47 +0300
committerEvgeniy Ivanov <i@eivanov.com>2022-06-24 21:04:47 +0300
commiteb9284fa44adfee719797e57009857cf33eafa7e (patch)
tree40418c17bd621ed08654d73d6bfd7b84062dd282
parentd8ac7868e8537b24b940bb70168bd7c2aa1bb2e7 (diff)
downloadydb-eb9284fa44adfee719797e57009857cf33eafa7e.tar.gz
KIKIMR-13003: use interconnect sessions and detect disconnect in ds iterator
ref:91310df067ac0749d81f4a173bc50553d863645b
-rw-r--r--ydb/core/protos/tx_datashard.proto8
-rw-r--r--ydb/core/tx/datashard/datashard.cpp26
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp105
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h11
-rw-r--r--ydb/core/tx/datashard/datashard_repl_offsets_server.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp86
-rw-r--r--ydb/core/tx/datashard/read_iterator.h10
7 files changed, 210 insertions, 44 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index dc3dffbbf23..84601ccb724 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1479,9 +1479,8 @@ message TEvGetCompactTableStatsResult {
// - SeqNo that should be used by user in TEvReadAck
// - ContinuationToken, which user can use to restart the read.
//
-// 5. On each TEvReadResult user sends TEvReadAck to update quota and continue reading.
-// * If user has already received multiple TEvReadResult messages, he is allowed to send
-// single TEvReadAck with SeqNo from the last result.
+// 5. Client must ack results; it is allowed to send a single ack for multiple results
+// at once using the SeqNo from the last result. Note that currently an ack is essentially a quota update.
// * If user has received final TEvReadResult, he is allowed to not send TEvReadAck, though sending message is
// not a protocol violation.
// * If user has received TEvReadResult with error he should not send any reply.
@@ -1509,6 +1508,9 @@ message TEvGetCompactTableStatsResult {
//
// User must detect disconnect and after it happened ignore possible results from the shard.
// Instead new read must be started using ContinuationToken from last successful TEvReadResult.
+// Note that the current DS implementation uses the interconnect session from the originating TEvRead
+// message: it sends all results via this session and detects disconnect based on that session. However,
+// the client is allowed to send an ACK via any session.
message TEvRead {
// User must always provide unique ReadId
// Note that shard distinguishes equal ReadId's
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index bdbae4d8ee2..9c7bc61c8ef 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2989,6 +2989,14 @@ void TDataShard::Handle(TEvents::TEvUndelivered::TPtr &ev,
Pipeline.AddCandidateOp(op);
PlanQueue.Progress(ctx);
}
+
+ switch (ev->Get()->SourceType) {
+ case TEvents::TEvSubscribe::EventType:
+ ReadIteratorsOnNodeDisconnected(ev->Sender, ctx);
+ break;
+ default:
+ ;
+ }
}
void TDataShard::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev,
@@ -3001,6 +3009,8 @@ void TDataShard::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev,
Pipeline.ProcessDisconnected(nodeId);
PlanQueue.Progress(ctx);
+
+ ReadIteratorsOnNodeDisconnected(ev->Sender, ctx);
}
void TDataShard::Handle(TEvDataShard::TEvMigrateSchemeShardRequest::TPtr& ev,
@@ -3186,6 +3196,22 @@ void TDataShard::Handle(TEvDataShard::TEvGetRemovedRowVersions::TPtr& ev, const
Execute(new TTxGetRemovedRowVersions(this, std::move(ev)), ctx);
}
+void SendViaSession(const TActorId& sessionId,
+ const TActorId& target,
+ const TActorId& src,
+ IEventBase* event,
+ ui32 flags,
+ ui64 cookie)
+{
+ THolder<IEventHandle> ev = MakeHolder<IEventHandle>(target, src, event, flags, cookie);
+
+ if (sessionId) {
+ ev->Rewrite(TEvInterconnect::EvForward, sessionId);
+ }
+
+ TActivationContext::Send(ev.Release());
+}
+
} // NDataShard
TString TEvDataShard::TEvRead::ToString() const {
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 32dcd969e61..dbcb24ca18e 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -663,15 +663,18 @@ public:
return;
}
+ Y_VERIFY(it->second);
+ auto& state = *it->second;
+
if (!Result) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " TTxRead::Execute() finished without Result, aborting");
- Self->ReadIterators.erase(it);
+ Self->DeleteReadIterator(it);
Result.reset(new TEvDataShard::TEvReadResult());
SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
Result->Record.SetReadId(readId.ReadId);
- ctx.Send(Sender, Result.release());
+ SendViaSession(state.SessionId, Sender, Self->SelfId(), Result.release());
return;
}
@@ -679,14 +682,11 @@ public:
auto& record = Result->Record;
if (record.HasStatus()) {
record.SetReadId(readId.ReadId);
- if (it->second) {
- auto& state = *it->second;
- record.SetSeqNo(state.SeqNo + 1);
- }
- ctx.Send(Sender, Result.release());
+ record.SetSeqNo(state.SeqNo + 1);
+ SendViaSession(state.SessionId, Sender, Self->SelfId(), Result.release());
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " TTxRead::Execute() finished with error, aborting");
- Self->ReadIterators.erase(it);
+ Self->DeleteReadIterator(it);
return;
}
@@ -694,7 +694,7 @@ public:
Y_ASSERT(BlockBuilder);
Reader->FillResult(*Result);
- ctx.Send(Sender, Result.release());
+ SendViaSession(state.SessionId, Sender, Self->SelfId(), Result.release());
// note that we save the state only when there're unread queries
if (Reader->HasUnreadQueries()) {
@@ -709,7 +709,7 @@ public:
} else {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " finished in read");
- Self->ReadIterators.erase(it);
+ Self->DeleteReadIterator(it);
}
}
@@ -1094,30 +1094,30 @@ public:
return;
}
+ Y_VERIFY(it->second);
+ auto& state = *it->second;
+
if (!Result) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " TTxReadContinue::Execute() finished without Result, aborting");
- Self->ReadIterators.erase(it);
+ Self->DeleteReadIterator(it);
Result.reset(new TEvDataShard::TEvReadResult());
SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
Result->Record.SetReadId(readId.ReadId);
- ctx.Send(request->Reader, Result.release());
+ SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release());
return;
}
// error happened and status set
auto& record = Result->Record;
if (record.HasStatus()) {
- if (it->second) {
- auto& state = *it->second;
- record.SetSeqNo(state.SeqNo + 1);
- }
+ record.SetSeqNo(state.SeqNo + 1);
record.SetReadId(readId.ReadId);
- ctx.Send(request->Reader, Result.release());
+ SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release());
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " TTxReadContinue::Execute() finished with error, aborting");
- Self->ReadIterators.erase(it);
+ Self->DeleteReadIterator(it);
return;
}
@@ -1125,7 +1125,7 @@ public:
Y_ASSERT(BlockBuilder);
Reader->FillResult(*Result);
- ctx.Send(request->Reader, Result.release());
+ SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release());
if (Reader->HasUnreadQueries()) {
Y_ASSERT(it->second);
@@ -1142,7 +1142,7 @@ public:
} else {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " finished in ReadContinue");
- Self->ReadIterators.erase(it);
+ Self->DeleteReadIterator(it);
}
}
};
@@ -1168,7 +1168,25 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
return;
}
- ReadIterators.emplace(readId, new TReadIteratorState());
+ TActorId sessionId;
+ if (readId.Sender.NodeId() != SelfId().NodeId()) {
+ Y_VERIFY_DEBUG(ev->InterconnectSession);
+ THashMap<TActorId, TReadIteratorSession>::insert_ctx sessionsInsertCtx;
+ auto itSession = ReadIteratorSessions.find(ev->InterconnectSession, sessionsInsertCtx);
+ if (itSession == ReadIteratorSessions.end()) {
+ Send(ev->InterconnectSession, new TEvents::TEvSubscribe, IEventHandle::FlagTrackDelivery);
+ itSession = ReadIteratorSessions.emplace_direct(
+ sessionsInsertCtx,
+ ev->InterconnectSession,
+ TReadIteratorSession());
+ }
+
+ auto& session = itSession->second;
+ session.Iterators.insert(readId);
+ sessionId = ev->InterconnectSession;
+ }
+
+ ReadIterators.emplace(readId, new TReadIteratorState(sessionId));
Executor()->Execute(new TTxRead(this, ev), ctx);
}
@@ -1233,9 +1251,9 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
SetStatusError(result->Record, Ydb::StatusIds::BAD_SESSION, issueStr);
result->Record.SetReadId(readId.ReadId);
- ctx.Send(ev->Sender, result.release());
+ SendViaSession(state.SessionId, readId.Sender, SelfId(), result.release());
- ReadIterators.erase(it); // abort
+ DeleteReadIterator(it);
return;
}
@@ -1271,7 +1289,7 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadCancel: " << record);
TReadIteratorId readId(ev->Sender, record.GetReadId());
- ReadIterators.erase(readId);
+ DeleteReadIterator(readId);
}
void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx) {
@@ -1286,10 +1304,47 @@ void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TStr
result->Record.SetReadId(iterator.first.ReadId);
result->Record.SetSeqNo(state->SeqNo + 1);
- ctx.Send(readIteratorId.Sender, result.release());
+ SendViaSession(state->SessionId, readIteratorId.Sender, SelfId(), result.release());
}
ReadIterators.clear();
+ ReadIteratorSessions.clear();
+}
+
+void TDataShard::DeleteReadIterator(const TReadIteratorId& readId) {
+ auto it = ReadIterators.find(readId);
+ if (it != ReadIterators.end())
+ DeleteReadIterator(it);
+}
+
+void TDataShard::DeleteReadIterator(TReadIteratorsMap::iterator it) {
+ const auto& state = it->second;
+ if (state->SessionId) {
+ auto itSession = ReadIteratorSessions.find(state->SessionId);
+ if (itSession != ReadIteratorSessions.end()) {
+ auto& session = itSession->second;
+ session.Iterators.erase(it->first);
+ }
+ }
+ ReadIterators.erase(it);
+}
+
+void TDataShard::ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, const TActorContext &ctx) {
+ auto itSession = ReadIteratorSessions.find(sessionId);
+ if (itSession == ReadIteratorSessions.end())
+ return;
+
+ const auto& session = itSession->second;
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
+ << " closed session# " << sessionId << ", iterators# " << session.Iterators.size());
+
+ for (const auto& readId: session.Iterators) {
+ // we don't send anything to the client, because it is up
+ // to the client to detect the disconnect
+ ReadIterators.erase(readId);
+ }
+
+ ReadIteratorSessions.erase(itSession);
}
} // NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 2bae187d430..acc6d2063d4 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1531,7 +1531,10 @@ public:
bool CheckChangesQueueOverflow() const;
+ void DeleteReadIterator(const TReadIteratorId& readId);
+ void DeleteReadIterator(TReadIteratorsMap::iterator it);
void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx);
+ void ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, const TActorContext &ctx);
private:
///
@@ -2250,6 +2253,7 @@ private:
TReplicatedTableState* EnsureReplicatedTable(const TPathId& pathId);
TReadIteratorsMap ReadIterators;
+ THashMap<TActorId, TReadIteratorSession> ReadIteratorSessions;
protected:
// Redundant init state required by flat executor implementation
@@ -2606,4 +2610,11 @@ void SetStatusError(T &rec,
issue->set_message(msg);
}
+void SendViaSession(const TActorId& sessionId,
+ const TActorId& target,
+ const TActorId& src,
+ IEventBase* event,
+ ui32 flags = 0,
+ ui64 cookie = 0);
+
}}
diff --git a/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp b/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
index d4b948077a1..d0a5ca32d9b 100644
--- a/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
+++ b/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
@@ -248,13 +248,7 @@ void TReplicationSourceOffsetsServer::Handle(TEvDataShard::TEvReplicationSourceO
}
void TReplicationSourceOffsetsServer::SendViaSession(const TActorId& sessionId, const TActorId& target, IEventBase* event, ui32 flags, ui64 cookie) {
- THolder<IEventHandle> ev = MakeHolder<IEventHandle>(target, SelfId(), event, flags, cookie);
-
- if (sessionId) {
- ev->Rewrite(TEvInterconnect::EvForward, sessionId);
- }
-
- TActivationContext::Send(ev.Release());
+ NDataShard::SendViaSession(sessionId, target, SelfId(), event, flags, cookie);
}
void TReplicationSourceOffsetsServer::Handle(TEvents::TEvUndelivered::TPtr& ev) {
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 201fb090852..b536a74f09a 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -279,13 +279,20 @@ struct TTableInfo {
};
struct TTestHelper {
- TTestHelper(bool withFollower = false) {
+ explicit TTestHelper(bool withFollower = false) {
WithFollower = withFollower;
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);
+ init(serverSettings);
+ }
+
+ explicit TTestHelper(const TServerSettings& serverSettings) {
+ init(serverSettings);
+ }
+ void init(const TServerSettings& serverSettings) {
Server = new TServer(serverSettings);
auto &runtime = *Server->GetRuntime();
@@ -429,15 +436,21 @@ struct TTestHelper {
std::unique_ptr<TEvDataShard::TEvReadResult> SendRead(
const TString& tableName,
- TEvDataShard::TEvRead* request)
+ TEvDataShard::TEvRead* request,
+ ui32 node = 0,
+ TActorId sender = {})
{
+ if (!sender) {
+ sender = Sender;
+ }
+
const auto& table = Tables[tableName];
auto &runtime = *Server->GetRuntime();
runtime.SendToPipe(
table.TabletId,
- Sender,
+ sender,
request,
- 0,
+ node,
GetTestPipeConfig(),
table.ClientId);
@@ -448,8 +461,14 @@ struct TTestHelper {
const TString& tableName,
const NKikimrTxDataShard::TEvReadResult& readResult,
ui64 rows,
- ui64 bytes)
+ ui64 bytes,
+ ui32 node = 0,
+ TActorId sender = {})
{
+ if (!sender) {
+ sender = Sender;
+ }
+
const auto& table = Tables[tableName];
auto* request = new TEvDataShard::TEvReadAck();
request->Record.SetReadId(readResult.GetReadId());
@@ -460,8 +479,9 @@ struct TTestHelper {
auto &runtime = *Server->GetRuntime();
runtime.SendToPipe(
table.TabletId,
- Sender, request,
- 0,
+ sender,
+ request,
+ node,
GetTestPipeConfig(),
table.ClientId);
}
@@ -1474,6 +1494,56 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
Y_UNIT_TEST(ShouldReadFromFollower) {
TestReadKey(NKikimrTxDataShard::CELLVEC, true);
}
+
+ Y_UNIT_TEST(ShouldStopWhenDisconnected) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetNodeCount(20);
+
+ const ui32 node = 13;
+
+ TTestHelper helper(serverSettings);
+
+ ui32 continueCounter = 0;
+ helper.Server->GetRuntime()->SetObserverFunc([&continueCounter](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvDataShard::EvReadContinue) {
+ ++continueCounter;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto& table = helper.Tables["table-1"];
+ auto prevClient = table.ClientId;
+
+ auto &runtime = *helper.Server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor(node);
+
+ // we need to connect from another node
+ table.ClientId = runtime.ConnectToPipe(table.TabletId, sender, node, GetPipeConfigWithRetries());
+ UNIT_ASSERT(table.ClientId);
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1);
+ AddKeyQuery(*request1, {3, 3, 3});
+ AddKeyQuery(*request1, {1, 1, 1});
+
+ // set quota so that DS hangs waiting for ACK
+ request1->Record.SetMaxRows(1);
+
+ auto readResult1 = helper.SendRead("table-1", request1.release(), node, sender);
+
+ runtime.DisconnectNodes(node, node + 1, false);
+
+ // restore our nodeId=0 client
+ table.ClientId = prevClient;
+ helper.SendReadAck("table-1", readResult1->Record, 3, 10000); // DS must ignore it
+
+ auto readResult2 = helper.WaitReadResult(TDuration::MilliSeconds(10));
+ UNIT_ASSERT(!readResult2);
+ UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0);
+ }
};
Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) {
@@ -1562,7 +1632,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) {
Y_UNIT_TEST_SUITE(DataShardReadIteratorState) {
Y_UNIT_TEST(ShouldCalculateQuota) {
- NDataShard::TReadIteratorState state;
+ NDataShard::TReadIteratorState state({});
state.Quota.Rows = 100;
state.Quota.Bytes = 1000;
state.ConsumeSeqNo(10, 100); // seqno1
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index 2033a6fd865..abda66c3263 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -38,6 +38,11 @@ struct TReadIteratorId {
}
};
+struct TReadIteratorSession {
+ TReadIteratorSession() = default;
+ THashSet<TReadIteratorId, TReadIteratorId::THash> Iterators;
+};
+
struct TReadIteratorState {
enum class EState {
Init,
@@ -58,7 +63,9 @@ struct TReadIteratorState {
};
public:
- TReadIteratorState() = default;
+ explicit TReadIteratorState(const TActorId& sessionId)
+ : SessionId(sessionId)
+ {}
bool IsExhausted() const { return State == EState::Exhausted; }
@@ -142,6 +149,7 @@ public:
TQuota Quota;
TVector<TQuota> ReadStats; // each index corresponds to SeqNo-1
+ TActorId SessionId;
ui64 SeqNo = 0;
ui64 LastAckSeqNo = 0;
ui32 FirstUnprocessedQuery = 0;