author     snaury <snaury@ydb.tech>    2023-03-23 21:19:17 +0300
committer  snaury <snaury@ydb.tech>    2023-03-23 21:19:17 +0300
commit     e02bdc34e84447047452610391d45f158f29ff03 (patch)
tree       f5bd9168e8cf3d0a4e72b4a69d43cd0158bd5bb0
parent     2e726254aab602891f317f127c1b9330cbb3823b (diff)
download   ydb-e02bdc34e84447047452610391d45f158f29ff03.tar.gz
Don't ignore cancellations for reads that are stuck in queues
-rw-r--r--  ydb/core/tx/datashard/datashard.h                    3
-rw-r--r--  ydb/core/tx/datashard/datashard__read_iterator.cpp  45
-rw-r--r--  ydb/core/tx/datashard/datashard_pipeline.cpp        35
-rw-r--r--  ydb/core/tx/datashard/datashard_pipeline.h           6
-rw-r--r--  ydb/core/tx/datashard/read_iterator.h                2

5 files changed, 76 insertions, 15 deletions
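In short, the patch makes TEvReadCancel effective for reads that are still parked in a waiting queue (for example while the shard waits for mediator state): each queued read is registered in a by-id map, a cancel marks it, and the handler drops it when it is finally dequeued. Below is a minimal self-contained sketch of that pattern; the simplified types and names are illustrative stand-ins, not the real YDB classes.

#include <cassert>
#include <cstdint>
#include <functional>
#include <unordered_map>

// Illustrative stand-ins for TReadIteratorId / TEvDataShard::TEvRead (assumed shapes).
struct TReadId {
    uint64_t Value = 0;
    bool operator==(const TReadId& other) const { return Value == other.Value; }
};

struct TReadIdHash {
    size_t operator()(const TReadId& id) const { return std::hash<uint64_t>()(id.Value); }
};

struct TReadRequest {
    bool Cancelled = false; // set when cancelled while still enqueued
};

class TWaitingReads {
public:
    // Called when a read is parked in a waiting queue.
    void Register(const TReadId& id, TReadRequest* request) {
        bool inserted = ById.emplace(id, request).second;
        assert(inserted); // each read id may be registered only once
    }

    // Called on a cancel event: returns true if the read was still waiting
    // and has now been marked, so the caller need not look any further.
    bool Cancel(const TReadId& id) {
        auto it = ById.find(id);
        if (it == ById.end()) {
            return false; // not waiting; the caller cancels an active iterator instead
        }
        it->second->Cancelled = true;
        return true;
    }

    // Called when a queued read is finally re-delivered for execution.
    // Returns false when the read was cancelled while it sat in the queue.
    bool Handle(const TReadId& id, TReadRequest* request) {
        auto it = ById.find(id);
        if (it == ById.end() || it->second != request) {
            return true; // not tracked (first delivery), proceed normally
        }
        bool ok = !it->second->Cancelled;
        ById.erase(it);
        return ok;
    }

private:
    std::unordered_map<TReadId, TReadRequest*, TReadIdHash> ById;
};

Note the pointer comparison in Handle: as in the patch, it guards against a later request that happens to reuse the same read id. The full diff follows.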
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 5b3a9fb36a..317be9f233 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -904,6 +904,9 @@ struct TEvDataShard {
         // In current kqp impl ranges are already in TSerializedTableRange
         // format, thus same format here
         TVector<TSerializedTableRange> Ranges;
+
+        // True when TEvRead is cancelled while enqueued in a waiting queue
+        bool Cancelled = false;
     };
 
     struct TEvReadResult : public TEventPB<TEvReadResult,
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index ddfde1e0bc..9d58354e9f 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -2094,14 +2094,8 @@ public:
 };
 
 void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ctx) {
-    if (MediatorStateWaiting) {
-        MediatorStateWaitingMsgs.emplace_back(ev.Release());
-        UpdateProposeQueueSize();
-        return;
-    }
-
-    // note that ins some cases we mutate this request below
-    const auto* request = ev->Get();
+    auto* request = ev->Get();
     const auto& record = request->Record;
     if (Y_UNLIKELY(!record.HasReadId())) {
         std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
@@ -2111,6 +2105,10 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
     }
 
     TReadIteratorId readId(ev->Sender, record.GetReadId());
+    if (!Pipeline.HandleWaitingReadIterator(readId, request)) {
+        // This request has been cancelled
+        return;
+    }
 
     auto replyWithError = [&] (auto code, const auto& msg) {
         std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
@@ -2122,6 +2120,27 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
         ctx.Send(ev->Sender, result.release());
     };
 
+    if (Y_UNLIKELY(Pipeline.HasWaitingReadIterator(readId) || ReadIterators.contains(readId))) {
+        replyWithError(
+            Ydb::StatusIds::ALREADY_EXISTS,
+            TStringBuilder() << "Request " << readId.ReadId << " already executing");
+        return;
+    }
+
+    if (!IsStateActive()) {
+        replyWithError(
+            Ydb::StatusIds::OVERLOADED,
+            TStringBuilder() << "Shard " << TabletID() << " is splitting/merging");
+        return;
+    }
+
+    if (MediatorStateWaiting) {
+        Pipeline.RegisterWaitingReadIterator(readId, request);
+        MediatorStateWaitingMsgs.emplace_back(ev.Release());
+        UpdateProposeQueueSize();
+        return;
+    }
+
     if (Pipeline.HasDrop()) {
         replyWithError(
             Ydb::StatusIds::INTERNAL_ERROR,
@@ -2139,13 +2158,6 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
         return;
     }
 
-    if (Y_UNLIKELY(ReadIterators.contains(readId))) {
-        replyWithError(
-            Ydb::StatusIds::ALREADY_EXISTS,
-            TStringBuilder() << "Request " << readId.ReadId << " already executing");
-        return;
-    }
-
     if (!request->Keys.empty() && !request->Ranges.empty()) {
         replyWithError(Ydb::StatusIds::BAD_REQUEST, "Both keys and ranges are forbidden");
         return;
     }
@@ -2409,6 +2421,11 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte
     LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadCancel: " << record);
 
     TReadIteratorId readId(ev->Sender, record.GetReadId());
+    if (Pipeline.CancelWaitingReadIterator(readId)) {
+        Y_VERIFY(!ReadIterators.contains(readId));
+        return;
+    }
+
     auto it = ReadIterators.find(readId);
     if (it == ReadIterators.end())
         return;
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index f8e8e54321..692de517fd 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1764,6 +1764,9 @@ void TPipeline::AddWaitingReadIterator(
     TEvDataShard::TEvRead::TPtr ev,
     const TActorContext& ctx)
 {
+    // Combined with registration for convenience
+    RegisterWaitingReadIterator(TReadIteratorId(ev->Sender, ev->Get()->Record.GetReadId()), ev->Get());
+
     if (Y_UNLIKELY(Self->MvccSwitchState == TSwitchState::SWITCHING)) {
         // postpone tx processing till mvcc state switch is finished
         WaitingDataReadIterators.emplace(TRowVersion::Min(), ev);
@@ -1787,6 +1790,38 @@ void TPipeline::AddWaitingReadIterator(
         << ", current unreliable edge# " << unreadableEdge);
 }
 
+bool TPipeline::HasWaitingReadIterator(const TReadIteratorId& readId) {
+    return WaitingReadIteratorsById.contains(readId);
+}
+
+bool TPipeline::CancelWaitingReadIterator(const TReadIteratorId& readId) {
+    auto it = WaitingReadIteratorsById.find(readId);
+    if (it != WaitingReadIteratorsById.end()) {
+        if (!it->second->Cancelled) {
+            it->second->Cancelled = true;
+        }
+        return true;
+    }
+
+    return false;
+}
+
+void TPipeline::RegisterWaitingReadIterator(const TReadIteratorId& readId, TEvDataShard::TEvRead* event) {
+    auto res = WaitingReadIteratorsById.emplace(readId, event);
+    Y_VERIFY(res.second);
+}
+
+bool TPipeline::HandleWaitingReadIterator(const TReadIteratorId& readId, TEvDataShard::TEvRead* event) {
+    auto it = WaitingReadIteratorsById.find(readId);
+    if (it != WaitingReadIteratorsById.end() && it->second == event) {
+        bool ok = !it->second->Cancelled;
+        WaitingReadIteratorsById.erase(it);
+        return ok;
+    }
+
+    return true;
+}
+
 TRowVersion TPipeline::GetReadEdge() const {
     if (Self->TransQueue.PlannedTxs) {
         for (auto order : Self->TransQueue.PlannedTxs) {
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index fcfc94059c..a57aaf6bde 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -6,6 +6,7 @@
 #include "datashard_dep_tracker.h"
 #include "datashard_user_table.h"
 #include "execution_unit.h"
+#include "read_iterator.h"
 
 #include <ydb/core/tablet_flat/flat_cxx_database.h>
 
@@ -339,6 +340,10 @@ public:
         const TRowVersion& version,
         TEvDataShard::TEvRead::TPtr ev,
         const TActorContext& ctx);
+    bool HasWaitingReadIterator(const TReadIteratorId& readId);
+    bool CancelWaitingReadIterator(const TReadIteratorId& readId);
+    void RegisterWaitingReadIterator(const TReadIteratorId& readId, TEvDataShard::TEvRead* event);
+    bool HandleWaitingReadIterator(const TReadIteratorId& readId, TEvDataShard::TEvRead* event);
 
     TRowVersion GetReadEdge() const;
     TRowVersion GetUnreadableEdge(bool prioritizedReads) const;
@@ -492,6 +497,7 @@ private:
     THashMap<ui64, TOperation::TPtr> CompletingOps;
 
     TMultiMap<TRowVersion, TEvDataShard::TEvRead::TPtr> WaitingDataReadIterators;
+    THashMap<TReadIteratorId, TEvDataShard::TEvRead*, TReadIteratorId::THash> WaitingReadIteratorsById;
 
     bool GetPlannedTx(NIceDb::TNiceDb& db, ui64& step, ui64& txId);
     void SaveLastPlannedTx(NIceDb::TNiceDb& db, TStepOrder stepTxId);
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index 478753baa4..c2fae3e0cd 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -17,7 +17,7 @@ namespace NKikimr::NDataShard {
 
 struct TReadIteratorId {
     TActorId Sender;
-    ui64 ReadId = 0;
+    ui64 ReadId;
 
     TReadIteratorId(const TActorId& sender, ui64 readId)
         : Sender(sender)
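For context, this is the request lifecycle the patch establishes, shown as a usage of the sketch above (in the real code these calls go through TPipeline::RegisterWaitingReadIterator, CancelWaitingReadIterator and HandleWaitingReadIterator on the datashard pipeline):

int main() {
    TWaitingReads pipeline;
    TReadRequest request;
    TReadId id{42};

    // TEvRead arrives while the shard waits for mediator state: park it.
    pipeline.Register(id, &request);

    // TEvReadCancel arrives before the queued read has a chance to run.
    bool wasWaiting = pipeline.Cancel(id);
    assert(wasWaiting);

    // The queued TEvRead is later re-delivered; it must now be dropped
    // instead of executing a read nobody is waiting for.
    bool proceed = pipeline.Handle(id, &request);
    assert(!proceed);
    return 0;
}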