aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <i@eivanov.com>2022-06-16 20:15:02 +0300
committerEvgeniy Ivanov <i@eivanov.com>2022-06-16 20:15:02 +0300
commitd00911f1684be22ca1f9ae03a8e579f41fafc9f5 (patch)
treef2ff42784f19b7f5e495d0cf72de9b7efb0ef40c
parent8cc06366642ee3cbe65f883eeda4b02941b3c748 (diff)
downloadydb-d00911f1684be22ca1f9ae03a8e579f41fafc9f5.tar.gz
KIKIMR-15102: always set ReadId and always reply in case of broken/aborted iterator
ref:3d6d69c51275c46692757044908adc574459a5f2
-rw-r--r--ydb/core/protos/tx_datashard.proto4
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp71
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp16
3 files changed, 75 insertions, 16 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 229038ceae8..d2c342e82e4 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1467,6 +1467,10 @@ message TEvGetCompactTableStatsResult {
// - ContinuationToken, which user can use to restart the read.
// 5. User reads until the end using TEvReadAck to update the quota.
// 6. User stops reading by TEvReadCancel.
+//
+// TEvReadResult is valid only if its Status is equal to Ydb::StatusIds::SUCCESS.
+// In case of any other status code iterator has been invalidated and further
+// usage of its ReadId will result in either non-success status code.
message TEvRead {
// User must always provide unique ReadId
// Note that shard distinguishes equal ReadId's
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 79f5d2c0baa..d80715f5695 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -614,20 +614,27 @@ public:
void Complete(const TActorContext& ctx) override {
TReadIteratorId readId(Sender, Request->Record.GetReadId());
auto it = Self->ReadIterators.find(readId);
- if (it == Self->ReadIterators.end()) {
+ if (it == Self->ReadIterators.end() || !Result) {
// iterator has been aborted
- return;
- }
+ if (it != Self->ReadIterators.end())
+ Self->ReadIterators.erase(it);
- // normally will not happen, just for extra sanity check
- if (!Result) {
- Self->ReadIterators.erase(it);
+ if (!Result) {
+ Result.reset(new TEvDataShard::TEvReadResult());
+ }
+
+ if (!Result->Record.HasStatus()) {
+ SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
+ }
+ Result->Record.SetReadId(readId.ReadId);
+ ctx.Send(Sender, Result.release());
return;
}
// error happened and status set
auto& record = Result->Record;
if (record.HasStatus()) {
+ record.SetReadId(readId.ReadId);
ctx.Send(Sender, Result.release());
Self->ReadIterators.erase(it);
return;
@@ -1019,20 +1026,27 @@ public:
const auto* request = Ev->Get();
TReadIteratorId readId(request->Reader, request->ReadId);
auto it = Self->ReadIterators.find(readId);
- if (it == Self->ReadIterators.end()) {
+ if (it == Self->ReadIterators.end() || !Result) {
// iterator has been aborted
- return;
- }
+ if (it != Self->ReadIterators.end())
+ Self->ReadIterators.erase(it);
- // normally will not happen, just for extra sanity check
- if (!Result) {
- Self->ReadIterators.erase(it);
+ if (!Result) {
+ Result.reset(new TEvDataShard::TEvReadResult());
+ }
+
+ if (!Result->Record.HasStatus()) {
+ SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
+ }
+ Result->Record.SetReadId(readId.ReadId);
+ ctx.Send(request->Reader, Result.release());
return;
}
// error happened and status set
auto& record = Result->Record;
if (record.HasStatus()) {
+ record.SetReadId(readId.ReadId);
ctx.Send(request->Reader, Result.release());
Self->ReadIterators.erase(it);
return;
@@ -1078,6 +1092,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
result->Record,
Ydb::StatusIds::ALREADY_EXISTS,
TStringBuilder() << "Request " << readId.ReadId << " already executing");
+ result->Record.SetReadId(readId.ReadId);
ctx.Send(ev->Sender, result.release());
return;
}
@@ -1090,6 +1105,10 @@ void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorCon
TReadIteratorId readId(ev->Get()->Reader, ev->Get()->ReadId);
if (Y_UNLIKELY(!ReadIterators.contains(readId))) {
// was aborted
+ std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
+ SetStatusError(result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
+ result->Record.SetReadId(readId.ReadId);
+ ctx.Send(readId.Sender, result.release());
return;
}
@@ -1107,6 +1126,12 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
!record.HasMaxRows() || !record.HasMaxBytes()))
{
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck: " << record);
+
+ std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
+ SetStatusError(result->Record, Ydb::StatusIds::BAD_REQUEST, "Missing mandatory fields in TEvReadAck");
+ if (record.HasReadId())
+ result->Record.SetReadId(record.GetReadId());
+ ctx.Send(ev->Sender, result.release());
return;
}
@@ -1116,6 +1141,11 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
if (it == ReadIterators.end()) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
<< " ReadAck on missing iterator: " << record);
+
+ std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
+ SetStatusError(result->Record, Ydb::StatusIds::BAD_SESSION, "readId not found");
+ result->Record.SetReadId(readId.ReadId);
+ ctx.Send(ev->Sender, result.release());
return;
}
@@ -1124,12 +1154,25 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
if (state.State == NDataShard::TReadIteratorState::EState::Init) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
<< " ReadAck on not inialized iterator: " << record);
+
+ // we don't reply here, because iterator will be initialized and user will
+ // receive result
+ // TODO: consider aborting iterator?
+
return;
}
if (state.SeqNo < record.GetSeqNo()) {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " out of order ReadAck: "
- << record << ", current seqNo# " << state.SeqNo);
+ auto issueStr = TStringBuilder() << TabletID() << " out of order ReadAck: " << record.GetSeqNo()
+ << ", current seqNo# " << state.SeqNo;
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, issueStr);
+
+ std::unique_ptr<TEvDataShard::TEvReadResult> result(new TEvDataShard::TEvReadResult());
+ SetStatusError(result->Record, Ydb::StatusIds::BAD_SESSION, issueStr);
+ result->Record.SetReadId(readId.ReadId);
+ ctx.Send(ev->Sender, result.release());
+
+ ReadIterators.erase(it); // abort
return;
}
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index a605a029fc3..f079cb75be1 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -788,8 +788,11 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
helper.SendCancel("table-1", 1);
helper.SendReadAck("table-1", readResult1->Record, 3, 10000);
- auto readResult2 = helper.WaitReadResult(TDuration::MilliSeconds(10));
- UNIT_ASSERT(!readResult2);
+ auto readResult2 = helper.WaitReadResult();
+ UNIT_ASSERT(readResult2);
+ UNIT_ASSERT(readResult2->Record.HasStatus());
+ UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_SESSION);
+
UNIT_ASSERT_VALUES_EQUAL(continueCounter, 0);
}
@@ -1223,6 +1226,15 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
auto readResult2 = helper.WaitReadResult();
UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetStatus().GetCode(), Ydb::StatusIds::SCHEME_ERROR);
+
+ UNIT_ASSERT(readResult2->Record.HasReadId());
+ UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.GetReadId(), readResult1->Record.GetReadId());
+
+ // try to make one more read using this iterator
+ helper.SendReadAck("table-1", readResult1->Record, 3, 10000);
+ auto readResult3 = helper.WaitReadResult();
+ UNIT_ASSERT(readResult3);
+ UNIT_ASSERT_VALUES_EQUAL(readResult3->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_SESSION);
}
};