aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-03-16 21:41:48 +0300
committereivanov89 <eivanov89@ydb.tech>2023-03-16 21:41:48 +0300
commit1421d442f2f49c9a7198a524391cb8a87b48aca8 (patch)
tree07e0b92928161d48779380a9750e5e90a36d9674
parent750b348097bc9cfc09fdb15b3cc689656b8b2832 (diff)
downloadydb-1421d442f2f49c9a7198a524391cb8a87b48aca8.tar.gz
don't allocate last processed key each time
-rw-r--r--ydb/core/scheme/scheme_tablecell.h43
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp25
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp99
3 files changed, 113 insertions, 54 deletions
diff --git a/ydb/core/scheme/scheme_tablecell.h b/ydb/core/scheme/scheme_tablecell.h
index b33cfeed972..9cfaa529c90 100644
--- a/ydb/core/scheme/scheme_tablecell.h
+++ b/ydb/core/scheme/scheme_tablecell.h
@@ -5,6 +5,7 @@
#include "scheme_type_order.h"
#include "scheme_types_defs.h"
+#include <util/generic/bitops.h>
#include <util/generic/hash.h>
#include <util/system/unaligned_mem.h>
@@ -467,6 +468,19 @@ public:
return Cells;
}
+ static void Serialize(TString& res, const TConstArrayRef<TCell>& cells) {
+ size_t sz = sizeof(ui16);
+ for (auto& c : cells) {
+ sz += sizeof(TValue) + c.Size();
+ }
+
+ if (res.capacity() < sz) {
+ res.reserve(FastClp2(sz + 1) - 1);
+ }
+
+ DoSerialize(res, cells);
+ }
+
static TString Serialize(const TConstArrayRef<TCell>& cells) {
if (cells.empty())
return TString();
@@ -478,15 +492,8 @@ public:
TString res;
res.reserve(sz);
- ui16 cnt = cells.size();
- res.append((const char*)&cnt, sizeof(ui16));
- for (auto& c : cells) {
- TValue header;
- header.Size = c.Size();
- header.IsNull = c.IsNull();
- res.append((const char*)&header, sizeof(header));
- res.append(c.Data(), c.Size());
- }
+
+ DoSerialize(res, cells);
return res;
}
@@ -533,6 +540,24 @@ private:
return true;
}
+ // note, that res space should be reserved before this call
+ static void DoSerialize(TString& res, const TConstArrayRef<TCell>& cells) {
+ res.clear();
+
+ if (cells.empty())
+ return;
+
+ ui16 cnt = cells.size();
+ res.append((const char*)&cnt, sizeof(ui16));
+ for (auto& c : cells) {
+ TValue header;
+ header.Size = c.Size();
+ header.IsNull = c.IsNull();
+ res.append((const char*)&header, sizeof(header));
+ res.append(c.Data(), c.Size());
+ }
+ }
+
private:
TString Buf;
TVector<TCell> Cells;
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 48fd4108e90..be2ba9a37a5 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -553,6 +553,17 @@ public:
} else {
Self->IncCounter(COUNTER_READ_ITERATOR_MAX_TIME_REACHED);
}
+
+ NKikimrTxDataShard::TReadContinuationToken continuationToken;
+ continuationToken.SetFirstUnprocessedQuery(FirstUnprocessedQuery);
+
+ // note that when LastProcessedKey set then
+ // FirstUnprocessedQuery is definitely partially read range
+ if (LastProcessedKey)
+ continuationToken.SetLastProcessedKey(LastProcessedKey);
+
+ bool res = continuationToken.SerializeToString(record.MutableContinuationToken());
+ Y_ASSERT(res);
} else {
state.IsFinished = true;
record.SetFinished(true);
@@ -617,17 +628,6 @@ public:
record.MutableSnapshot()->SetStep(State.ReadVersion.Step);
record.MutableSnapshot()->SetTxId(State.ReadVersion.TxId);
}
-
- NKikimrTxDataShard::TReadContinuationToken continuationToken;
- continuationToken.SetFirstUnprocessedQuery(FirstUnprocessedQuery);
-
- // note that when LastProcessedKey set then
- // FirstUnprocessedQuery is definitely partially read range
- if (LastProcessedKey)
- continuationToken.SetLastProcessedKey(LastProcessedKey);
-
- bool res = continuationToken.SerializeToString(record.MutableContinuationToken());
- Y_ASSERT(res);
}
void UpdateState(TReadIteratorState& state) {
@@ -688,8 +688,7 @@ private:
Y_UNUSED(ctx);
while (iter->Next(NTable::ENext::Data) == NTable::EReady::Data) {
TDbTupleRef rowKey = iter->GetKey();
-
- LastProcessedKey = TSerializedCellVec::Serialize(rowKey.Cells());
+ TSerializedCellVec::Serialize(LastProcessedKey, rowKey.Cells());
TDbTupleRef rowValues = iter->GetValues();
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 1711b34e6d0..e626a07cc05 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -234,6 +234,39 @@ void CheckResult(
CheckResult(userTable, result, goldCells, types, columns);
}
+void CheckContinuationToken(
+ const TEvDataShard::TEvReadResult& result,
+ ui32 firstUprocessedQuery,
+ const std::vector<ui32>& gold)
+{
+ UNIT_ASSERT(result.Record.HasContinuationToken());
+
+ NKikimrTxDataShard::TReadContinuationToken readToken;
+ UNIT_ASSERT(readToken.ParseFromString(result.Record.GetContinuationToken()));
+ UNIT_ASSERT(readToken.HasFirstUnprocessedQuery());
+ UNIT_ASSERT_VALUES_EQUAL(readToken.GetFirstUnprocessedQuery(), firstUprocessedQuery);
+
+ if (gold.empty())
+ return;
+
+ UNIT_ASSERT(readToken.HasLastProcessedKey());
+
+ std::vector<NScheme::TTypeInfoOrder> types;
+ types.reserve(gold.size());
+ for (auto i: xrange(gold.size())) {
+ Y_UNUSED(i);
+ types.emplace_back(NScheme::TTypeInfo(NScheme::NTypeIds::Uint32));
+ }
+
+ TCellVec goldRow;
+ for (const auto& item: gold) {
+ goldRow.push_back(TCell::Make(item));
+ }
+
+ TSerializedCellVec lastKey(readToken.GetLastProcessedKey());
+ CheckRow(lastKey.GetCells(), goldRow, types);
+}
+
template <typename TKeyType>
TVector<TCell> ToCells(const std::vector<TKeyType>& keys) {
TVector<TCell> cells;
@@ -1115,10 +1148,10 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
const auto& record1 = readResult1->Record;
UNIT_ASSERT(!record1.GetLimitReached());
UNIT_ASSERT(record1.HasSeqNo());
- //UNIT_ASSERT(!record1.HasFinished());
+ UNIT_ASSERT(!record1.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record1.GetSeqNo(), 1UL);
- // TODO: check continuation token
+ CheckContinuationToken(*readResult1, 1, {});
auto readResult2 = helper.WaitReadResult();
CheckResult(helper.Tables["table-1"].UserTable, *readResult2, {
@@ -1130,7 +1163,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT(!record2.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record2.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record2.GetSeqNo(), 2UL);
- // TODO: check continuation token
+ CheckContinuationToken(*readResult2, 2, {});
auto readResult3 = helper.WaitReadResult();
CheckResult(helper.Tables["table-1"].UserTable, *readResult3, {
@@ -1144,7 +1177,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT(record3.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record3.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record3.GetSeqNo(), 3UL);
- // TODO: check continuation token
+ UNIT_ASSERT(!record3.HasContinuationToken());
}
Y_UNIT_TEST(ShouldReverseReadMultipleKeysOneByOne) {
@@ -1177,7 +1210,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
//UNIT_ASSERT(!record1.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record1.GetSeqNo(), 1UL);
- // TODO: check continuation token
+ CheckContinuationToken(*readResult1, 1, {});
auto readResult2 = helper.WaitReadResult();
CheckResult(helper.Tables["table-1"].UserTable, *readResult2, {
@@ -1189,7 +1222,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT(!record2.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record2.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record2.GetSeqNo(), 2UL);
- // TODO: check continuation token
+ //CheckContinuationToken(*readResult1, 0, {});
auto readResult3 = helper.WaitReadResult();
CheckResult(helper.Tables["table-1"].UserTable, *readResult3, {
@@ -1203,7 +1236,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT(record3.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record3.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record3.GetSeqNo(), 3UL);
- // TODO: check continuation token
+ UNIT_ASSERT(!record3.HasContinuationToken());
}
Y_UNIT_TEST(ShouldHandleReadAck) {
@@ -1377,11 +1410,20 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
{1, 1, 1, 100},
};
+ std::vector<std::vector<ui32>> goldKeys = {
+ {11, 11, 11},
+ {8, 1, 1},
+ {5, 5, 5},
+ {3, 3, 3},
+ {1, 1, 1},
+ };
+
auto readResult = helper.SendRead("table-1", request.release());
UNIT_ASSERT(readResult);
CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
gold[0]
});
+ CheckContinuationToken(*readResult, 1, goldKeys[0]);
for (size_t i = 1; i < gold.size(); ++i) {
helper.SendReadAck("table-1", readResult->Record, 1, 10000);
@@ -1390,7 +1432,18 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
gold[i]
});
+ if (i > 1) {
+ CheckContinuationToken(*readResult, 0, goldKeys[i]);
+ } else {
+ CheckContinuationToken(*readResult, 1, goldKeys[i]);
+ }
}
+
+ helper.SendReadAck("table-1", readResult->Record, 1, 10000);
+ readResult = helper.WaitReadResult();
+ UNIT_ASSERT(readResult);
+ UNIT_ASSERT(readResult->Record.GetFinished());
+ UNIT_ASSERT(!readResult->Record.HasContinuationToken());
}
Y_UNIT_TEST(ShouldRangeReadReverseLeftInclusive) {
@@ -1794,15 +1847,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT_VALUES_EQUAL(record1.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record1.GetSeqNo(), 1UL);
- // TODO: check continuation token
- #if 0
- UNIT_ASSERT_VALUES_EQUAL(readResult1.GetFirstUnprocessedQuery(), 0UL);
-
- UNIT_ASSERT(readResult1.HasLastProcessedKey());
- TOwnedCellVec lastKey1(
- TSerializedCellVec(readResult1.GetLastProcessedKey()).GetCells());
- CheckRow(lastKey1, {1, 1, 1});
-#endif
+ CheckContinuationToken(*readResult1, 0, {1, 1, 1});
auto readResult2 = helper.WaitReadResult();
CheckResult(helper.Tables["table-1"].UserTable, *readResult2, {
@@ -1815,15 +1860,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT_VALUES_EQUAL(record2.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record2.GetSeqNo(), 2UL);
- // TODO: check continuation token
-#if 0
- UNIT_ASSERT_VALUES_EQUAL(readResult2.GetFirstUnprocessedQuery(), 0UL);
-
- UNIT_ASSERT(readResult2.HasLastProcessedKey());
- TOwnedCellVec lastKey2(
- TSerializedCellVec(readResult2.GetLastProcessedKey()).GetCells());
- CheckRow(lastKey2, {3, 3, 3});
-#endif
+ CheckContinuationToken(*readResult2, 0, {3, 3, 3});
auto readResult3 = helper.WaitReadResult();
CheckResult(helper.Tables["table-1"].UserTable, *readResult3, {
@@ -1836,11 +1873,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT_VALUES_EQUAL(record3.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record3.GetSeqNo(), 3UL);
- // TODO: check continuation token
-#if 0
- UNIT_ASSERT_VALUES_EQUAL(readResult3.GetFirstUnprocessedQuery(), 1UL);
- UNIT_ASSERT(!readResult3.HasLastProcessedKey());
-#endif
+ CheckContinuationToken(*readResult3, 0, {5, 5, 5});
auto readResult4 = helper.WaitReadResult();
CheckResult(helper.Tables["table-1"].UserTable, *readResult4, {
@@ -1852,7 +1885,8 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT(!record4.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record4.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record4.GetSeqNo(), 4UL);
- // TODO: check continuation token
+
+ CheckContinuationToken(*readResult4, 1, {1, 1, 1});
auto readResult5 = helper.WaitReadResult();
CheckResult(helper.Tables["table-1"].UserTable, *readResult5, {
@@ -1863,7 +1897,8 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
UNIT_ASSERT(record5.HasFinished());
UNIT_ASSERT_VALUES_EQUAL(record5.GetReadId(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(record5.GetSeqNo(), 5UL);
- // TODO: check no continuation token
+
+ UNIT_ASSERT(!record5.HasContinuationToken());
}
Y_UNIT_TEST(ShouldReadRangeChunk1_100) {