diff options
author | Evgeniy Ivanov <i@eivanov.com> | 2022-06-24 14:41:25 +0300 |
---|---|---|
committer | Evgeniy Ivanov <i@eivanov.com> | 2022-06-24 14:41:25 +0300 |
commit | 3b32688dae7af8ef3eef83bb4cd15dc6e7c41875 (patch) | |
tree | 02c82fb38d7c187870196e46c02d5810a9dd8c64 | |
parent | 067fd14417000b3601483f660fe9e27c3b47f0b5 (diff) | |
download | ydb-3b32688dae7af8ef3eef83bb4cd15dc6e7c41875.tar.gz |
KIKIMR-13003: DS iterator protocol cleanup and bug fixes
* Handle ACK reordering
* Cancel iterators on shard split
* Return proper status codes and don't reply on unknown ACKs
ref:993845daed6c20157142bf575d714a79cdb23133
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 17 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 183 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_split_src.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 253 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_iterator.h | 1 |
6 files changed, 392 insertions, 66 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index bf2b5400927..dc3dffbbf23 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1479,10 +1479,12 @@ message TEvGetCompactTableStatsResult { // - SeqNo that should be used by user in TEvReadAck // - ContinuationToken, which user can use to restart the read. // -// 5. On each TEvReadResult except the last one user sends TEvReadAck to update quota and continue reading. -// * If user has already received multiple TEvReadResult messages, it is allowed to send +// 5. On each TEvReadResult user sends TEvReadAck to update quota and continue reading. +// * If user has already received multiple TEvReadResult messages, he is allowed to send // single TEvReadAck with SeqNo from the last result. -// * If user received final TEvReadResult or TEvReadResult with error he should not send any reply. +// * If user has received final TEvReadResult, he is allowed to not send TEvReadAck, though sending message is +// not a protocol violation. +// * If user has received TEvReadResult with error he should not send any reply. // Note that server will stop sending TEvReadResult messages only if last sent result contains either // Finished or LimitReached field set. Otherwise until there is no disconnect, user can rely that // he will receive more TEvReadResult messages. Though to improve latency it's a good practice @@ -1492,8 +1494,13 @@ message TEvGetCompactTableStatsResult { // or stops reading hisself using TEvReadCancel. // // TEvReadResult is valid only if its Status is equal to Ydb::StatusIds::SUCCESS. +// * Ydb::StatusIds::OVERLOADED means that shard has possibly been splitted, user must check schemeshard to find +// proper shards. +// * Ydb::StatusIds::SCHEME_ERROR means that either scheme version mismatch, wrong columns or table has been deleted. +// * Ydb::StatusIds::BAD_REQUEST should not be retried, because it shows protocol violation. // In case of any other status code iterator has been invalidated and further -// usage of its ReadId will result in either non-success status code. +// usage of its ReadId will result in either non-success status code. Also non-success result message might not +// contain SeqNo. // // Shard is allowed to send TEvReadResult without actual results, but with LimitReached field set. // E.g. it happens when initial TEvRead has too small quota to read at least single row. @@ -1604,6 +1611,8 @@ message TEvReadResult { // True when shard may have possibly returned more data, but // stopped because it reached MaxRows or MaxBytes limits. + // Note that if data split between multiple results because of + // MaxRowsInResult, then this field is not set optional bool LimitReached = 7; optional NKikimrQueryStats.TReadOpStats Stats = 8; diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index a87f5179e89..32dcd969e61 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -338,7 +338,7 @@ public: bool ReadRanges(TTransactionContext& txc, const TActorContext& ctx) { for (; FirstUnprocessedQuery < State.Request->Ranges.size(); ++FirstUnprocessedQuery) { - if (OutOfQuota() || ShouldStopByElapsedTime()) + if (ShouldStop()) return true; const auto& range = State.Request->Ranges[FirstUnprocessedQuery]; @@ -358,7 +358,7 @@ public: bool ReadKeys(TTransactionContext& txc, const TActorContext& ctx) { for (; FirstUnprocessedQuery < State.Request->Keys.size(); ++FirstUnprocessedQuery) { - if (OutOfQuota() || ShouldStopByElapsedTime()) + if (ShouldStop()) return true; const auto& key = State.Request->Keys[FirstUnprocessedQuery]; @@ -475,11 +475,18 @@ public: private: bool OutOfQuota() const { return RowsRead >= State.Quota.Rows || - RowsRead >= State.MaxRowsInResult || BlockBuilder.Bytes() >= State.Quota.Bytes|| BytesInResult >= State.Quota.Bytes; } + bool HasMaxRowsInResult() const { + return RowsRead >= State.MaxRowsInResult; + } + + bool ShouldStop() { + return OutOfQuota() || HasMaxRowsInResult() || ShouldStopByElapsedTime(); + } + bool Precharge( NTable::TDatabase& db, NTable::TRawVals keyFrom, @@ -522,7 +529,7 @@ private: ++RowsRead; RowsSinceLastCheck += 1 + ResetRowStats(iter->Stats); - if (OutOfQuota() || ShouldStopByElapsedTime()) { + if (ShouldStop()) { return EReadStatus::StoppedByLimit; } } @@ -583,10 +590,45 @@ public: Result.reset(new TEvDataShard::TEvReadResult()); - if (Self->State != TShardState::Ready && - Self->State != TShardState::Readonly) - { - // TODO: do we need more state checks here? + switch (Self->State) { + case TShardState::Ready: + case TShardState::Readonly: + case TShardState::Frozen: + case TShardState::SplitSrcWaitForNoTxInFlight: + break; + case TShardState::Offline: + case TShardState::PreOffline: { + if (Self->SrcSplitDescription) { + SetStatusError( + Result->Record, + Ydb::StatusIds::OVERLOADED, + TStringBuilder() << "Shard in state " << DatashardStateName(Self->State) + << ", tablet id: " << Self->TabletID()); + return true; + } else { + SetStatusError( + Result->Record, + Ydb::StatusIds::SCHEME_ERROR, + TStringBuilder() << "Shard in state " << DatashardStateName(Self->State) + << ", will be deleted soon, tablet id: " << Self->TabletID()); + return true; + } + } + case TShardState::SplitSrcMakeSnapshot: + case TShardState::SplitSrcSendingSnapshot: + case TShardState::SplitSrcWaitForPartitioningChanged: + case TShardState::SplitDstReceivingSnapshot: { + SetStatusError( + Result->Record, + Ydb::StatusIds::OVERLOADED, + TStringBuilder() << "Shard in state " << DatashardStateName(Self->State) + << ", tablet id: " << Self->TabletID()); + return true; + } + case TShardState::Uninitialized: + case TShardState::WaitScheme: + case TShardState::Unknown: + default: SetStatusError( Result->Record, Ydb::StatusIds::INTERNAL_ERROR, @@ -614,18 +656,20 @@ public: void Complete(const TActorContext& ctx) override { TReadIteratorId readId(Sender, Request->Record.GetReadId()); auto it = Self->ReadIterators.find(readId); - if (it == Self->ReadIterators.end() || !Result) { - // iterator has been aborted - if (it != Self->ReadIterators.end()) - Self->ReadIterators.erase(it); + if (it == Self->ReadIterators.end()) { + // the one who removed the iterator should have reply to user + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " has been invalidated before TTxRead::Complete()"); + return; + } - if (!Result) { - Result.reset(new TEvDataShard::TEvReadResult()); - } + if (!Result) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " TTxRead::Execute() finished without Result, aborting"); + Self->ReadIterators.erase(it); - if (!Result->Record.HasStatus()) { - SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); - } + Result.reset(new TEvDataShard::TEvReadResult()); + SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); Result->Record.SetReadId(readId.ReadId); ctx.Send(Sender, Result.release()); return; @@ -635,7 +679,13 @@ public: auto& record = Result->Record; if (record.HasStatus()) { record.SetReadId(readId.ReadId); + if (it->second) { + auto& state = *it->second; + record.SetSeqNo(state.SeqNo + 1); + } ctx.Send(Sender, Result.release()); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " TTxRead::Execute() finished with error, aborting"); Self->ReadIterators.erase(it); return; } @@ -657,6 +707,8 @@ public: new TEvDataShard::TEvReadContinue(Sender, Request->Record.GetReadId())); } } else { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " finished in read"); Self->ReadIterators.erase(it); } } @@ -956,6 +1008,10 @@ public: TTxType GetTxType() const override { return TXTYPE_READ; } bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + // note that we don't need to check shard state here: + // 1. Since TTxReadContinue scheduled, shard was ready. + // 2. If shards changes the state, it must cancel iterators and we will + // not find our readId ReadIterators. TReadIteratorId readId(Ev->Get()->Reader, Ev->Get()->ReadId); auto it = Self->ReadIterators.find(readId); if (it == Self->ReadIterators.end()) { @@ -1031,18 +1087,20 @@ public: const auto* request = Ev->Get(); TReadIteratorId readId(request->Reader, request->ReadId); auto it = Self->ReadIterators.find(readId); - if (it == Self->ReadIterators.end() || !Result) { - // iterator has been aborted - if (it != Self->ReadIterators.end()) - Self->ReadIterators.erase(it); + if (it == Self->ReadIterators.end()) { + // the one who removed the iterator should have reply to user + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " has been invalidated before TTxReadContinue::Complete()"); + return; + } - if (!Result) { - Result.reset(new TEvDataShard::TEvReadResult()); - } + if (!Result) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " TTxReadContinue::Execute() finished without Result, aborting"); + Self->ReadIterators.erase(it); - if (!Result->Record.HasStatus()) { - SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); - } + Result.reset(new TEvDataShard::TEvReadResult()); + SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); Result->Record.SetReadId(readId.ReadId); ctx.Send(request->Reader, Result.release()); return; @@ -1051,8 +1109,14 @@ public: // error happened and status set auto& record = Result->Record; if (record.HasStatus()) { + if (it->second) { + auto& state = *it->second; + record.SetSeqNo(state.SeqNo + 1); + } record.SetReadId(readId.ReadId); ctx.Send(request->Reader, Result.release()); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " TTxReadContinue::Execute() finished with error, aborting"); Self->ReadIterators.erase(it); return; } @@ -1076,6 +1140,8 @@ public: << " Read quota exhausted for " << request->Reader << "," << request->ReadId); } } else { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId + << " finished in ReadContinue"); Self->ReadIterators.erase(it); } } @@ -1109,11 +1175,6 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorContext& ctx) { TReadIteratorId readId(ev->Get()->Reader, ev->Get()->ReadId); if (Y_UNLIKELY(!ReadIterators.contains(readId))) { - // was aborted - std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); - SetStatusError(result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); - result->Record.SetReadId(readId.ReadId); - ctx.Send(readId.Sender, result.release()); return; } @@ -1121,10 +1182,15 @@ void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorCon } void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& ctx) { - // two possible cases: + // Possible cases: // 1. read exhausted and we need to start its execution (if bytes available again), // can start transaction right from here. // 2. read is in progress, we need just to update quota. + // 3. we have become non-active and ignore. + + if (!IsStateActive()) { + return; + } const auto& record = ev->Get()->Record; if (Y_UNLIKELY(!record.HasReadId() || !record.HasSeqNo() || @@ -1145,30 +1211,22 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& auto it = ReadIterators.find(readId); if (it == ReadIterators.end()) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() - << " ReadAck on missing iterator: " << record); - - std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); - SetStatusError(result->Record, Ydb::StatusIds::BAD_SESSION, "readId not found"); - result->Record.SetReadId(readId.ReadId); - ctx.Send(ev->Sender, result.release()); + << " ReadAck from " << ev->Sender << " on missing iterator: " << record); return; } Y_ASSERT(it->second); auto& state = *it->second; if (state.State == NDataShard::TReadIteratorState::EState::Init) { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() + LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck on not inialized iterator: " << record); - // we don't reply here, because iterator will be initialized and user will - // receive result - // TODO: consider aborting iterator? - return; } + // We received ACK on message we hadn't sent yet if (state.SeqNo < record.GetSeqNo()) { - auto issueStr = TStringBuilder() << TabletID() << " out of order ReadAck: " << record.GetSeqNo() + auto issueStr = TStringBuilder() << TabletID() << " ReadAck from future: " << record.GetSeqNo() << ", current seqNo# " << state.SeqNo; LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, issueStr); @@ -1181,6 +1239,13 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& return; } + if (state.LastAckSeqNo && state.LastAckSeqNo >= record.GetSeqNo()) { + // out of order, ignore + return; + } + + state.LastAckSeqNo = record.GetSeqNo(); + bool wasExhausted = state.IsExhausted(); state.UpQuota( record.GetSeqNo(), @@ -1209,4 +1274,30 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte ReadIterators.erase(readId); } +void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " CancelReadIterators #" << ReadIterators.size()); + + for (const auto& iterator: ReadIterators) { + const auto& readIteratorId = iterator.first; + const auto& state = iterator.second; + + std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); + SetStatusError(result->Record, code, issue); + result->Record.SetReadId(iterator.first.ReadId); + result->Record.SetSeqNo(state->SeqNo + 1); + + ctx.Send(readIteratorId.Sender, result.release()); + } + + ReadIterators.clear(); +} + } // NKikimr::NDataShard + +template<> +inline void Out<NKikimr::NDataShard::TReadIteratorId>( + IOutputStream& o, + const NKikimr::NDataShard::TReadIteratorId& info) +{ + o << info.ToString(); +} diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 4b5586b8319..2bae187d430 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1531,6 +1531,8 @@ public: bool CheckChangesQueueOverflow() const; + void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx); + private: /// class TLoanReturnTracker { diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp index c9efd503ec7..eba1ea0a0fa 100644 --- a/ydb/core/tx/datashard/datashard_split_src.cpp +++ b/ydb/core/tx/datashard/datashard_split_src.cpp @@ -196,6 +196,8 @@ public: Self->SplitSrcSnapshotSender.AddDst(dstTablet); } + Self->CancelReadIterators(Ydb::StatusIds::OVERLOADED, "Shard splitted", ctx); + return true; } diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 316d4f1d6a3..201fb090852 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -348,6 +348,13 @@ struct TTestHelper { } } + void SplitTable1() { + auto& table1 = Tables["table-1"]; + SetSplitMergePartCountLimit(Server->GetRuntime(), -1); + ui64 txId = AsyncSplitTable(Server, Sender, "/Root/table-1", table1.TabletId, 5); + WaitTxNotification(Server, Sender, txId); + } + std::unique_ptr<TEvDataShard::TEvRead> GetBaseReadRequest( const TString& tableName, ui64 readId, @@ -683,7 +690,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { }); const auto& record1 = readResult1->Record; - UNIT_ASSERT(record1.GetLimitReached()); + UNIT_ASSERT(!record1.GetLimitReached()); UNIT_ASSERT(record1.HasSeqNo()); UNIT_ASSERT(!record1.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL); @@ -696,7 +703,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { }); const auto& record2 = readResult2->Record; - UNIT_ASSERT(record2.GetLimitReached()); + UNIT_ASSERT(!record2.GetLimitReached()); UNIT_ASSERT(!record2.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record2.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record2.GetSeqNo(), 2UL); @@ -773,6 +780,60 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(continueCounter, 2); } + Y_UNIT_TEST(ShouldHandleOutOfOrderReadAck) { + TTestHelper helper; + + auto request1 = helper.GetBaseReadRequest("table-1", 1); + for (size_t i = 0; i < 8; ++i) { + AddKeyQuery(*request1, {1, 1, 1}); + } + + // limit quota + request1->Record.SetMaxRows(3); + request1->Record.SetMaxRowsInResult(1); + + ui32 continueCounter = 0; + helper.Server->GetRuntime()->SetObserverFunc([&continueCounter](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvDataShard::EvReadContinue) { + ++continueCounter; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto readResult1 = helper.SendRead("table-1", request1.release()); + UNIT_ASSERT(!readResult1->Record.GetLimitReached()); + + auto readResult2 = helper.WaitReadResult(); + UNIT_ASSERT(!readResult2->Record.GetLimitReached()); + + auto readResult3 = helper.WaitReadResult(); + UNIT_ASSERT(readResult3->Record.GetLimitReached()); // quota is empty now + + UNIT_ASSERT_VALUES_EQUAL(continueCounter, 2); + + helper.SendReadAck("table-1", readResult3->Record, 1, 10000); + + // since it's a test this one will be delivered the second and should be ignored + helper.SendReadAck("table-1", readResult2->Record, 10, 10000); + + auto readResult4 = helper.WaitReadResult(); + UNIT_ASSERT(readResult4); + UNIT_ASSERT(readResult4->Record.GetLimitReached()); // quota is empty now + + UNIT_ASSERT_VALUES_EQUAL(continueCounter, 3); + + auto readResult5 = helper.WaitReadResult(TDuration::MilliSeconds(10)); + UNIT_ASSERT(!readResult5); + UNIT_ASSERT_VALUES_EQUAL(continueCounter, 3); + + helper.SendReadAck("table-1", readResult4->Record, 1, 10000); + auto readResult6 = helper.WaitReadResult(); + UNIT_ASSERT(readResult6); + UNIT_ASSERT(readResult6->Record.GetLimitReached()); // quota is empty now + UNIT_ASSERT_VALUES_EQUAL(continueCounter, 4); + } + Y_UNIT_TEST(ShouldNotReadAfterCancel) { TTestHelper helper; @@ -801,11 +862,8 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { helper.SendCancel("table-1", 1); helper.SendReadAck("table-1", readResult1->Record, 3, 10000); - auto readResult2 = helper.WaitReadResult(); - UNIT_ASSERT(readResult2); - UNIT_ASSERT(readResult2->Record.HasStatus()); - UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_SESSION); - + auto readResult2 = helper.WaitReadResult(TDuration::MilliSeconds(10)); + UNIT_ASSERT(!readResult2); UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0); } @@ -974,7 +1032,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { }); const auto& record1 = readResult1->Record; - UNIT_ASSERT(record1.GetLimitReached()); + UNIT_ASSERT(!record1.GetLimitReached()); UNIT_ASSERT(record1.HasSeqNo()); UNIT_ASSERT(!record1.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL); @@ -996,7 +1054,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { }); const auto& record2 = readResult2->Record; - UNIT_ASSERT(record2.GetLimitReached()); + UNIT_ASSERT(!record2.GetLimitReached()); UNIT_ASSERT(!record2.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record2.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record2.GetSeqNo(), 2UL); @@ -1017,7 +1075,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { }); const auto& record3 = readResult3->Record; - UNIT_ASSERT(record3.GetLimitReached()); + UNIT_ASSERT(!record3.GetLimitReached()); UNIT_ASSERT(!record3.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record3.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record3.GetSeqNo(), 3UL); @@ -1034,7 +1092,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { }); const auto& record4 = readResult4->Record; - UNIT_ASSERT(record4.GetLimitReached()); + UNIT_ASSERT(!record4.GetLimitReached()); UNIT_ASSERT(!record4.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record4.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record4.GetSeqNo(), 4UL); @@ -1225,29 +1283,192 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { Y_UNIT_TEST(ShouldFailReadNextAfterSchemeChange) { TTestHelper helper; + bool shouldDrop = true; + TAutoPtr<IEventHandle> continueEvent; + + // capture original observer func by setting dummy one + auto& runtime = *helper.Server->GetRuntime(); + + auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) { + return TTestActorRuntime::EEventAction::PROCESS; + }); + // now set our observer backed up by original + runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::EvReadContinue: { + if (shouldDrop) { + continueEvent = ev.Release(); + return TTestActorRuntime::EEventAction::DROP; + } + return TTestActorRuntime::EEventAction::PROCESS; + } + default: + return originalObserver(runtime, ev); + } + }); + auto request1 = helper.GetBaseReadRequest("table-1", 1); AddKeyQuery(*request1, {3, 3, 3}); AddKeyQuery(*request1, {1, 1, 1}); - request1->Record.SetMaxRows(1); + AddKeyQuery(*request1, {5, 5, 5}); + + request1->Record.SetMaxRowsInResult(1); auto readResult1 = helper.SendRead("table-1", request1.release()); auto txId = AsyncAlterAddExtraColumn(helper.Server, "/Root", "table-1"); WaitTxNotification(helper.Server, helper.Sender, txId); - helper.SendReadAck("table-1", readResult1->Record, 3, 10000); + // now allow to continue read + shouldDrop = false; + TAutoPtr<TEvDataShard::TEvReadContinue> request = continueEvent->Release<TEvDataShard::TEvReadContinue>(); + UNIT_ASSERT_VALUES_EQUAL(request->ReadId, 1UL); + + const auto& table = helper.Tables["table-1"]; + runtime.SendToPipe( + table.TabletId, + helper.Sender, + request.Release(), + 0, + GetPipeConfigWithRetries(), + table.ClientId); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvDataShard::EvReadContinue, 1); + runtime.DispatchEvents(options); auto readResult2 = helper.WaitReadResult(); + UNIT_ASSERT(readResult2); UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::SCHEME_ERROR); + UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetSeqNo(), readResult1->Record.GetSeqNo() + 1); + } + + Y_UNIT_TEST(ShouldFailReadNextAfterSchemeChangeExhausted) { + TTestHelper helper; + + auto request1 = helper.GetBaseReadRequest("table-1", 1); + AddKeyQuery(*request1, {3, 3, 3}); + AddKeyQuery(*request1, {1, 1, 1}); + request1->Record.SetMaxRows(1); // will wait for ack + + auto readResult1 = helper.SendRead("table-1", request1.release()); + auto txId = AsyncAlterAddExtraColumn(helper.Server, "/Root", "table-1"); + WaitTxNotification(helper.Server, helper.Sender, txId); + + helper.SendReadAck("table-1", readResult1->Record, 3, 10000); + + auto readResult2 = helper.WaitReadResult(); + UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::SCHEME_ERROR); UNIT_ASSERT(readResult2->Record.HasReadId()); UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetReadId(), readResult1->Record.GetReadId()); // try to make one more read using this iterator helper.SendReadAck("table-1", readResult1->Record, 3, 10000); - auto readResult3 = helper.WaitReadResult(); - UNIT_ASSERT(readResult3); - UNIT_ASSERT_VALUES_EQUAL(readResult3->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_SESSION); + auto readResult3 = helper.WaitReadResult(TDuration::MilliSeconds(10)); + UNIT_ASSERT(!readResult3); + } + + Y_UNIT_TEST(ShouldReceiveErrorAfterSplit) { + TTestHelper helper; + + bool shouldDrop = true; + TAutoPtr<IEventHandle> continueEvent; + + // capture original observer func by setting dummy one + auto& runtime = *helper.Server->GetRuntime(); + + auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) { + return TTestActorRuntime::EEventAction::PROCESS; + }); + // now set our observer backed up by original + runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::EvReadContinue: { + if (shouldDrop) { + continueEvent = ev.Release(); + return TTestActorRuntime::EEventAction::DROP; + } + return TTestActorRuntime::EEventAction::PROCESS; + } + default: + return originalObserver(runtime, ev); + } + }); + + auto request1 = helper.GetBaseReadRequest("table-1", 1); + AddKeyQuery(*request1, {3, 3, 3}); + AddKeyQuery(*request1, {1, 1, 1}); + AddKeyQuery(*request1, {5, 5, 5}); + + request1->Record.SetMaxRowsInResult(1); + + auto readResult1 = helper.SendRead("table-1", request1.release()); + UNIT_ASSERT(continueEvent); + + helper.SplitTable1(); + + auto readResult2 = helper.WaitReadResult(); + UNIT_ASSERT(readResult2); + UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::OVERLOADED); + UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetSeqNo(), readResult1->Record.GetSeqNo() + 1); + + // now allow to continue read and check we don't get extra read result with error + shouldDrop = false; + TAutoPtr<TEvDataShard::TEvReadContinue> request = continueEvent->Release<TEvDataShard::TEvReadContinue>(); + UNIT_ASSERT_VALUES_EQUAL(request->ReadId, 1UL); + + const auto& table = helper.Tables["table-1"]; + runtime.SendToPipe( + table.TabletId, + helper.Sender, + request.Release(), + 0, + GetPipeConfigWithRetries(), + table.ClientId); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvDataShard::EvReadContinue, 1); + runtime.DispatchEvents(options); + + auto readResult3 = helper.WaitReadResult(TDuration::MilliSeconds(10)); + UNIT_ASSERT(!readResult3); + } + + Y_UNIT_TEST(ShouldReceiveErrorAfterSplitWhenExhausted) { + TTestHelper helper; + + auto request1 = helper.GetBaseReadRequest("table-1", 1); + AddKeyQuery(*request1, {3, 3, 3}); + AddKeyQuery(*request1, {1, 1, 1}); + + // set quota so that DS hangs waiting for ACK + request1->Record.SetMaxRows(1); + + auto readResult1 = helper.SendRead("table-1", request1.release()); + + helper.SplitTable1(); + + auto readResult2 = helper.WaitReadResult(); + UNIT_ASSERT(readResult2); + UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::OVERLOADED); + UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetSeqNo(), readResult1->Record.GetSeqNo() + 1); + } + + Y_UNIT_TEST(NoErrorOnFinalACK) { + TTestHelper helper; + + auto request1 = helper.GetBaseReadRequest("table-1", 1); + AddKeyQuery(*request1, {3, 3, 3}); + + auto readResult1 = helper.SendRead("table-1", request1.release()); + UNIT_ASSERT(readResult1); + UNIT_ASSERT(readResult1->Record.GetFinished()); + + helper.SendReadAck("table-1", readResult1->Record, 300, 10000); + + auto readResult2 = helper.WaitReadResult(TDuration::MilliSeconds(10)); + UNIT_ASSERT(!readResult2); } Y_UNIT_TEST(ShouldReadFromFollower) { diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index 1e7dd221b55..2033a6fd865 100644 --- a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -143,6 +143,7 @@ public: TVector<TQuota> ReadStats; // each index corresponds to SeqNo-1 ui64 SeqNo = 0; + ui64 LastAckSeqNo = 0; ui32 FirstUnprocessedQuery = 0; TString LastProcessedKey = 0; }; |