aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-03-23 21:19:17 +0300
committersnaury <snaury@ydb.tech>2023-03-23 21:19:17 +0300
commite02bdc34e84447047452610391d45f158f29ff03 (patch)
treef5bd9168e8cf3d0a4e72b4a69d43cd0158bd5bb0
parent2e726254aab602891f317f127c1b9330cbb3823b (diff)
downloadydb-e02bdc34e84447047452610391d45f158f29ff03.tar.gz
Don't ignore cancellations for reads that are stuck in queues
-rw-r--r--ydb/core/tx/datashard/datashard.h3
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp45
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp35
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.h6
-rw-r--r--ydb/core/tx/datashard/read_iterator.h2
5 files changed, 76 insertions, 15 deletions
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 5b3a9fb36a..317be9f233 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -904,6 +904,9 @@ struct TEvDataShard {
// In current kqp impl ranges are already in TSerializedTableRange
// format, thus same format here
TVector<TSerializedTableRange> Ranges;
+
+ // True when TEvRead is cancelled while enqueued in a waiting queue
+ bool Cancelled = false;
};
struct TEvReadResult : public TEventPB<TEvReadResult,
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index ddfde1e0bc..9d58354e9f 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -2094,14 +2094,8 @@ public:
};
void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ctx) {
- if (MediatorStateWaiting) {
- MediatorStateWaitingMsgs.emplace_back(ev.Release());
- UpdateProposeQueueSize();
- return;
- }
-
// note that ins some cases we mutate this request below
- const auto* request = ev->Get();
+ auto* request = ev->Get();
const auto& record = request->Record;
if (Y_UNLIKELY(!record.HasReadId())) {
std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
@@ -2111,6 +2105,10 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
}
TReadIteratorId readId(ev->Sender, record.GetReadId());
+ if (!Pipeline.HandleWaitingReadIterator(readId, request)) {
+ // This request has been cancelled
+ return;
+ }
auto replyWithError = [&] (auto code, const auto& msg) {
std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
@@ -2122,6 +2120,27 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
ctx.Send(ev->Sender, result.release());
};
+ if (Y_UNLIKELY(Pipeline.HasWaitingReadIterator(readId) || ReadIterators.contains(readId))) {
+ replyWithError(
+ Ydb::StatusIds::ALREADY_EXISTS,
+ TStringBuilder() << "Request " << readId.ReadId << " already executing");
+ return;
+ }
+
+ if (!IsStateActive()) {
+ replyWithError(
+ Ydb::StatusIds::OVERLOADED,
+ TStringBuilder() << "Shard " << TabletID() << " is splitting/merging");
+ return;
+ }
+
+ if (MediatorStateWaiting) {
+ Pipeline.RegisterWaitingReadIterator(readId, request);
+ MediatorStateWaitingMsgs.emplace_back(ev.Release());
+ UpdateProposeQueueSize();
+ return;
+ }
+
if (Pipeline.HasDrop()) {
replyWithError(
Ydb::StatusIds::INTERNAL_ERROR,
@@ -2139,13 +2158,6 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
return;
}
- if (Y_UNLIKELY(ReadIterators.contains(readId))) {
- replyWithError(
- Ydb::StatusIds::ALREADY_EXISTS,
- TStringBuilder() << "Request " << readId.ReadId << " already executing");
- return;
- }
-
if (!request->Keys.empty() && !request->Ranges.empty()) {
replyWithError(Ydb::StatusIds::BAD_REQUEST, "Both keys and ranges are forbidden");
return;
@@ -2409,6 +2421,11 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadCancel: " << record);
TReadIteratorId readId(ev->Sender, record.GetReadId());
+ if (Pipeline.CancelWaitingReadIterator(readId)) {
+ Y_VERIFY(!ReadIterators.contains(readId));
+ return;
+ }
+
auto it = ReadIterators.find(readId);
if (it == ReadIterators.end())
return;
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index f8e8e54321..692de517fd 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1764,6 +1764,9 @@ void TPipeline::AddWaitingReadIterator(
TEvDataShard::TEvRead::TPtr ev,
const TActorContext& ctx)
{
+ // Combined with registration for convenience
+ RegisterWaitingReadIterator(TReadIteratorId(ev->Sender, ev->Get()->Record.GetReadId()), ev->Get());
+
if (Y_UNLIKELY(Self->MvccSwitchState == TSwitchState::SWITCHING)) {
// postpone tx processing till mvcc state switch is finished
WaitingDataReadIterators.emplace(TRowVersion::Min(), ev);
@@ -1787,6 +1790,38 @@ void TPipeline::AddWaitingReadIterator(
<< ", current unreliable edge# " << unreadableEdge);
}
+bool TPipeline::HasWaitingReadIterator(const TReadIteratorId& readId) {
+ return WaitingReadIteratorsById.contains(readId);
+}
+
+bool TPipeline::CancelWaitingReadIterator(const TReadIteratorId& readId) {
+ auto it = WaitingReadIteratorsById.find(readId);
+ if (it != WaitingReadIteratorsById.end()) {
+ if (!it->second->Cancelled) {
+ it->second->Cancelled = true;
+ }
+ return true;
+ }
+
+ return false;
+}
+
+void TPipeline::RegisterWaitingReadIterator(const TReadIteratorId& readId, TEvDataShard::TEvRead* event) {
+ auto res = WaitingReadIteratorsById.emplace(readId, event);
+ Y_VERIFY(res.second);
+}
+
+bool TPipeline::HandleWaitingReadIterator(const TReadIteratorId& readId, TEvDataShard::TEvRead* event) {
+ auto it = WaitingReadIteratorsById.find(readId);
+ if (it != WaitingReadIteratorsById.end() && it->second == event) {
+ bool ok = !it->second->Cancelled;
+ WaitingReadIteratorsById.erase(it);
+ return ok;
+ }
+
+ return true;
+}
+
TRowVersion TPipeline::GetReadEdge() const {
if (Self->TransQueue.PlannedTxs) {
for (auto order : Self->TransQueue.PlannedTxs) {
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index fcfc94059c..a57aaf6bde 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -6,6 +6,7 @@
#include "datashard_dep_tracker.h"
#include "datashard_user_table.h"
#include "execution_unit.h"
+#include "read_iterator.h"
#include <ydb/core/tablet_flat/flat_cxx_database.h>
@@ -339,6 +340,10 @@ public:
const TRowVersion& version,
TEvDataShard::TEvRead::TPtr ev,
const TActorContext& ctx);
+ bool HasWaitingReadIterator(const TReadIteratorId& readId);
+ bool CancelWaitingReadIterator(const TReadIteratorId& readId);
+ void RegisterWaitingReadIterator(const TReadIteratorId& readId, TEvDataShard::TEvRead* event);
+ bool HandleWaitingReadIterator(const TReadIteratorId& readId, TEvDataShard::TEvRead* event);
TRowVersion GetReadEdge() const;
TRowVersion GetUnreadableEdge(bool prioritizedReads) const;
@@ -492,6 +497,7 @@ private:
THashMap<ui64, TOperation::TPtr> CompletingOps;
TMultiMap<TRowVersion, TEvDataShard::TEvRead::TPtr> WaitingDataReadIterators;
+ THashMap<TReadIteratorId, TEvDataShard::TEvRead*, TReadIteratorId::THash> WaitingReadIteratorsById;
bool GetPlannedTx(NIceDb::TNiceDb& db, ui64& step, ui64& txId);
void SaveLastPlannedTx(NIceDb::TNiceDb& db, TStepOrder stepTxId);
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index 478753baa4..c2fae3e0cd 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -17,7 +17,7 @@ namespace NKikimr::NDataShard {
struct TReadIteratorId {
TActorId Sender;
- ui64 ReadId = 0;
+ ui64 ReadId;
TReadIteratorId(const TActorId& sender, ui64 readId)
: Sender(sender)