summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <[email protected]>2024-09-02 16:08:49 +0300
committerGitHub <[email protected]>2024-09-02 16:08:49 +0300
commit37c91d73971af00b2d673427307934cfd3d9117d (patch)
treedda306e3eadb7d1f6034de311b10ddedebfe83a2
parentb9b486d4cf1f2fcb336100094e3113520bf8809e (diff)
Fix external reference precharging in datashard read iterators (#8592)
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp184
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp169
-rw-r--r--ydb/core/tx/datashard/read_iterator.h2
3 files changed, 226 insertions, 129 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index b0dadde686c..b056c481688 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -220,15 +220,15 @@ std::pair<std::unique_ptr<IBlockBuilder>, TString> CreateBlockBuilder(
}
std::vector<TRawTypeValue> ToRawTypeValue(
- const TSerializedCellVec& keyCells,
+ TArrayRef<const TCell> keyCells,
const TShortTableInfo& tableInfo,
bool addNulls)
{
std::vector<TRawTypeValue> result;
- result.reserve(keyCells.GetCells().size());
+ result.reserve(keyCells.size());
- for (ui32 i = 0; i < keyCells.GetCells().size(); ++i) {
- result.push_back(TRawTypeValue(keyCells.GetCells()[i].AsRef(), tableInfo.KeyColumnTypes[i]));
+ for (ui32 i = 0; i < keyCells.size(); ++i) {
+ result.push_back(TRawTypeValue(keyCells[i].AsRef(), tableInfo.KeyColumnTypes[i]));
}
// note that currently without nulls it is [prefix, +inf, +inf],
@@ -275,7 +275,7 @@ class TReader {
ui32 FirstUnprocessedQuery; // must be unsigned
TString LastProcessedKey;
- bool LastProcessedKeyErasedOrMissing = false;
+ bool LastProcessedKeyErased = false;
ui64 RowsRead = 0;
ui64 RowsProcessed = 0;
@@ -298,9 +298,9 @@ class TReader {
bool VolatileWaitForCommit = false;
enum class EReadStatus {
- Done = 0,
+ Done,
NeedData,
- StoppedByLimit,
+ NeedContinue,
};
public:
@@ -317,7 +317,7 @@ public:
, TableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion)
, FirstUnprocessedQuery(State.FirstUnprocessedQuery)
, LastProcessedKey(State.LastProcessedKey)
- , LastProcessedKeyErasedOrMissing(State.LastProcessedKeyErasedOrMissing)
+ , LastProcessedKeyErased(State.LastProcessedKeyErased)
{
GetTimeFast(&StartTime);
EndTime = StartTime;
@@ -325,7 +325,6 @@ public:
EReadStatus ReadRange(
TTransactionContext& txc,
- const TActorContext& ctx,
const TSerializedTableRange& range)
{
bool fromInclusive;
@@ -335,7 +334,7 @@ public:
if (LastProcessedKey) {
if (!State.Reverse) {
keyFromCells = TSerializedCellVec(LastProcessedKey);
- fromInclusive = LastProcessedKeyErasedOrMissing;
+ fromInclusive = LastProcessedKeyErased;
keyToCells = range.To;
toInclusive = range.ToInclusive;
@@ -345,7 +344,7 @@ public:
fromInclusive = range.FromInclusive;
keyToCells = TSerializedCellVec(LastProcessedKey);
- toInclusive = LastProcessedKeyErasedOrMissing;
+ toInclusive = LastProcessedKeyErased;
}
} else {
keyFromCells = range.From;
@@ -355,8 +354,8 @@ public:
toInclusive = range.ToInclusive;
}
- const auto keyFrom = ToRawTypeValue(keyFromCells, TableInfo, fromInclusive);
- const auto keyTo = ToRawTypeValue(keyToCells, TableInfo, !toInclusive);
+ const auto keyFrom = ToRawTypeValue(keyFromCells.GetCells(), TableInfo, fromInclusive);
+ const auto keyTo = ToRawTypeValue(keyToCells.GetCells(), TableInfo, !toInclusive);
// TODO: split range into parts like in read_columns
@@ -367,7 +366,7 @@ public:
iterRange.MaxInclusive = toInclusive;
const bool reverse = State.Reverse;
- if (TArrayRef<const TCell> cells = keyFromCells.GetCells()) {
+ if (TArrayRef<const TCell> cells = (reverse ? keyToCells.GetCells() : keyFromCells.GetCells())) {
if (!fromInclusive || cells.size() >= TableInfo.KeyColumnTypes.size()) {
Self->GetKeyAccessSampler()->AddSample(TableId, cells);
} else {
@@ -383,31 +382,19 @@ public:
if (!reverse) {
auto iter = txc.DB.IterateRange(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion, GetReadTxMap(), GetReadTxObserver());
- result = IterateRange(iter.Get(), ctx, txc.Env);
+ result = IterateRange(iter.Get(), iterRange, txc);
} else {
auto iter = txc.DB.IterateRangeReverse(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion, GetReadTxMap(), GetReadTxObserver());
- result = IterateRange(iter.Get(), ctx, txc.Env);
+ result = IterateRange(iter.Get(), iterRange, txc);
}
txc.Env.DisableReadMissingReferences();
- if (result == EReadStatus::NeedData && !(RowsProcessed && CanResume())) {
- if (LastProcessedKey) {
- keyFromCells = TSerializedCellVec(LastProcessedKey);
- const auto keyFrom = ToRawTypeValue(keyFromCells, TableInfo, false);
- Precharge(txc.DB, keyFrom, iterRange.MaxKey, reverse);
- } else {
- Precharge(txc.DB, iterRange.MinKey, iterRange.MaxKey, reverse);
- }
- return EReadStatus::NeedData;
- }
-
return result;
}
EReadStatus ReadKey(
TTransactionContext& txc,
- const TActorContext& ctx,
const TSerializedCellVec& keyCells,
size_t keyIndex)
{
@@ -418,7 +405,7 @@ public:
range.To = keyCells;
range.ToInclusive = true;
range.FromInclusive = true;
- return ReadRange(txc, ctx, range);
+ return ReadRange(txc, range);
}
if (ColumnTypes.empty()) {
@@ -429,13 +416,16 @@ public:
}
}
- const auto key = ToRawTypeValue(keyCells, TableInfo, true);
+ const auto key = ToRawTypeValue(keyCells.GetCells(), TableInfo, true);
NTable::TRowState rowState;
rowState.Init(State.Columns.size());
NTable::TSelectStats stats;
auto ready = txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion, GetReadTxMap(), GetReadTxObserver());
if (ready == NTable::EReady::Page) {
+ if (RowsProcessed && CanResume()) {
+ return EReadStatus::NeedContinue;
+ }
return EReadStatus::NeedData;
}
@@ -467,11 +457,11 @@ public:
{
if (keyCells.GetCells().size() != TableInfo.KeyColumnCount) {
// key prefix, treat it as range [prefix, null, null] - [prefix, +inf, +inf]
- auto minKey = ToRawTypeValue(keyCells, TableInfo, true);
- auto maxKey = ToRawTypeValue(keyCells, TableInfo, false);
+ auto minKey = ToRawTypeValue(keyCells.GetCells(), TableInfo, true);
+ auto maxKey = ToRawTypeValue(keyCells.GetCells(), TableInfo, false);
return Precharge(txc.DB, minKey, maxKey, State.Reverse);
} else {
- auto key = ToRawTypeValue(keyCells, TableInfo, true);
+ auto key = ToRawTypeValue(keyCells.GetCells(), TableInfo, true);
return Precharge(txc.DB, key, key, State.Reverse);
}
}
@@ -502,7 +492,7 @@ public:
// TODO: merge ReadRanges and ReadKeys to single template Read?
- bool ReadRanges(TTransactionContext& txc, const TActorContext& ctx) {
+ bool ReadRanges(TTransactionContext& txc) {
// note that FirstUnprocessedQuery is unsigned and if we do reverse iteration,
// then it will also become less than size() when finished
while (FirstUnprocessedQuery < State.Request->Ranges.size()) {
@@ -516,19 +506,16 @@ public:
return true;
const auto& range = State.Request->Ranges[FirstUnprocessedQuery];
- auto status = ReadRange(txc, ctx, range);
+ auto status = ReadRange(txc, range);
switch (status) {
case EReadStatus::Done:
break;
- case EReadStatus::StoppedByLimit:
- return true;
case EReadStatus::NeedData:
- if (RowsProcessed && CanResume())
- return true;
-
// Note: ReadRange has already precharged current range and
// we don't precharge multiple ranges as opposed to keys
return false;
+ case EReadStatus::NeedContinue:
+ return true;
}
if (!State.Reverse)
@@ -541,7 +528,7 @@ public:
return true;
}
- bool ReadKeys(TTransactionContext& txc, const TActorContext& ctx) {
+ bool ReadKeys(TTransactionContext& txc) {
// note that FirstUnprocessedQuery is unsigned and if we do reverse iteration,
// then it will also become less than size() when finished
while (FirstUnprocessedQuery < State.Request->Keys.size()) {
@@ -555,18 +542,15 @@ public:
return true;
const auto& key = State.Request->Keys[FirstUnprocessedQuery];
- auto status = ReadKey(txc, ctx, key, FirstUnprocessedQuery);
+ auto status = ReadKey(txc, key, FirstUnprocessedQuery);
switch (status) {
case EReadStatus::Done:
break;
- case EReadStatus::StoppedByLimit:
- return true;
case EReadStatus::NeedData:
- if (RowsProcessed && CanResume())
- return true;
-
PrechargeKeysAfter(txc, FirstUnprocessedQuery);
return false;
+ case EReadStatus::NeedContinue:
+ return true;
}
if (!State.Reverse)
@@ -580,16 +564,16 @@ public:
}
// return semantics the same as in the Execute()
- bool Read(TTransactionContext& txc, const TActorContext& ctx) {
+ bool Read(TTransactionContext& txc) {
// TODO: consider trying to precharge multiple records at once in case
// when first precharge fails?
if (!State.Request->Keys.empty()) {
- return ReadKeys(txc, ctx);
+ return ReadKeys(txc);
}
// since no keys, then we must have ranges (has been checked initially)
- return ReadRanges(txc, ctx);
+ return ReadRanges(txc);
}
bool HasUnreadQueries() const {
@@ -764,7 +748,7 @@ public:
state.TotalRows += RowsRead;
state.FirstUnprocessedQuery = FirstUnprocessedQuery;
state.LastProcessedKey = LastProcessedKey;
- state.LastProcessedKeyErasedOrMissing = LastProcessedKeyErasedOrMissing;
+ state.LastProcessedKeyErased = LastProcessedKeyErased;
if (sentResult) {
state.ConsumeSeqNo(RowsRead, BytesInResult);
}
@@ -852,8 +836,8 @@ private:
bool Precharge(
NTable::TDatabase& db,
- NTable::TRawVals keyFrom,
- NTable::TRawVals keyTo,
+ NTable::TRawVals minKey,
+ NTable::TRawVals maxKey,
bool reverse)
{
ui64 rowsLeft = GetRowsLeft();
@@ -861,8 +845,8 @@ private:
auto direction = reverse ? NTable::EDirection::Reverse : NTable::EDirection::Forward;
return db.Precharge(TableInfo.LocalTid,
- keyFrom,
- keyTo,
+ minKey,
+ maxKey,
State.Columns,
0,
rowsLeft,
@@ -872,9 +856,7 @@ private:
}
template <typename TIterator>
- EReadStatus IterateRange(TIterator* iter, const TActorContext& ctx, IExecuting& env) {
- Y_UNUSED(ctx);
-
+ EReadStatus IterateRange(TIterator* iter, NTable::TKeyRange& iterRange, TTransactionContext& txc) {
auto keyAccessSampler = Self->GetKeyAccessSampler();
bool advanced = false;
@@ -887,38 +869,21 @@ private:
TDbTupleRef rowKey = iter->GetKey();
TDbTupleRef rowValues = iter->GetValues();
- if (!precharging && env.MissingReferencesSize()) {
+ if (!precharging && txc.Env.MissingReferencesSize()) {
+ // Note: the current key must be returned to reader, but the
+ // previous key is lost, and we cannot safely resume. We can
+ // only restart query from the beginning, and don't want to
+ // keep track of any stats.
precharging = true;
-
- ui64 deletedRowSkips = iter->Stats.DeletedRowSkips;
- ui64 invisibleRowSkips = iter->Stats.InvisibleRowSkips;
-
- const ui64 processedRecords = ResetRowSkips(iter->Stats);
-
- if ((RowsProcessed + processedRecords) > 0) {
- // There were some rows (regular, erased or invisible), so
- // this transaction won't be restarting and there is no point in precharging missing references.
- RowsSinceLastCheck += processedRecords;
- RowsProcessed += processedRecords;
-
- DeletedRowSkips += deletedRowSkips;
- InvisibleRowSkips += invisibleRowSkips;
-
- // We will be continuing from the current key (inclusive).
- LastProcessedKey = TSerializedCellVec::Serialize(rowKey.Cells());
- LastProcessedKeyErasedOrMissing = true;
- break;
- }
}
if (precharging) {
- // Precharge only if we didn't meet any rows prior to a row with a missing reference.
- // Meanwhile, RowsProcessed, RowsSinceLastCheck and LastProcessed key are not updated,
+ // Note: RowsProcessed, RowsSinceLastCheck and LastProcessed key are not updated,
// so we will restart the transaction from the exact same key we started iterating from.
prechargedCount++;
prechargedRowsSize += EstimateSize(rowValues.Cells());
- if (ReachedTotalRowsLimit(prechargedCount) || ShouldStop(prechargedCount, prechargedRowsSize + env.MissingReferencesSize())) {
+ if (ReachedTotalRowsLimit(prechargedCount) || ShouldStop(prechargedCount, prechargedRowsSize + txc.Env.MissingReferencesSize())) {
break;
}
@@ -947,8 +912,8 @@ private:
if (ShouldStop()) {
LastProcessedKey = TSerializedCellVec::Serialize(rowKey.Cells());
- LastProcessedKeyErasedOrMissing = false;
- return EReadStatus::StoppedByLimit;
+ LastProcessedKeyErased = false;
+ return EReadStatus::NeedContinue;
}
}
@@ -960,20 +925,38 @@ private:
// row). When there are not enough rows we would prefer restarting in
// the same transaction, instead of starting a new one, in which case
// we will not update stats and will not update RowsProcessed.
- if (!precharging) {
- auto lastKey = iter->GetKey().Cells();
-
- if (lastKey && (advanced || iter->Stats.DeletedRowSkips >= 4) && iter->Last() == NTable::EReady::Page) {
- LastProcessedKey = TSerializedCellVec::Serialize(lastKey);
- LastProcessedKeyErasedOrMissing = iter->GetKeyState() == NTable::ERowOp::Erase;
- advanced = true;
+ auto lastKey = iter->GetKey().Cells();
+
+ auto prechargeFromLastKey = [&]() {
+ if (lastKey) {
+ const auto key = ToRawTypeValue(lastKey, TableInfo, false);
+ if (!State.Reverse) {
+ Precharge(txc.DB, key, iterRange.MaxKey, State.Reverse);
+ } else {
+ Precharge(txc.DB, iterRange.MinKey, key, State.Reverse);
+ }
} else {
- LastProcessedKey.clear();
+ Precharge(txc.DB, iterRange.MinKey, iterRange.MaxKey, State.Reverse);
+ }
+ };
+
+ if (precharging) {
+ if (iter->Last() == NTable::EReady::Page) {
+ prechargeFromLastKey();
}
+ return EReadStatus::NeedData;
+ }
+
+ if (lastKey && (advanced || iter->Stats.DeletedRowSkips >= 4) && iter->Last() == NTable::EReady::Page) {
+ LastProcessedKey = TSerializedCellVec::Serialize(lastKey);
+ LastProcessedKeyErased = iter->GetKeyState() == NTable::ERowOp::Erase;
+ advanced = true;
+ } else {
+ LastProcessedKey.clear();
}
// last iteration to Page or Gone might also have deleted or invisible rows
- if (advanced || (iter->Last() != NTable::EReady::Page && !precharging)) {
+ if (advanced || iter->Last() != NTable::EReady::Page) {
DeletedRowSkips += iter->Stats.DeletedRowSkips;
InvisibleRowSkips += iter->Stats.InvisibleRowSkips;
const ui64 processedRecords = ResetRowSkips(iter->Stats);
@@ -981,9 +964,14 @@ private:
RowsProcessed += processedRecords;
}
- // TODO: consider restart when Page and too few data read
- // (how much is too few, less than user's limit?)
- if (iter->Last() == NTable::EReady::Page || precharging) {
+ if (iter->Last() == NTable::EReady::Page) {
+ // TODO: consider restart when Page and too few data read
+ // (how much is too few, less than user's limit?)
+ if (RowsProcessed && CanResume()) {
+ return EReadStatus::NeedContinue;
+ }
+
+ prechargeFromLastKey();
return EReadStatus::NeedData;
}
@@ -1824,7 +1812,7 @@ private:
AppData()->MonotonicTimeProvider->Now(),
Self));
- return Reader->Read(txc, ctx);
+ return Reader->Read(txc);
}
void PrepareValidationInfo(const TActorContext&, const TReadIteratorState& state) {
@@ -2482,7 +2470,7 @@ public:
LWTRACK(ReadExecute, state.Orbit);
- if (Reader->Read(txc, ctx)) {
+ if (Reader->Read(txc)) {
// Retry later when dependencies are resolved
if (!Reader->GetVolatileReadDependencies().empty()) {
state.ReadContinuePending = true;
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp
index 9b48fa579e5..8b845b0f4b6 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp
@@ -24,15 +24,22 @@ using namespace Tests;
Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
- struct ReadIteratorCounter {
+ struct TReadIteratorCounter {
int Reads = 0;
int Continues = 0;
int EvGets = 0;
int BlobsRequested = 0;
+
+ bool operator==(const TReadIteratorCounter&) const = default;
+
+ friend inline IOutputStream& operator<<(IOutputStream& out, const TReadIteratorCounter& c) {
+ out << "{ " << c.Reads << ", " << c.Continues << ", " << c.EvGets << ", " << c.BlobsRequested << " }";
+ return out;
+ }
};
- std::unique_ptr<ReadIteratorCounter> SetupReadIteratorObserver(TTestActorRuntime& runtime) {
- std::unique_ptr<ReadIteratorCounter> iteratorCounter = std::make_unique<ReadIteratorCounter>();
+ std::unique_ptr<TReadIteratorCounter> SetupReadIteratorObserver(TTestActorRuntime& runtime) {
+ std::unique_ptr<TReadIteratorCounter> iteratorCounter = std::make_unique<TReadIteratorCounter>();
auto captureEvents = [&](TAutoPtr<IEventHandle> &event) -> auto {
switch (event->GetTypeRewrite()) {
@@ -59,7 +66,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
return iteratorCounter;
}
- struct Node {
+ struct TNode {
TPortManager Pm;
TServerSettings ServerSettings;
TServer::TPtr Server;
@@ -68,7 +75,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
TActorId Sender;
TTestActorRuntime* Runtime;
- Node(bool useExternalBlobs, int externalBlobColumns = 1) : ServerSettings(Pm.GetPort(2134)) {
+ TNode(bool useExternalBlobs, int externalBlobColumns = 1) : ServerSettings(Pm.GetPort(2134)) {
ServerSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.AddStoragePool("ssd")
@@ -113,7 +120,12 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
}
};
- void ValidateReadResult(TTestActorRuntime& runtime, NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> readFuture, int rowsCount, int firstBlobChunkNum = 0, int extBlobColumnCount = 1) {
+ void ValidateReadResult(TTestActorRuntime& runtime,
+ NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> readFuture,
+ int rowsCount,
+ int firstBlobChunkNum = 0,
+ int extBlobColumnCount = 1)
+ {
Ydb::Table::ExecuteDataQueryResponse res = AwaitResponse(runtime, std::move(readFuture));
auto& operation = res.Getoperation();
UNIT_ASSERT_VALUES_EQUAL(operation.status(), Ydb::StatusIds::SUCCESS);
@@ -132,7 +144,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
UNIT_ASSERT(chunkNumValue.has_int32_value());
UNIT_ASSERT_EQUAL(chunkNumValue.Getint32_value(), firstBlobChunkNum + i);
-
+
for (int j = 0; j < extBlobColumnCount; j++) {
auto& dataValue = row.get_idx_items(2 + j);
UNIT_ASSERT(dataValue.has_bytes_value());
@@ -141,8 +153,10 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
}
}
- template <ui8 resultSize>
- void ValidateReadResult(TTestActorRuntime& runtime, NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> readFuture, std::array<i32, resultSize> expectedResult) {
+ void ValidateReadResult(TTestActorRuntime& runtime,
+ NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> readFuture,
+ const std::vector<i32>& expectedResult)
+ {
Ydb::Table::ExecuteDataQueryResponse res = AwaitResponse(runtime, std::move(readFuture));
auto& operation = res.Getoperation();
UNIT_ASSERT_VALUES_EQUAL(operation.status(), Ydb::StatusIds::SUCCESS);
@@ -150,7 +164,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
operation.result().UnpackTo(&result);
UNIT_ASSERT_EQUAL(result.result_sets().size(), 1);
auto& resultSet = result.result_sets()[0];
- UNIT_ASSERT_EQUAL(resultSet.rows_size(), resultSize);
+ UNIT_ASSERT_EQUAL(size_t(resultSet.rows_size()), expectedResult.size());
for (int i = 0; i < resultSet.rows_size(); i++) {
auto& row = resultSet.get_idx_rows(i);
@@ -161,7 +175,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
UNIT_ASSERT(chunkNumValue.has_int32_value());
UNIT_ASSERT_EQUAL(chunkNumValue.Getint32_value(), expectedResult[i]);
-
+
auto& dataValue = row.get_idx_items(2);
UNIT_ASSERT(dataValue.has_bytes_value());
UNIT_ASSERT_EQUAL(dataValue.bytes_value().size(), 1_MB);
@@ -169,7 +183,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
}
Y_UNIT_TEST(ExtBlobs) {
- Node node(true);
+ TNode node(true);
auto server = node.Server;
auto& runtime = *node.Runtime;
@@ -225,7 +239,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
}
Y_UNIT_TEST(ExtBlobsWithDeletesInTheBeginning) {
- Node node(true);
+ TNode node(true);
auto server = node.Server;
auto& runtime = *node.Runtime;
@@ -268,13 +282,13 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
ValidateReadResult(runtime, std::move(readFuture), 3, 7);
UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Reads, 1);
- UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Continues, 1);
+ UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Continues, 0);
UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->EvGets, 1);
UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->BlobsRequested, 3);
}
Y_UNIT_TEST(ExtBlobsWithDeletesInTheEnd) {
- Node node(true);
+ TNode node(true);
auto server = node.Server;
auto& runtime = *node.Runtime;
@@ -323,7 +337,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
}
Y_UNIT_TEST(ExtBlobsWithDeletesInTheMiddle) {
- Node node(true);
+ TNode node(true);
auto server = node.Server;
auto& runtime = *node.Runtime;
@@ -370,16 +384,113 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
ORDER BY blob_id, chunk_num ASC
LIMIT 100;)");
- ValidateReadResult<6>(runtime, std::move(readFuture), {1, 5, 6, 7, 8, 9});
+ ValidateReadResult(runtime, std::move(readFuture), {1, 5, 6, 7, 8, 9});
UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Reads, 1);
- UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Continues, 2);
+ UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Continues, 1);
UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->EvGets, 2);
UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->BlobsRequested, 6);
}
+ void DoExtBlobsWithFirstRowPreloaded(bool withReboot) {
+ TNode node(true);
+
+ auto server = node.Server;
+ auto& runtime = *node.Runtime;
+ auto& sender = node.Sender;
+ auto shard1 = node.Shard;
+ auto tableId1 = node.TableId;
+
+ TString largeValue(1_MB, 'L');
+
+ for (int i = 0; i < 10; i++) {
+ TString chunkNum = ToString(i);
+ TString query = R"___(
+ UPSERT INTO `/Root/table-1` (blob_id, chunk_num, data0) VALUES
+ (Uuid("65df1ec1-a97d-47b2-ae56-3c023da6ee8c"), )___" + chunkNum + ", \"" + largeValue + "\");";
+ ExecSQL(server, sender, query);
+ }
+
+ CompactTable(runtime, shard1, tableId1, false);
+
+ runtime.SimulateSleep(TDuration::Seconds(1));
+
+ auto preloadFuture = KqpSimpleSend(runtime, R"(
+ SELECT blob_id, chunk_num, data0
+ FROM `/Root/table-1`
+ WHERE
+ blob_id = Uuid("65df1ec1-a97d-47b2-ae56-3c023da6ee8c") AND
+ chunk_num = 0;
+ )");
+
+ ValidateReadResult(runtime, std::move(preloadFuture), 1);
+
+ size_t passedRows = 0;
+ bool finished = false;
+ std::vector<TEvDataShard::TEvReadResult::TPtr> blockedResults;
+ std::optional<std::pair<TActorId, ui64>> dropReadId;
+ auto blockResults = runtime.AddObserver<TEvDataShard::TEvReadResult>(
+ [&](TEvDataShard::TEvReadResult::TPtr& ev) {
+ auto* msg = ev->Get();
+ if (dropReadId) {
+ if (*dropReadId == std::make_pair(ev->GetRecipientRewrite(), msg->Record.GetReadId())) {
+ ev.Reset();
+ }
+ return;
+ }
+ if (passedRows > 0) {
+ blockedResults.push_back(std::move(ev));
+ return;
+ }
+ passedRows += msg->GetRowsCount();
+ if (msg->Record.GetFinished()) {
+ finished = true;
+ }
+ });
+
+ auto readFuture = KqpSimpleSend(runtime, R"(
+ SELECT blob_id, chunk_num, data0
+ FROM `/Root/table-1`
+ WHERE
+ blob_id = Uuid("65df1ec1-a97d-47b2-ae56-3c023da6ee8c") AND
+ chunk_num >= 0
+ ORDER BY blob_id, chunk_num ASC
+ LIMIT 5;
+ )");
+
+ runtime.WaitFor("blocked results", [&]{ return blockedResults.size() > 0 || finished; });
+
+ if (!finished) {
+ UNIT_ASSERT_VALUES_EQUAL(passedRows, 1u);
+
+ if (withReboot) {
+ dropReadId.emplace(
+ blockedResults[0]->GetRecipientRewrite(),
+ blockedResults[0]->Get()->Record.GetReadId());
+
+ RebootTablet(runtime, shard1, sender);
+ } else {
+ blockResults.Remove();
+ for (auto& ev : blockedResults) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+ blockedResults.clear();
+ }
+ }
+
+ ValidateReadResult(runtime, std::move(readFuture), 5);
+ }
+
+ Y_UNIT_TEST(ExtBlobsWithFirstRowPreloaded) {
+ DoExtBlobsWithFirstRowPreloaded(false);
+ }
+
+ Y_UNIT_TEST(ExtBlobsWithFirstRowPreloadedWithReboot) {
+ DoExtBlobsWithFirstRowPreloaded(true);
+ }
+
Y_UNIT_TEST(ExtBlobsMultipleColumns) {
- Node node(true, 2);
+ TNode node(true, 2);
auto server = node.Server;
auto& runtime = *node.Runtime;
@@ -436,11 +547,11 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
}
Y_UNIT_TEST(ExtBlobsWithCompactingMiddleRows) {
- std::unordered_map<int, ReadIteratorCounter> expectedResults;
- expectedResults[1] = {1, 7, 7, 18};
- expectedResults[2] = {1, 8, 7, 16};
- expectedResults[3] = {1, 7, 6, 14};
- expectedResults[4] = {1, 8, 6, 12};
+ std::unordered_map<int, TReadIteratorCounter> expectedResults;
+ expectedResults[1] = {1, 4, 4, 18};
+ expectedResults[2] = {1, 4, 4, 16};
+ expectedResults[3] = {1, 4, 4, 14};
+ expectedResults[4] = {1, 4, 4, 12};
expectedResults[5] = {1, 4, 2, 10};
// We write 20 rows, some of them are compacted, then we write some more rows "before" and "after" and read all of them
@@ -449,7 +560,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
for (int test = 1; test < 6; test++) {
int compactedPart = 20 - (test * 2);
- Node node(true);
+ TNode node(true);
auto server = node.Server;
auto& runtime = *node.Runtime;
@@ -520,14 +631,12 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
auto& expectedResult = expectedResults[test];
- UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->Reads, expectedResult.Reads);
- UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->EvGets, expectedResult.EvGets);
- UNIT_ASSERT_VALUES_EQUAL(iteratorCounter->BlobsRequested, expectedResult.BlobsRequested);
+ UNIT_ASSERT_VALUES_EQUAL_C(*iteratorCounter, expectedResult, "test " << test);
}
}
Y_UNIT_TEST(ExtBlobsEmptyTable) {
- Node node(true);
+ TNode node(true);
auto& runtime = *node.Runtime;
@@ -543,7 +652,7 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
}
Y_UNIT_TEST(NotExtBlobs) {
- Node node(false);
+ TNode node(false);
auto server = node.Server;
auto& runtime = *node.Runtime;
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index 96cea578d50..6648388a613 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -212,7 +212,7 @@ public:
ui64 LastAckSeqNo = 0;
ui32 FirstUnprocessedQuery = 0;
TString LastProcessedKey;
- bool LastProcessedKeyErasedOrMissing = false;
+ bool LastProcessedKeyErased = false;
// Orbit used for tracking progress
NLWTrace::TOrbit Orbit;