diff options
author | Evgeniy Ivanov <i@eivanov.com> | 2022-06-24 21:04:47 +0300 |
---|---|---|
committer | Evgeniy Ivanov <i@eivanov.com> | 2022-06-24 21:04:47 +0300 |
commit | eb9284fa44adfee719797e57009857cf33eafa7e (patch) | |
tree | 40418c17bd621ed08654d73d6bfd7b84062dd282 | |
parent | d8ac7868e8537b24b940bb70168bd7c2aa1bb2e7 (diff) | |
download | ydb-eb9284fa44adfee719797e57009857cf33eafa7e.tar.gz |
KIKIMR-13003: use interconnect sessions and detect disconnect in ds iterator
ref:91310df067ac0749d81f4a173bc50553d863645b
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 26 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 105 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 11 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_repl_offsets_server.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 86 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_iterator.h | 10 |
7 files changed, 210 insertions, 44 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index dc3dffbbf23..84601ccb724 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1479,9 +1479,8 @@ message TEvGetCompactTableStatsResult { // - SeqNo that should be used by user in TEvReadAck // - ContinuationToken, which user can use to restart the read. // -// 5. On each TEvReadResult user sends TEvReadAck to update quota and continue reading. -// * If user has already received multiple TEvReadResult messages, he is allowed to send -// single TEvReadAck with SeqNo from the last result. +// 5. The client must ack results; it is allowed to send a single ack for multiple results +// at once using the SeqNo from the last result. Note that currently an ack is effectively a quota update. // * If user has received final TEvReadResult, he is allowed to not send TEvReadAck, though sending message is // not a protocol violation. // * If user has received TEvReadResult with error he should not send any reply. @@ -1509,6 +1508,9 @@ message TEvGetCompactTableStatsResult { // // User must detect disconnect and after it happened ignore possible results from the shard. // Instead new read must be started using ContinuationToken from last successful TEvReadResult. +// Note that the current DS implementation uses the interconnect session from the originating TEvRead message: +// it sends all results via this session and detects disconnects based on that session. However, the client +// is allowed to send an ACK via any session. 
message TEvRead { // User must always provide unique ReadId // Note that shard distinguishes equal ReadId's diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index bdbae4d8ee2..9c7bc61c8ef 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2989,6 +2989,14 @@ void TDataShard::Handle(TEvents::TEvUndelivered::TPtr &ev, Pipeline.AddCandidateOp(op); PlanQueue.Progress(ctx); } + + switch (ev->Get()->SourceType) { + case TEvents::TEvSubscribe::EventType: + ReadIteratorsOnNodeDisconnected(ev->Sender, ctx); + break; + default: + ; + } } void TDataShard::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev, @@ -3001,6 +3009,8 @@ void TDataShard::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev, Pipeline.ProcessDisconnected(nodeId); PlanQueue.Progress(ctx); + + ReadIteratorsOnNodeDisconnected(ev->Sender, ctx); } void TDataShard::Handle(TEvDataShard::TEvMigrateSchemeShardRequest::TPtr& ev, @@ -3186,6 +3196,22 @@ void TDataShard::Handle(TEvDataShard::TEvGetRemovedRowVersions::TPtr& ev, const Execute(new TTxGetRemovedRowVersions(this, std::move(ev)), ctx); } +void SendViaSession(const TActorId& sessionId, + const TActorId& target, + const TActorId& src, + IEventBase* event, + ui32 flags, + ui64 cookie) +{ + THolder<IEventHandle> ev = MakeHolder<IEventHandle>(target, src, event, flags, cookie); + + if (sessionId) { + ev->Rewrite(TEvInterconnect::EvForward, sessionId); + } + + TActivationContext::Send(ev.Release()); +} + } // NDataShard TString TEvDataShard::TEvRead::ToString() const { diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 32dcd969e61..dbcb24ca18e 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -663,15 +663,18 @@ public: return; } + Y_VERIFY(it->second); + auto& state = *it->second; + if (!Result) { LOG_DEBUG_S(ctx, 
NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " TTxRead::Execute() finished without Result, aborting"); - Self->ReadIterators.erase(it); + Self->DeleteReadIterator(it); Result.reset(new TEvDataShard::TEvReadResult()); SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); Result->Record.SetReadId(readId.ReadId); - ctx.Send(Sender, Result.release()); + SendViaSession(state.SessionId, Sender, Self->SelfId(), Result.release()); return; } @@ -679,14 +682,11 @@ public: auto& record = Result->Record; if (record.HasStatus()) { record.SetReadId(readId.ReadId); - if (it->second) { - auto& state = *it->second; - record.SetSeqNo(state.SeqNo + 1); - } - ctx.Send(Sender, Result.release()); + record.SetSeqNo(state.SeqNo + 1); + SendViaSession(state.SessionId, Sender, Self->SelfId(), Result.release()); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " TTxRead::Execute() finished with error, aborting"); - Self->ReadIterators.erase(it); + Self->DeleteReadIterator(it); return; } @@ -694,7 +694,7 @@ public: Y_ASSERT(BlockBuilder); Reader->FillResult(*Result); - ctx.Send(Sender, Result.release()); + SendViaSession(state.SessionId, Sender, Self->SelfId(), Result.release()); // note that we save the state only when there're unread queries if (Reader->HasUnreadQueries()) { @@ -709,7 +709,7 @@ public: } else { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " finished in read"); - Self->ReadIterators.erase(it); + Self->DeleteReadIterator(it); } } @@ -1094,30 +1094,30 @@ public: return; } + Y_VERIFY(it->second); + auto& state = *it->second; + if (!Result) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " TTxReadContinue::Execute() finished without Result, aborting"); - Self->ReadIterators.erase(it); + Self->DeleteReadIterator(it); Result.reset(new 
TEvDataShard::TEvReadResult()); SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); Result->Record.SetReadId(readId.ReadId); - ctx.Send(request->Reader, Result.release()); + SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release()); return; } // error happened and status set auto& record = Result->Record; if (record.HasStatus()) { - if (it->second) { - auto& state = *it->second; - record.SetSeqNo(state.SeqNo + 1); - } + record.SetSeqNo(state.SeqNo + 1); record.SetReadId(readId.ReadId); - ctx.Send(request->Reader, Result.release()); + SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release()); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " TTxReadContinue::Execute() finished with error, aborting"); - Self->ReadIterators.erase(it); + Self->DeleteReadIterator(it); return; } @@ -1125,7 +1125,7 @@ public: Y_ASSERT(BlockBuilder); Reader->FillResult(*Result); - ctx.Send(request->Reader, Result.release()); + SendViaSession(state.SessionId, request->Reader, Self->SelfId(), Result.release()); if (Reader->HasUnreadQueries()) { Y_ASSERT(it->second); @@ -1142,7 +1142,7 @@ public: } else { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " finished in ReadContinue"); - Self->ReadIterators.erase(it); + Self->DeleteReadIterator(it); } } }; @@ -1168,7 +1168,25 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct return; } - ReadIterators.emplace(readId, new TReadIteratorState()); + TActorId sessionId; + if (readId.Sender.NodeId() != SelfId().NodeId()) { + Y_VERIFY_DEBUG(ev->InterconnectSession); + THashMap<TActorId, TReadIteratorSession>::insert_ctx sessionsInsertCtx; + auto itSession = ReadIteratorSessions.find(ev->InterconnectSession, sessionsInsertCtx); + if (itSession == ReadIteratorSessions.end()) { + Send(ev->InterconnectSession, new TEvents::TEvSubscribe, 
IEventHandle::FlagTrackDelivery); + itSession = ReadIteratorSessions.emplace_direct( + sessionsInsertCtx, + ev->InterconnectSession, + TReadIteratorSession()); + } + + auto& session = itSession->second; + session.Iterators.insert(readId); + sessionId = ev->InterconnectSession; + } + + ReadIterators.emplace(readId, new TReadIteratorState(sessionId)); Executor()->Execute(new TTxRead(this, ev), ctx); } @@ -1233,9 +1251,9 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); SetStatusError(result->Record, Ydb::StatusIds::BAD_SESSION, issueStr); result->Record.SetReadId(readId.ReadId); - ctx.Send(ev->Sender, result.release()); + SendViaSession(state.SessionId, readId.Sender, SelfId(), result.release()); - ReadIterators.erase(it); // abort + DeleteReadIterator(it); return; } @@ -1271,7 +1289,7 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadCancel: " << record); TReadIteratorId readId(ev->Sender, record.GetReadId()); - ReadIterators.erase(readId); + DeleteReadIterator(readId); } void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx) { @@ -1286,10 +1304,47 @@ void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TStr result->Record.SetReadId(iterator.first.ReadId); result->Record.SetSeqNo(state->SeqNo + 1); - ctx.Send(readIteratorId.Sender, result.release()); + SendViaSession(state->SessionId, readIteratorId.Sender, SelfId(), result.release()); } ReadIterators.clear(); + ReadIteratorSessions.clear(); +} + +void TDataShard::DeleteReadIterator(const TReadIteratorId& readId) { + auto it = ReadIterators.find(readId); + if (it != ReadIterators.end()) + DeleteReadIterator(it); +} + +void TDataShard::DeleteReadIterator(TReadIteratorsMap::iterator it) { + const auto& state = 
it->second; + if (state->SessionId) { + auto itSession = ReadIteratorSessions.find(state->SessionId); + if (itSession != ReadIteratorSessions.end()) { + auto& session = itSession->second; + session.Iterators.erase(it->first); + } + } + ReadIterators.erase(it); +} + +void TDataShard::ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, const TActorContext &ctx) { + auto itSession = ReadIteratorSessions.find(sessionId); + if (itSession == ReadIteratorSessions.end()) + return; + + const auto& session = itSession->second; + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() + << " closed session# " << sessionId << ", iterators# " << session.Iterators.size()); + + for (const auto& readId: session.Iterators) { + // we don't send anything to client, because it's up + // to client to detect disconnect + ReadIterators.erase(readId); + } + + ReadIteratorSessions.erase(itSession); } } // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 2bae187d430..acc6d2063d4 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1531,7 +1531,10 @@ public: bool CheckChangesQueueOverflow() const; + void DeleteReadIterator(const TReadIteratorId& readId); + void DeleteReadIterator(TReadIteratorsMap::iterator it); void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx); + void ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, const TActorContext &ctx); private: /// @@ -2250,6 +2253,7 @@ private: TReplicatedTableState* EnsureReplicatedTable(const TPathId& pathId); TReadIteratorsMap ReadIterators; + THashMap<TActorId, TReadIteratorSession> ReadIteratorSessions; protected: // Redundant init state required by flat executor implementation @@ -2606,4 +2610,11 @@ void SetStatusError(T &rec, issue->set_message(msg); } +void SendViaSession(const TActorId& sessionId, + const TActorId& target, + const TActorId& src, + 
IEventBase* event, + ui32 flags = 0, + ui64 cookie = 0); + }} diff --git a/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp b/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp index d4b948077a1..d0a5ca32d9b 100644 --- a/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp +++ b/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp @@ -248,13 +248,7 @@ void TReplicationSourceOffsetsServer::Handle(TEvDataShard::TEvReplicationSourceO } void TReplicationSourceOffsetsServer::SendViaSession(const TActorId& sessionId, const TActorId& target, IEventBase* event, ui32 flags, ui64 cookie) { - THolder<IEventHandle> ev = MakeHolder<IEventHandle>(target, SelfId(), event, flags, cookie); - - if (sessionId) { - ev->Rewrite(TEvInterconnect::EvForward, sessionId); - } - - TActivationContext::Send(ev.Release()); + NDataShard::SendViaSession(sessionId, target, SelfId(), event, flags, cookie); } void TReplicationSourceOffsetsServer::Handle(TEvents::TEvUndelivered::TPtr& ev) { diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 201fb090852..b536a74f09a 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -279,13 +279,20 @@ struct TTableInfo { }; struct TTestHelper { - TTestHelper(bool withFollower = false) { + explicit TTestHelper(bool withFollower = false) { WithFollower = withFollower; TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetUseRealThreads(false); + init(serverSettings); + } + + explicit TTestHelper(const TServerSettings& serverSettings) { + init(serverSettings); + } + void init(const TServerSettings& serverSettings) { Server = new TServer(serverSettings); auto &runtime = *Server->GetRuntime(); @@ -429,15 +436,21 @@ struct TTestHelper { std::unique_ptr<TEvDataShard::TEvReadResult> SendRead( const TString& tableName, - TEvDataShard::TEvRead* request) + 
TEvDataShard::TEvRead* request, + ui32 node = 0, + TActorId sender = {}) { + if (!sender) { + sender = Sender; + } + const auto& table = Tables[tableName]; auto &runtime = *Server->GetRuntime(); runtime.SendToPipe( table.TabletId, - Sender, + sender, request, - 0, + node, GetTestPipeConfig(), table.ClientId); @@ -448,8 +461,14 @@ struct TTestHelper { const TString& tableName, const NKikimrTxDataShard::TEvReadResult& readResult, ui64 rows, - ui64 bytes) + ui64 bytes, + ui32 node = 0, + TActorId sender = {}) { + if (!sender) { + sender = Sender; + } + const auto& table = Tables[tableName]; auto* request = new TEvDataShard::TEvReadAck(); request->Record.SetReadId(readResult.GetReadId()); @@ -460,8 +479,9 @@ struct TTestHelper { auto &runtime = *Server->GetRuntime(); runtime.SendToPipe( table.TabletId, - Sender, request, - 0, + sender, + request, + node, GetTestPipeConfig(), table.ClientId); } @@ -1474,6 +1494,56 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { Y_UNIT_TEST(ShouldReadFromFollower) { TestReadKey(NKikimrTxDataShard::CELLVEC, true); } + + Y_UNIT_TEST(ShouldStopWhenDisconnected) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetNodeCount(20); + + const ui32 node = 13; + + TTestHelper helper(serverSettings); + + ui32 continueCounter = 0; + helper.Server->GetRuntime()->SetObserverFunc([&continueCounter](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvDataShard::EvReadContinue) { + ++continueCounter; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto& table = helper.Tables["table-1"]; + auto prevClient = table.ClientId; + + auto &runtime = *helper.Server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(node); + + // we need to connect from another node + table.ClientId = runtime.ConnectToPipe(table.TabletId, sender, node, GetPipeConfigWithRetries()); + UNIT_ASSERT(table.ClientId); + + auto request1 = 
helper.GetBaseReadRequest("table-1", 1); + AddKeyQuery(*request1, {3, 3, 3}); + AddKeyQuery(*request1, {1, 1, 1}); + + // set quota so that DS hangs waiting for ACK + request1->Record.SetMaxRows(1); + + auto readResult1 = helper.SendRead("table-1", request1.release(), node, sender); + + runtime.DisconnectNodes(node, node + 1, false); + + // restore our nodeId=0 client + table.ClientId = prevClient; + helper.SendReadAck("table-1", readResult1->Record, 3, 10000); // DS must ignore it + + auto readResult2 = helper.WaitReadResult(TDuration::MilliSeconds(10)); + UNIT_ASSERT(!readResult2); + UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0); + } }; Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) { @@ -1562,7 +1632,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) { Y_UNIT_TEST_SUITE(DataShardReadIteratorState) { Y_UNIT_TEST(ShouldCalculateQuota) { - NDataShard::TReadIteratorState state; + NDataShard::TReadIteratorState state({}); state.Quota.Rows = 100; state.Quota.Bytes = 1000; state.ConsumeSeqNo(10, 100); // seqno1 diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index 2033a6fd865..abda66c3263 100644 --- a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -38,6 +38,11 @@ struct TReadIteratorId { } }; +struct TReadIteratorSession { + TReadIteratorSession() = default; + THashSet<TReadIteratorId, TReadIteratorId::THash> Iterators; +}; + struct TReadIteratorState { enum class EState { Init, @@ -58,7 +63,9 @@ struct TReadIteratorState { }; public: - TReadIteratorState() = default; + explicit TReadIteratorState(const TActorId& sessionId) + : SessionId(sessionId) + {} bool IsExhausted() const { return State == EState::Exhausted; } @@ -142,6 +149,7 @@ public: TQuota Quota; TVector<TQuota> ReadStats; // each index corresponds to SeqNo-1 + TActorId SessionId; ui64 SeqNo = 0; ui64 LastAckSeqNo = 0; ui32 FirstUnprocessedQuery = 0; |