diff options
author | eivanov89 <eivanov89@ydb.tech> | 2023-03-16 21:41:48 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2023-03-16 21:41:48 +0300 |
commit | 1421d442f2f49c9a7198a524391cb8a87b48aca8 (patch) | |
tree | 07e0b92928161d48779380a9750e5e90a36d9674 | |
parent | 750b348097bc9cfc09fdb15b3cc689656b8b2832 (diff) | |
download | ydb-1421d442f2f49c9a7198a524391cb8a87b48aca8.tar.gz |
don't allocate last processed key each time
-rw-r--r-- | ydb/core/scheme/scheme_tablecell.h | 43 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 25 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 99 |
3 files changed, 113 insertions, 54 deletions
diff --git a/ydb/core/scheme/scheme_tablecell.h b/ydb/core/scheme/scheme_tablecell.h index b33cfeed972..9cfaa529c90 100644 --- a/ydb/core/scheme/scheme_tablecell.h +++ b/ydb/core/scheme/scheme_tablecell.h @@ -5,6 +5,7 @@ #include "scheme_type_order.h" #include "scheme_types_defs.h" +#include <util/generic/bitops.h> #include <util/generic/hash.h> #include <util/system/unaligned_mem.h> @@ -467,6 +468,19 @@ public: return Cells; } + static void Serialize(TString& res, const TConstArrayRef<TCell>& cells) { + size_t sz = sizeof(ui16); + for (auto& c : cells) { + sz += sizeof(TValue) + c.Size(); + } + + if (res.capacity() < sz) { + res.reserve(FastClp2(sz + 1) - 1); + } + + DoSerialize(res, cells); + } + static TString Serialize(const TConstArrayRef<TCell>& cells) { if (cells.empty()) return TString(); @@ -478,15 +492,8 @@ public: TString res; res.reserve(sz); - ui16 cnt = cells.size(); - res.append((const char*)&cnt, sizeof(ui16)); - for (auto& c : cells) { - TValue header; - header.Size = c.Size(); - header.IsNull = c.IsNull(); - res.append((const char*)&header, sizeof(header)); - res.append(c.Data(), c.Size()); - } + + DoSerialize(res, cells); return res; } @@ -533,6 +540,24 @@ private: return true; } + // note, that res space should be reserved before this call + static void DoSerialize(TString& res, const TConstArrayRef<TCell>& cells) { + res.clear(); + + if (cells.empty()) + return; + + ui16 cnt = cells.size(); + res.append((const char*)&cnt, sizeof(ui16)); + for (auto& c : cells) { + TValue header; + header.Size = c.Size(); + header.IsNull = c.IsNull(); + res.append((const char*)&header, sizeof(header)); + res.append(c.Data(), c.Size()); + } + } + private: TString Buf; TVector<TCell> Cells; diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 48fd4108e90..be2ba9a37a5 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -553,6 +553,17 @@ public: } else { Self->IncCounter(COUNTER_READ_ITERATOR_MAX_TIME_REACHED); } + + NKikimrTxDataShard::TReadContinuationToken continuationToken; + continuationToken.SetFirstUnprocessedQuery(FirstUnprocessedQuery); + + // note that when LastProcessedKey set then + // FirstUnprocessedQuery is definitely partially read range + if (LastProcessedKey) + continuationToken.SetLastProcessedKey(LastProcessedKey); + + bool res = continuationToken.SerializeToString(record.MutableContinuationToken()); + Y_ASSERT(res); } else { state.IsFinished = true; record.SetFinished(true); @@ -617,17 +628,6 @@ public: record.MutableSnapshot()->SetStep(State.ReadVersion.Step); record.MutableSnapshot()->SetTxId(State.ReadVersion.TxId); } - - NKikimrTxDataShard::TReadContinuationToken continuationToken; - continuationToken.SetFirstUnprocessedQuery(FirstUnprocessedQuery); - - // note that when LastProcessedKey set then - // FirstUnprocessedQuery is definitely partially read range - if (LastProcessedKey) - continuationToken.SetLastProcessedKey(LastProcessedKey); - - bool res = continuationToken.SerializeToString(record.MutableContinuationToken()); - Y_ASSERT(res); } void UpdateState(TReadIteratorState& state) { @@ -688,8 +688,7 @@ private: Y_UNUSED(ctx); while (iter->Next(NTable::ENext::Data) == NTable::EReady::Data) { TDbTupleRef rowKey = iter->GetKey(); - - LastProcessedKey = TSerializedCellVec::Serialize(rowKey.Cells()); + TSerializedCellVec::Serialize(LastProcessedKey, rowKey.Cells()); TDbTupleRef rowValues = iter->GetValues(); diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 1711b34e6d0..e626a07cc05 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -234,6 +234,39 @@ void CheckResult( CheckResult(userTable, result, goldCells, types, columns); } +void CheckContinuationToken( + const TEvDataShard::TEvReadResult& result, + ui32 firstUprocessedQuery, + const std::vector<ui32>& gold) +{ + UNIT_ASSERT(result.Record.HasContinuationToken()); + + NKikimrTxDataShard::TReadContinuationToken readToken; + UNIT_ASSERT(readToken.ParseFromString(result.Record.GetContinuationToken())); + UNIT_ASSERT(readToken.HasFirstUnprocessedQuery()); + UNIT_ASSERT_VALUES_EQUAL(readToken.GetFirstUnprocessedQuery(), firstUprocessedQuery); + + if (gold.empty()) + return; + + UNIT_ASSERT(readToken.HasLastProcessedKey()); + + std::vector<NScheme::TTypeInfoOrder> types; + types.reserve(gold.size()); + for (auto i: xrange(gold.size())) { + Y_UNUSED(i); + types.emplace_back(NScheme::TTypeInfo(NScheme::NTypeIds::Uint32)); + } + + TCellVec goldRow; + for (const auto& item: gold) { + goldRow.push_back(TCell::Make(item)); + } + + TSerializedCellVec lastKey(readToken.GetLastProcessedKey()); + CheckRow(lastKey.GetCells(), goldRow, types); +} + template <typename TKeyType> TVector<TCell> ToCells(const std::vector<TKeyType>& keys) { TVector<TCell> cells; @@ -1115,10 +1148,10 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { const auto& record1 = readResult1->Record; UNIT_ASSERT(!record1.GetLimitReached()); UNIT_ASSERT(record1.HasSeqNo()); - //UNIT_ASSERT(!record1.HasFinished()); + UNIT_ASSERT(!record1.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record1.GetSeqNo(), 1UL); - // TODO: check continuation token + CheckContinuationToken(*readResult1, 1, {}); auto readResult2 = helper.WaitReadResult(); CheckResult(helper.Tables["table-1"].UserTable, *readResult2, { @@ -1130,7 +1163,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT(!record2.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record2.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record2.GetSeqNo(), 2UL); - // TODO: check continuation token + CheckContinuationToken(*readResult2, 2, {}); auto readResult3 = helper.WaitReadResult(); CheckResult(helper.Tables["table-1"].UserTable, *readResult3, { @@ -1144,7 +1177,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT(record3.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record3.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record3.GetSeqNo(), 3UL); - // TODO: check continuation token + UNIT_ASSERT(!record3.HasContinuationToken()); } Y_UNIT_TEST(ShouldReverseReadMultipleKeysOneByOne) { @@ -1177,7 +1210,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { //UNIT_ASSERT(!record1.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record1.GetSeqNo(), 1UL); - // TODO: check continuation token + CheckContinuationToken(*readResult1, 1, {}); auto readResult2 = helper.WaitReadResult(); CheckResult(helper.Tables["table-1"].UserTable, *readResult2, { @@ -1189,7 +1222,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT(!record2.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record2.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record2.GetSeqNo(), 2UL); - // TODO: check continuation token + //CheckContinuationToken(*readResult1, 0, {}); auto readResult3 = helper.WaitReadResult(); CheckResult(helper.Tables["table-1"].UserTable, *readResult3, { @@ -1203,7 +1236,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT(record3.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record3.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record3.GetSeqNo(), 3UL); - // TODO: check continuation token + UNIT_ASSERT(!record3.HasContinuationToken()); } Y_UNIT_TEST(ShouldHandleReadAck) { @@ -1377,11 +1410,20 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { {1, 1, 1, 100}, }; + std::vector<std::vector<ui32>> goldKeys = { + {11, 11, 11}, + {8, 1, 1}, + {5, 5, 5}, + {3, 3, 3}, + {1, 1, 1}, + }; + auto readResult = helper.SendRead("table-1", request.release()); UNIT_ASSERT(readResult); CheckResult(helper.Tables["table-1"].UserTable, *readResult, { gold[0] }); + CheckContinuationToken(*readResult, 1, goldKeys[0]); for (size_t i = 1; i < gold.size(); ++i) { helper.SendReadAck("table-1", readResult->Record, 1, 10000); @@ -1390,7 +1432,18 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { CheckResult(helper.Tables["table-1"].UserTable, *readResult, { gold[i] }); + if (i > 1) { + CheckContinuationToken(*readResult, 0, goldKeys[i]); + } else { + CheckContinuationToken(*readResult, 1, goldKeys[i]); + } } + + helper.SendReadAck("table-1", readResult->Record, 1, 10000); + readResult = helper.WaitReadResult(); + UNIT_ASSERT(readResult); + UNIT_ASSERT(readResult->Record.GetFinished()); + UNIT_ASSERT(!readResult->Record.HasContinuationToken()); } Y_UNIT_TEST(ShouldRangeReadReverseLeftInclusive) { @@ -1794,15 +1847,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record1.GetSeqNo(), 1UL); - // TODO: check continuation token - #if 0 - UNIT_ASSERT_VALUES_EQUAL(readResult1.GetFirstUnprocessedQuery(), 0UL); - - UNIT_ASSERT(readResult1.HasLastProcessedKey()); - TOwnedCellVec lastKey1( - TSerializedCellVec(readResult1.GetLastProcessedKey()).GetCells()); - CheckRow(lastKey1, {1, 1, 1}); -#endif + CheckContinuationToken(*readResult1, 0, {1, 1, 1}); auto readResult2 = helper.WaitReadResult(); CheckResult(helper.Tables["table-1"].UserTable, *readResult2, { @@ -1815,15 +1860,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(record2.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record2.GetSeqNo(), 2UL); - // TODO: check continuation token -#if 0 - UNIT_ASSERT_VALUES_EQUAL(readResult2.GetFirstUnprocessedQuery(), 0UL); - - UNIT_ASSERT(readResult2.HasLastProcessedKey()); - TOwnedCellVec lastKey2( - TSerializedCellVec(readResult2.GetLastProcessedKey()).GetCells()); - CheckRow(lastKey2, {3, 3, 3}); -#endif + CheckContinuationToken(*readResult2, 0, {3, 3, 3}); auto readResult3 = helper.WaitReadResult(); CheckResult(helper.Tables["table-1"].UserTable, *readResult3, { @@ -1836,11 +1873,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(record3.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record3.GetSeqNo(), 3UL); - // TODO: check continuation token -#if 0 - UNIT_ASSERT_VALUES_EQUAL(readResult3.GetFirstUnprocessedQuery(), 1UL); - UNIT_ASSERT(!readResult3.HasLastProcessedKey()); -#endif + CheckContinuationToken(*readResult3, 0, {5, 5, 5}); auto readResult4 = helper.WaitReadResult(); CheckResult(helper.Tables["table-1"].UserTable, *readResult4, { @@ -1852,7 +1885,8 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT(!record4.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record4.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record4.GetSeqNo(), 4UL); - // TODO: check continuation token + + CheckContinuationToken(*readResult4, 1, {1, 1, 1}); auto readResult5 = helper.WaitReadResult(); CheckResult(helper.Tables["table-1"].UserTable, *readResult5, { @@ -1863,7 +1897,8 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT(record5.HasFinished()); UNIT_ASSERT_VALUES_EQUAL(record5.GetReadId(), 1UL); UNIT_ASSERT_VALUES_EQUAL(record5.GetSeqNo(), 5UL); - // TODO: check no continuation token + + UNIT_ASSERT(!record5.HasContinuationToken()); } Y_UNIT_TEST(ShouldReadRangeChunk1_100) { |