diff options
author | Evgeniy Ivanov <i@eivanov.com> | 2022-06-16 20:15:02 +0300 |
---|---|---|
committer | Evgeniy Ivanov <i@eivanov.com> | 2022-06-16 20:15:02 +0300 |
commit | d00911f1684be22ca1f9ae03a8e579f41fafc9f5 (patch) | |
tree | f2ff42784f19b7f5e495d0cf72de9b7efb0ef40c | |
parent | 8cc06366642ee3cbe65f883eeda4b02941b3c748 (diff) | |
download | ydb-d00911f1684be22ca1f9ae03a8e579f41fafc9f5.tar.gz |
KIKIMR-15102: always set ReadId and always reply in case of broken/aborted iterator
ref:3d6d69c51275c46692757044908adc574459a5f2
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 71 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 16 |
3 files changed, 75 insertions, 16 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 229038ceae8..d2c342e82e4 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1467,6 +1467,10 @@ message TEvGetCompactTableStatsResult { // - ContinuationToken, which user can use to restart the read. // 5. User reads until the end using TEvReadAck to update the quota. // 6. User stops reading by TEvReadCancel. +// +// TEvReadResult is valid only if its Status is equal to Ydb::StatusIds::SUCCESS. +// In case of any other status code iterator has been invalidated and further +// usage of its ReadId will result in either non-success status code. message TEvRead { // User must always provide unique ReadId // Note that shard distinguishes equal ReadId's diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 79f5d2c0baa..d80715f5695 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -614,20 +614,27 @@ public: void Complete(const TActorContext& ctx) override { TReadIteratorId readId(Sender, Request->Record.GetReadId()); auto it = Self->ReadIterators.find(readId); - if (it == Self->ReadIterators.end()) { + if (it == Self->ReadIterators.end() || !Result) { // iterator has been aborted - return; - } + if (it != Self->ReadIterators.end()) + Self->ReadIterators.erase(it); - // normally will not happen, just for extra sanity check - if (!Result) { - Self->ReadIterators.erase(it); + if (!Result) { + Result.reset(new TEvDataShard::TEvReadResult()); + } + + if (!Result->Record.HasStatus()) { + SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); + } + Result->Record.SetReadId(readId.ReadId); + ctx.Send(Sender, Result.release()); return; } // error happened and status set auto& record = Result->Record; if (record.HasStatus()) { + record.SetReadId(readId.ReadId); ctx.Send(Sender, Result.release()); Self->ReadIterators.erase(it); return; @@ -1019,20 +1026,27 @@ public: const auto* request = Ev->Get(); TReadIteratorId readId(request->Reader, request->ReadId); auto it = Self->ReadIterators.find(readId); - if (it == Self->ReadIterators.end()) { + if (it == Self->ReadIterators.end() || !Result) { // iterator has been aborted - return; - } + if (it != Self->ReadIterators.end()) + Self->ReadIterators.erase(it); - // normally will not happen, just for extra sanity check - if (!Result) { - Self->ReadIterators.erase(it); + if (!Result) { + Result.reset(new TEvDataShard::TEvReadResult()); + } + + if (!Result->Record.HasStatus()) { + SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); + } + Result->Record.SetReadId(readId.ReadId); + ctx.Send(request->Reader, Result.release()); return; } // error happened and status set auto& record = Result->Record; if (record.HasStatus()) { + record.SetReadId(readId.ReadId); ctx.Send(request->Reader, Result.release()); Self->ReadIterators.erase(it); return; @@ -1078,6 +1092,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct result->Record, Ydb::StatusIds::ALREADY_EXISTS, TStringBuilder() << "Request " << readId.ReadId << " already executing"); + result->Record.SetReadId(readId.ReadId); ctx.Send(ev->Sender, result.release()); return; } @@ -1090,6 +1105,10 @@ void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorCon TReadIteratorId readId(ev->Get()->Reader, ev->Get()->ReadId); if (Y_UNLIKELY(!ReadIterators.contains(readId))) { // was aborted + std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); + SetStatusError(result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); + result->Record.SetReadId(readId.ReadId); + ctx.Send(readId.Sender, result.release()); return; } @@ -1107,6 +1126,12 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& !record.HasMaxRows() || !record.HasMaxBytes())) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck: " << record); + + std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); + SetStatusError(result->Record, Ydb::StatusIds::BAD_REQUEST, "Missing mandatory fields in TEvReadAck"); + if (record.HasReadId()) + result->Record.SetReadId(record.GetReadId()); + ctx.Send(ev->Sender, result.release()); return; } @@ -1116,6 +1141,11 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& if (it == ReadIterators.end()) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck on missing iterator: " << record); + + std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); + SetStatusError(result->Record, Ydb::StatusIds::BAD_SESSION, "readId not found"); + result->Record.SetReadId(readId.ReadId); + ctx.Send(ev->Sender, result.release()); return; } @@ -1124,12 +1154,25 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& if (state.State == NDataShard::TReadIteratorState::EState::Init) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck on not inialized iterator: " << record); + + // we don't reply here, because iterator will be initialized and user will + // receive result + // TODO: consider aborting iterator? + return; } if (state.SeqNo < record.GetSeqNo()) { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " out of order ReadAck: " - << record << ", current seqNo# " << state.SeqNo); + auto issueStr = TStringBuilder() << TabletID() << " out of order ReadAck: " << record.GetSeqNo() + << ", current seqNo# " << state.SeqNo; + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, issueStr); + + std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult()); + SetStatusError(result->Record, Ydb::StatusIds::BAD_SESSION, issueStr); + result->Record.SetReadId(readId.ReadId); + ctx.Send(ev->Sender, result.release()); + + ReadIterators.erase(it); // abort return; } diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index a605a029fc3..f079cb75be1 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -788,8 +788,11 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { helper.SendCancel("table-1", 1); helper.SendReadAck("table-1", readResult1->Record, 3, 10000); - auto readResult2 = helper.WaitReadResult(TDuration::MilliSeconds(10)); - UNIT_ASSERT(!readResult2); + auto readResult2 = helper.WaitReadResult(); + UNIT_ASSERT(readResult2); + UNIT_ASSERT(readResult2->Record.HasStatus()); + UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_SESSION); + UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0); } @@ -1223,6 +1226,15 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { auto readResult2 = helper.WaitReadResult(); UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::SCHEME_ERROR); + + UNIT_ASSERT(readResult2->Record.HasReadId()); + UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetReadId(), readResult1->Record.GetReadId()); + + // try to make one more read using this iterator + helper.SendReadAck("table-1", readResult1->Record, 3, 10000); + auto readResult3 = helper.WaitReadResult(); + UNIT_ASSERT(readResult3); + UNIT_ASSERT_VALUES_EQUAL(readResult3->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_SESSION); } }; |