aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <i@eivanov.com>2022-06-24 14:41:25 +0300
committerEvgeniy Ivanov <i@eivanov.com>2022-06-24 14:41:25 +0300
commit3b32688dae7af8ef3eef83bb4cd15dc6e7c41875 (patch)
tree02c82fb38d7c187870196e46c02d5810a9dd8c64
parent067fd14417000b3601483f660fe9e27c3b47f0b5 (diff)
downloadydb-3b32688dae7af8ef3eef83bb4cd15dc6e7c41875.tar.gz
KIKIMR-13003: DS iterator protocol cleanup and bug fixes
* Handle ACK reordering * Cancel iterators on shard split * Return proper status codes and don't reply on unknown ACKs ref:993845daed6c20157142bf575d714a79cdb23133
-rw-r--r--ydb/core/protos/tx_datashard.proto17
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp183
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h2
-rw-r--r--ydb/core/tx/datashard/datashard_split_src.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp253
-rw-r--r--ydb/core/tx/datashard/read_iterator.h1
6 files changed, 392 insertions, 66 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index bf2b5400927..dc3dffbbf23 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1479,10 +1479,12 @@ message TEvGetCompactTableStatsResult {
// - SeqNo that should be used by user in TEvReadAck
// - ContinuationToken, which user can use to restart the read.
//
-// 5. On each TEvReadResult except the last one user sends TEvReadAck to update quota and continue reading.
-// * If user has already received multiple TEvReadResult messages, it is allowed to send
+// 5. On each TEvReadResult user sends TEvReadAck to update quota and continue reading.
+// * If user has already received multiple TEvReadResult messages, he is allowed to send
// single TEvReadAck with SeqNo from the last result.
-// * If user received final TEvReadResult or TEvReadResult with error he should not send any reply.
+// * If user has received final TEvReadResult, he is allowed to not send TEvReadAck, though sending message is
+// not a protocol violation.
+// * If user has received TEvReadResult with error he should not send any reply.
// Note that server will stop sending TEvReadResult messages only if last sent result contains either
// Finished or LimitReached field set. Otherwise until there is no disconnect, user can rely that
// he will receive more TEvReadResult messages. Though to improve latency it's a good practice
@@ -1492,8 +1494,13 @@ message TEvGetCompactTableStatsResult {
// or stops reading hisself using TEvReadCancel.
//
// TEvReadResult is valid only if its Status is equal to Ydb::StatusIds::SUCCESS.
+// * Ydb::StatusIds::OVERLOADED means that shard has possibly been splitted, user must check schemeshard to find
+// proper shards.
+// * Ydb::StatusIds::SCHEME_ERROR means that either scheme version mismatch, wrong columns or table has been deleted.
+// * Ydb::StatusIds::BAD_REQUEST should not be retried, because it shows protocol violation.
// In case of any other status code iterator has been invalidated and further
-// usage of its ReadId will result in either non-success status code.
+// usage of its ReadId will result in either non-success status code. Also non-success result message might not
+// contain SeqNo.
//
// Shard is allowed to send TEvReadResult without actual results, but with LimitReached field set.
// E.g. it happens when initial TEvRead has too small quota to read at least single row.
@@ -1604,6 +1611,8 @@ message TEvReadResult {
// True when shard may have possibly returned more data, but
// stopped because it reached MaxRows or MaxBytes limits.
+ // Note that if data split between multiple results because of
+ // MaxRowsInResult, then this field is not set
optional bool LimitReached = 7;
optional NKikimrQueryStats.TReadOpStats Stats = 8;
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index a87f5179e89..32dcd969e61 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -338,7 +338,7 @@ public:
bool ReadRanges(TTransactionContext& txc, const TActorContext& ctx) {
for (; FirstUnprocessedQuery < State.Request->Ranges.size(); ++FirstUnprocessedQuery) {
- if (OutOfQuota() || ShouldStopByElapsedTime())
+ if (ShouldStop())
return true;
const auto& range = State.Request->Ranges[FirstUnprocessedQuery];
@@ -358,7 +358,7 @@ public:
bool ReadKeys(TTransactionContext& txc, const TActorContext& ctx) {
for (; FirstUnprocessedQuery < State.Request->Keys.size(); ++FirstUnprocessedQuery) {
- if (OutOfQuota() || ShouldStopByElapsedTime())
+ if (ShouldStop())
return true;
const auto& key = State.Request->Keys[FirstUnprocessedQuery];
@@ -475,11 +475,18 @@ public:
private:
bool OutOfQuota() const {
return RowsRead >= State.Quota.Rows ||
- RowsRead >= State.MaxRowsInResult ||
BlockBuilder.Bytes() >= State.Quota.Bytes||
BytesInResult >= State.Quota.Bytes;
}
+ bool HasMaxRowsInResult() const {
+ return RowsRead >= State.MaxRowsInResult;
+ }
+
+ bool ShouldStop() {
+ return OutOfQuota() || HasMaxRowsInResult() || ShouldStopByElapsedTime();
+ }
+
bool Precharge(
NTable::TDatabase& db,
NTable::TRawVals keyFrom,
@@ -522,7 +529,7 @@ private:
++RowsRead;
RowsSinceLastCheck += 1 + ResetRowStats(iter->Stats);
- if (OutOfQuota() || ShouldStopByElapsedTime()) {
+ if (ShouldStop()) {
return EReadStatus::StoppedByLimit;
}
}
@@ -583,10 +590,45 @@ public:
Result.reset(new TEvDataShard::TEvReadResult());
- if (Self->State != TShardState::Ready &&
- Self->State != TShardState::Readonly)
- {
- // TODO: do we need more state checks here?
+ switch (Self->State) {
+ case TShardState::Ready:
+ case TShardState::Readonly:
+ case TShardState::Frozen:
+ case TShardState::SplitSrcWaitForNoTxInFlight:
+ break;
+ case TShardState::Offline:
+ case TShardState::PreOffline: {
+ if (Self->SrcSplitDescription) {
+ SetStatusError(
+ Result->Record,
+ Ydb::StatusIds::OVERLOADED,
+ TStringBuilder() << "Shard in state " << DatashardStateName(Self->State)
+ << ", tablet id: " << Self->TabletID());
+ return true;
+ } else {
+ SetStatusError(
+ Result->Record,
+ Ydb::StatusIds::SCHEME_ERROR,
+ TStringBuilder() << "Shard in state " << DatashardStateName(Self->State)
+ << ", will be deleted soon, tablet id: " << Self->TabletID());
+ return true;
+ }
+ }
+ case TShardState::SplitSrcMakeSnapshot:
+ case TShardState::SplitSrcSendingSnapshot:
+ case TShardState::SplitSrcWaitForPartitioningChanged:
+ case TShardState::SplitDstReceivingSnapshot: {
+ SetStatusError(
+ Result->Record,
+ Ydb::StatusIds::OVERLOADED,
+ TStringBuilder() << "Shard in state " << DatashardStateName(Self->State)
+ << ", tablet id: " << Self->TabletID());
+ return true;
+ }
+ case TShardState::Uninitialized:
+ case TShardState::WaitScheme:
+ case TShardState::Unknown:
+ default:
SetStatusError(
Result->Record,
Ydb::StatusIds::INTERNAL_ERROR,
@@ -614,18 +656,20 @@ public:
void Complete(const TActorContext& ctx) override {
TReadIteratorId readId(Sender, Request->Record.GetReadId());
auto it = Self->ReadIterators.find(readId);
- if (it == Self->ReadIterators.end() || !Result) {
- // iterator has been aborted
- if (it != Self->ReadIterators.end())
- Self->ReadIterators.erase(it);
+ if (it == Self->ReadIterators.end()) {
+ // the one who removed the iterator should have reply to user
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " has been invalidated before TTxRead::Complete()");
+ return;
+ }
- if (!Result) {
- Result.reset(new TEvDataShard::TEvReadResult());
- }
+ if (!Result) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " TTxRead::Execute() finished without Result, aborting");
+ Self->ReadIterators.erase(it);
- if (!Result->Record.HasStatus()) {
- SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
- }
+ Result.reset(new TEvDataShard::TEvReadResult());
+ SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
Result->Record.SetReadId(readId.ReadId);
ctx.Send(Sender, Result.release());
return;
@@ -635,7 +679,13 @@ public:
auto& record = Result->Record;
if (record.HasStatus()) {
record.SetReadId(readId.ReadId);
+ if (it->second) {
+ auto& state = *it->second;
+ record.SetSeqNo(state.SeqNo + 1);
+ }
ctx.Send(Sender, Result.release());
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " TTxRead::Execute() finished with error, aborting");
Self->ReadIterators.erase(it);
return;
}
@@ -657,6 +707,8 @@ public:
new TEvDataShard::TEvReadContinue(Sender, Request->Record.GetReadId()));
}
} else {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " finished in read");
Self->ReadIterators.erase(it);
}
}
@@ -956,6 +1008,10 @@ public:
TTxType GetTxType() const override { return TXTYPE_READ; }
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ // note that we don't need to check shard state here:
+ // 1. Since TTxReadContinue scheduled, shard was ready.
+ // 2. If shards changes the state, it must cancel iterators and we will
+ // not find our readId ReadIterators.
TReadIteratorId readId(Ev->Get()->Reader, Ev->Get()->ReadId);
auto it = Self->ReadIterators.find(readId);
if (it == Self->ReadIterators.end()) {
@@ -1031,18 +1087,20 @@ public:
const auto* request = Ev->Get();
TReadIteratorId readId(request->Reader, request->ReadId);
auto it = Self->ReadIterators.find(readId);
- if (it == Self->ReadIterators.end() || !Result) {
- // iterator has been aborted
- if (it != Self->ReadIterators.end())
- Self->ReadIterators.erase(it);
+ if (it == Self->ReadIterators.end()) {
+ // the one who removed the iterator should have reply to user
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " has been invalidated before TTxReadContinue::Complete()");
+ return;
+ }
- if (!Result) {
- Result.reset(new TEvDataShard::TEvReadResult());
- }
+ if (!Result) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " TTxReadContinue::Execute() finished without Result, aborting");
+ Self->ReadIterators.erase(it);
- if (!Result->Record.HasStatus()) {
- SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
- }
+ Result.reset(new TEvDataShard::TEvReadResult());
+ SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
Result->Record.SetReadId(readId.ReadId);
ctx.Send(request->Reader, Result.release());
return;
@@ -1051,8 +1109,14 @@ public:
// error happened and status set
auto& record = Result->Record;
if (record.HasStatus()) {
+ if (it->second) {
+ auto& state = *it->second;
+ record.SetSeqNo(state.SeqNo + 1);
+ }
record.SetReadId(readId.ReadId);
ctx.Send(request->Reader, Result.release());
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " TTxReadContinue::Execute() finished with error, aborting");
Self->ReadIterators.erase(it);
return;
}
@@ -1076,6 +1140,8 @@ public:
<< " Read quota exhausted for " << request->Reader << "," << request->ReadId);
}
} else {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+ << " finished in ReadContinue");
Self->ReadIterators.erase(it);
}
}
@@ -1109,11 +1175,6 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorContext& ctx) {
TReadIteratorId readId(ev->Get()->Reader, ev->Get()->ReadId);
if (Y_UNLIKELY(!ReadIterators.contains(readId))) {
- // was aborted
- std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
- SetStatusError(result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
- result->Record.SetReadId(readId.ReadId);
- ctx.Send(readId.Sender, result.release());
return;
}
@@ -1121,10 +1182,15 @@ void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorCon
}
void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& ctx) {
- // two possible cases:
+ // Possible cases:
// 1. read exhausted and we need to start its execution (if bytes available again),
// can start transaction right from here.
// 2. read is in progress, we need just to update quota.
+ // 3. we have become non-active and ignore.
+
+ if (!IsStateActive()) {
+ return;
+ }
const auto& record = ev->Get()->Record;
if (Y_UNLIKELY(!record.HasReadId() || !record.HasSeqNo() ||
@@ -1145,30 +1211,22 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
auto it = ReadIterators.find(readId);
if (it == ReadIterators.end()) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
- << " ReadAck on missing iterator: " << record);
-
- std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
- SetStatusError(result->Record, Ydb::StatusIds::BAD_SESSION, "readId not found");
- result->Record.SetReadId(readId.ReadId);
- ctx.Send(ev->Sender, result.release());
+ << " ReadAck from " << ev->Sender << " on missing iterator: " << record);
return;
}
Y_ASSERT(it->second);
auto& state = *it->second;
if (state.State == NDataShard::TReadIteratorState::EState::Init) {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
+ LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
<< " ReadAck on not inialized iterator: " << record);
- // we don't reply here, because iterator will be initialized and user will
- // receive result
- // TODO: consider aborting iterator?
-
return;
}
+ // We received ACK on message we hadn't sent yet
if (state.SeqNo < record.GetSeqNo()) {
- auto issueStr = TStringBuilder() << TabletID() << " out of order ReadAck: " << record.GetSeqNo()
+ auto issueStr = TStringBuilder() << TabletID() << " ReadAck from future: " << record.GetSeqNo()
<< ", current seqNo# " << state.SeqNo;
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, issueStr);
@@ -1181,6 +1239,13 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
return;
}
+ if (state.LastAckSeqNo && state.LastAckSeqNo >= record.GetSeqNo()) {
+ // out of order, ignore
+ return;
+ }
+
+ state.LastAckSeqNo = record.GetSeqNo();
+
bool wasExhausted = state.IsExhausted();
state.UpQuota(
record.GetSeqNo(),
@@ -1209,4 +1274,30 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte
ReadIterators.erase(readId);
}
+void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " CancelReadIterators #" << ReadIterators.size());
+
+ for (const auto& iterator: ReadIterators) {
+ const auto& readIteratorId = iterator.first;
+ const auto& state = iterator.second;
+
+ std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
+ SetStatusError(result->Record, code, issue);
+ result->Record.SetReadId(iterator.first.ReadId);
+ result->Record.SetSeqNo(state->SeqNo + 1);
+
+ ctx.Send(readIteratorId.Sender, result.release());
+ }
+
+ ReadIterators.clear();
+}
+
} // NKikimr::NDataShard
+
+template<>
+inline void Out<NKikimr::NDataShard::TReadIteratorId>(
+ IOutputStream& o,
+ const NKikimr::NDataShard::TReadIteratorId& info)
+{
+ o << info.ToString();
+}
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 4b5586b8319..2bae187d430 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1531,6 +1531,8 @@ public:
bool CheckChangesQueueOverflow() const;
+ void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx);
+
private:
///
class TLoanReturnTracker {
diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp
index c9efd503ec7..eba1ea0a0fa 100644
--- a/ydb/core/tx/datashard/datashard_split_src.cpp
+++ b/ydb/core/tx/datashard/datashard_split_src.cpp
@@ -196,6 +196,8 @@ public:
Self->SplitSrcSnapshotSender.AddDst(dstTablet);
}
+ Self->CancelReadIterators(Ydb::StatusIds::OVERLOADED, "Shard splitted", ctx);
+
return true;
}
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 316d4f1d6a3..201fb090852 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -348,6 +348,13 @@ struct TTestHelper {
}
}
+ void SplitTable1() {
+ auto& table1 = Tables["table-1"];
+ SetSplitMergePartCountLimit(Server->GetRuntime(), -1);
+ ui64 txId = AsyncSplitTable(Server, Sender, "/Root/table-1", table1.TabletId, 5);
+ WaitTxNotification(Server, Sender, txId);
+ }
+
std::unique_ptr<TEvDataShard::TEvRead> GetBaseReadRequest(
const TString& tableName,
ui64 readId,
@@ -683,7 +690,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
});
const auto& record1 = readResult1->Record;
- UNIT_ASSERT(record1.GetLimitReached());
+ UNIT_ASSERT(!record1.GetLimitReached());
UNIT_ASSERT(record1.HasSeqNo());
UNIT_ASSERT(!record1.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL);
@@ -696,7 +703,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
});
const auto& record2 = readResult2->Record;
- UNIT_ASSERT(record2.GetLimitReached());
+ UNIT_ASSERT(!record2.GetLimitReached());
UNIT_ASSERT(!record2.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record2.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record2.GetSeqNo(), 2UL);
@@ -773,6 +780,60 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT_VALUES_EQUAL(continueCounter, 2);
}
+ Y_UNIT_TEST(ShouldHandleOutOfOrderReadAck) {
+ TTestHelper helper;
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1);
+ for (size_t i = 0; i < 8; ++i) {
+ AddKeyQuery(*request1, {1, 1, 1});
+ }
+
+ // limit quota
+ request1->Record.SetMaxRows(3);
+ request1->Record.SetMaxRowsInResult(1);
+
+ ui32 continueCounter = 0;
+ helper.Server->GetRuntime()->SetObserverFunc([&continueCounter](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvDataShard::EvReadContinue) {
+ ++continueCounter;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto readResult1 = helper.SendRead("table-1", request1.release());
+ UNIT_ASSERT(!readResult1->Record.GetLimitReached());
+
+ auto readResult2 = helper.WaitReadResult();
+ UNIT_ASSERT(!readResult2->Record.GetLimitReached());
+
+ auto readResult3 = helper.WaitReadResult();
+ UNIT_ASSERT(readResult3->Record.GetLimitReached()); // quota is empty now
+
+ UNIT_ASSERT_VALUES_EQUAL(continueCounter, 2);
+
+ helper.SendReadAck("table-1", readResult3->Record, 1, 10000);
+
+ // since it's a test this one will be delivered the second and should be ignored
+ helper.SendReadAck("table-1", readResult2->Record, 10, 10000);
+
+ auto readResult4 = helper.WaitReadResult();
+ UNIT_ASSERT(readResult4);
+ UNIT_ASSERT(readResult4->Record.GetLimitReached()); // quota is empty now
+
+ UNIT_ASSERT_VALUES_EQUAL(continueCounter, 3);
+
+ auto readResult5 = helper.WaitReadResult(TDuration::MilliSeconds(10));
+ UNIT_ASSERT(!readResult5);
+ UNIT_ASSERT_VALUES_EQUAL(continueCounter, 3);
+
+ helper.SendReadAck("table-1", readResult4->Record, 1, 10000);
+ auto readResult6 = helper.WaitReadResult();
+ UNIT_ASSERT(readResult6);
+ UNIT_ASSERT(readResult6->Record.GetLimitReached()); // quota is empty now
+ UNIT_ASSERT_VALUES_EQUAL(continueCounter, 4);
+ }
+
Y_UNIT_TEST(ShouldNotReadAfterCancel) {
TTestHelper helper;
@@ -801,11 +862,8 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
helper.SendCancel("table-1", 1);
helper.SendReadAck("table-1", readResult1->Record, 3, 10000);
- auto readResult2 = helper.WaitReadResult();
- UNIT_ASSERT(readResult2);
- UNIT_ASSERT(readResult2->Record.HasStatus());
- UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_SESSION);
-
+ auto readResult2 = helper.WaitReadResult(TDuration::MilliSeconds(10));
+ UNIT_ASSERT(!readResult2);
UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0);
}
@@ -974,7 +1032,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
});
const auto& record1 = readResult1->Record;
- UNIT_ASSERT(record1.GetLimitReached());
+ UNIT_ASSERT(!record1.GetLimitReached());
UNIT_ASSERT(record1.HasSeqNo());
UNIT_ASSERT(!record1.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL);
@@ -996,7 +1054,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
});
const auto& record2 = readResult2->Record;
- UNIT_ASSERT(record2.GetLimitReached());
+ UNIT_ASSERT(!record2.GetLimitReached());
UNIT_ASSERT(!record2.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record2.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record2.GetSeqNo(), 2UL);
@@ -1017,7 +1075,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
});
const auto& record3 = readResult3->Record;
- UNIT_ASSERT(record3.GetLimitReached());
+ UNIT_ASSERT(!record3.GetLimitReached());
UNIT_ASSERT(!record3.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record3.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record3.GetSeqNo(), 3UL);
@@ -1034,7 +1092,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
});
const auto& record4 = readResult4->Record;
- UNIT_ASSERT(record4.GetLimitReached());
+ UNIT_ASSERT(!record4.GetLimitReached());
UNIT_ASSERT(!record4.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record4.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record4.GetSeqNo(), 4UL);
@@ -1225,29 +1283,192 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
Y_UNIT_TEST(ShouldFailReadNextAfterSchemeChange) {
TTestHelper helper;
+ bool shouldDrop = true;
+ TAutoPtr<IEventHandle> continueEvent;
+
+ // capture original observer func by setting dummy one
+ auto& runtime = *helper.Server->GetRuntime();
+
+ auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ // now set our observer backed up by original
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvReadContinue: {
+ if (shouldDrop) {
+ continueEvent = ev.Release();
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+ default:
+ return originalObserver(runtime, ev);
+ }
+ });
+
auto request1 = helper.GetBaseReadRequest("table-1", 1);
AddKeyQuery(*request1, {3, 3, 3});
AddKeyQuery(*request1, {1, 1, 1});
- request1->Record.SetMaxRows(1);
+ AddKeyQuery(*request1, {5, 5, 5});
+
+ request1->Record.SetMaxRowsInResult(1);
auto readResult1 = helper.SendRead("table-1", request1.release());
auto txId = AsyncAlterAddExtraColumn(helper.Server, "/Root", "table-1");
WaitTxNotification(helper.Server, helper.Sender, txId);
- helper.SendReadAck("table-1", readResult1->Record, 3, 10000);
+ // now allow to continue read
+ shouldDrop = false;
+ TAutoPtr<TEvDataShard::TEvReadContinue> request = continueEvent->Release<TEvDataShard::TEvReadContinue>();
+ UNIT_ASSERT_VALUES_EQUAL(request->ReadId, 1UL);
+
+ const auto& table = helper.Tables["table-1"];
+ runtime.SendToPipe(
+ table.TabletId,
+ helper.Sender,
+ request.Release(),
+ 0,
+ GetPipeConfigWithRetries(),
+ table.ClientId);
+
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvDataShard::EvReadContinue, 1);
+ runtime.DispatchEvents(options);
auto readResult2 = helper.WaitReadResult();
+ UNIT_ASSERT(readResult2);
UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::SCHEME_ERROR);
+ UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetSeqNo(), readResult1->Record.GetSeqNo() + 1);
+ }
+
+ Y_UNIT_TEST(ShouldFailReadNextAfterSchemeChangeExhausted) {
+ TTestHelper helper;
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1);
+ AddKeyQuery(*request1, {3, 3, 3});
+ AddKeyQuery(*request1, {1, 1, 1});
+ request1->Record.SetMaxRows(1); // will wait for ack
+
+ auto readResult1 = helper.SendRead("table-1", request1.release());
+ auto txId = AsyncAlterAddExtraColumn(helper.Server, "/Root", "table-1");
+ WaitTxNotification(helper.Server, helper.Sender, txId);
+
+ helper.SendReadAck("table-1", readResult1->Record, 3, 10000);
+
+ auto readResult2 = helper.WaitReadResult();
+ UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::SCHEME_ERROR);
UNIT_ASSERT(readResult2->Record.HasReadId());
UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetReadId(), readResult1->Record.GetReadId());
// try to make one more read using this iterator
helper.SendReadAck("table-1", readResult1->Record, 3, 10000);
- auto readResult3 = helper.WaitReadResult();
- UNIT_ASSERT(readResult3);
- UNIT_ASSERT_VALUES_EQUAL(readResult3->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_SESSION);
+ auto readResult3 = helper.WaitReadResult(TDuration::MilliSeconds(10));
+ UNIT_ASSERT(!readResult3);
+ }
+
+ Y_UNIT_TEST(ShouldReceiveErrorAfterSplit) {
+ TTestHelper helper;
+
+ bool shouldDrop = true;
+ TAutoPtr<IEventHandle> continueEvent;
+
+ // capture original observer func by setting dummy one
+ auto& runtime = *helper.Server->GetRuntime();
+
+ auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ // now set our observer backed up by original
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvReadContinue: {
+ if (shouldDrop) {
+ continueEvent = ev.Release();
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+ default:
+ return originalObserver(runtime, ev);
+ }
+ });
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1);
+ AddKeyQuery(*request1, {3, 3, 3});
+ AddKeyQuery(*request1, {1, 1, 1});
+ AddKeyQuery(*request1, {5, 5, 5});
+
+ request1->Record.SetMaxRowsInResult(1);
+
+ auto readResult1 = helper.SendRead("table-1", request1.release());
+ UNIT_ASSERT(continueEvent);
+
+ helper.SplitTable1();
+
+ auto readResult2 = helper.WaitReadResult();
+ UNIT_ASSERT(readResult2);
+ UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::OVERLOADED);
+ UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetSeqNo(), readResult1->Record.GetSeqNo() + 1);
+
+ // now allow to continue read and check we don't get extra read result with error
+ shouldDrop = false;
+ TAutoPtr<TEvDataShard::TEvReadContinue> request = continueEvent->Release<TEvDataShard::TEvReadContinue>();
+ UNIT_ASSERT_VALUES_EQUAL(request->ReadId, 1UL);
+
+ const auto& table = helper.Tables["table-1"];
+ runtime.SendToPipe(
+ table.TabletId,
+ helper.Sender,
+ request.Release(),
+ 0,
+ GetPipeConfigWithRetries(),
+ table.ClientId);
+
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvDataShard::EvReadContinue, 1);
+ runtime.DispatchEvents(options);
+
+ auto readResult3 = helper.WaitReadResult(TDuration::MilliSeconds(10));
+ UNIT_ASSERT(!readResult3);
+ }
+
+ Y_UNIT_TEST(ShouldReceiveErrorAfterSplitWhenExhausted) {
+ TTestHelper helper;
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1);
+ AddKeyQuery(*request1, {3, 3, 3});
+ AddKeyQuery(*request1, {1, 1, 1});
+
+ // set quota so that DS hangs waiting for ACK
+ request1->Record.SetMaxRows(1);
+
+ auto readResult1 = helper.SendRead("table-1", request1.release());
+
+ helper.SplitTable1();
+
+ auto readResult2 = helper.WaitReadResult();
+ UNIT_ASSERT(readResult2);
+ UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::OVERLOADED);
+ UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetSeqNo(), readResult1->Record.GetSeqNo() + 1);
+ }
+
+ Y_UNIT_TEST(NoErrorOnFinalACK) {
+ TTestHelper helper;
+
+ auto request1 = helper.GetBaseReadRequest("table-1", 1);
+ AddKeyQuery(*request1, {3, 3, 3});
+
+ auto readResult1 = helper.SendRead("table-1", request1.release());
+ UNIT_ASSERT(readResult1);
+ UNIT_ASSERT(readResult1->Record.GetFinished());
+
+ helper.SendReadAck("table-1", readResult1->Record, 300, 10000);
+
+ auto readResult2 = helper.WaitReadResult(TDuration::MilliSeconds(10));
+ UNIT_ASSERT(!readResult2);
}
Y_UNIT_TEST(ShouldReadFromFollower) {
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index 1e7dd221b55..2033a6fd865 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -143,6 +143,7 @@ public:
TVector<TQuota> ReadStats; // each index corresponds to SeqNo-1
ui64 SeqNo = 0;
+ ui64 LastAckSeqNo = 0;
ui32 FirstUnprocessedQuery = 0;
TString LastProcessedKey = 0;
};