about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Valery Mironov <mbkkt@ydb.tech> 2025-03-22 16:00:32 +0300
committer GitHub <noreply@github.com> 2025-03-22 16:00:32 +0300
commit 79a448f766c99359d83b76fe737668df6621a4e3 (patch)
tree 71a7599b05c052ab8058d5a9b5a2cd8637e2d7cb
parent 064d1ba96afcfac0266f94c4e19d71a31130903a (diff)
download ydb-79a448f766c99359d83b76fe737668df6621a4e3.tar.gz
Prefixed vector index construction: fill prefix table (#15877)
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_prefix_kmeans.cpp 89
-rw-r--r-- ydb/core/tx/datashard/prefix_kmeans.cpp 190
2 files changed, 188 insertions, 91 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_prefix_kmeans.cpp b/ydb/core/tx/datashard/datashard_ut_prefix_kmeans.cpp
index b163e3b7c6..440d69c9be 100644
--- a/ydb/core/tx/datashard/datashard_ut_prefix_kmeans.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_prefix_kmeans.cpp
@@ -17,9 +17,9 @@ using namespace NTableIndex::NTableVectorKmeansTreeIndex;
static std::atomic<ui64> sId = 1;
static constexpr const char* kMainTable = "/Root/table-main";
+static constexpr const char* kPrefixTable = "/Root/table-prefix";
static constexpr const char* kLevelTable = "/Root/table-level";
static constexpr const char* kPostingTable = "/Root/table-posting";
-static constexpr const char* kPrefixTable = "/Root/table-prefix";
Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
static void DoBadRequest(Tests::TServer::TPtr server, TActorId sender,
@@ -84,7 +84,7 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
}
}
- static std::tuple<TString, TString> DoPrefixKMeans(
+ static std::tuple<TString, TString, TString> DoPrefixKMeans(
Tests::TServer::TPtr server, TActorId sender, NTableIndex::TClusterId parent, ui64 seed, ui64 k,
NKikimrTxDataShard::TEvLocalKMeansRequest::EState upload, VectorIndexSettings::VectorType type,
VectorIndexSettings::Metric metric)
@@ -128,9 +128,9 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
rec.AddDataColumns("data");
rec.SetPrefixColumns(1);
+ rec.SetPrefixName(kPrefixTable);
rec.SetLevelName(kLevelTable);
rec.SetPostingName(kPostingTable);
- rec.SetPrefixName(kPrefixTable);
};
fill(ev1);
fill(ev2);
@@ -147,9 +147,10 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
issues.ToOneLineString());
}
+ auto prefix = ReadShardedTable(server, kPrefixTable);
auto level = ReadShardedTable(server, kLevelTable);
auto posting = ReadShardedTable(server, kPostingTable);
- return {std::move(level), std::move(posting)};
+ return {std::move(prefix), std::move(level), std::move(posting)};
}
static void DropTable(Tests::TServer::TPtr server, TActorId sender, const char* name)
@@ -158,6 +159,16 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
WaitTxNotification(server, sender, txId);
}
+ static void CreatePrefixTable(Tests::TServer::TPtr server, TActorId sender, TShardedTableOptions options) // Test helper: creates the sharded table "table-prefix" under "/Root".
+ { // `options` is received by value, so the adjustments below stay local to this call.
+ options.AllowSystemColumnNames(true); // NOTE(review): presumably required because IdColumn is a reserved "__ydb_"-style name (expected dumps print "__ydb_id") - confirm
+ options.Columns({
+ {"user", "String", true, true}, // prefix column; NOTE(review): the two flags look like key / not-null - confirm against TShardedTableOptions
+ {IdColumn, NTableIndex::ClusterIdTypeName, true, true}, // cluster id column, declared with the same two flags
+ });
+ CreateShardedTable(server, sender, "/Root", "table-prefix", options); // resulting path equals kPrefixTable ("/Root/table-prefix")
+ }
+
static void CreateLevelTable(Tests::TServer::TPtr server, TActorId sender, TShardedTableOptions options)
{
options.AllowSystemColumnNames(true);
@@ -330,11 +341,13 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
"(\"user-2\", 25, \"\x75\x75\3\", \"2-five\");");
auto create = [&] {
+ CreatePrefixTable(server, sender, options);
CreateLevelTable(server, sender, options);
CreatePostingTable(server, sender, options);
};
create();
auto recreate = [&] {
+ DropTable(server, sender, "table-prefix");
DropTable(server, sender, "table-level");
DropTable(server, sender, "table-posting");
create();
@@ -345,9 +358,14 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
seed = 0;
for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) {
- auto [level, posting] = DoPrefixKMeans(server, sender, 40, seed, k,
- NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_POSTING,
- VectorIndexSettings::VECTOR_TYPE_UINT8, distance);
+ auto [prefix, level, posting] = DoPrefixKMeans(server, sender, 40, seed, k,
+ NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_POSTING,
+ VectorIndexSettings::VECTOR_TYPE_UINT8, distance);
+ UNIT_ASSERT_VALUES_EQUAL(prefix,
+ "user = user-1, __ydb_id = 40\n"
+
+ "user = user-2, __ydb_id = 43\n"
+ );
UNIT_ASSERT_VALUES_EQUAL(level,
"__ydb_parent = 40, __ydb_id = 41, __ydb_centroid = mm\3\n"
"__ydb_parent = 40, __ydb_id = 42, __ydb_centroid = 11\3\n"
@@ -373,9 +391,14 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
seed = 111;
for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) {
- auto [level, posting] = DoPrefixKMeans(server, sender, 40, seed, k,
- NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_POSTING,
- VectorIndexSettings::VECTOR_TYPE_UINT8, distance);
+ auto [prefix, level, posting] = DoPrefixKMeans(server, sender, 40, seed, k,
+ NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_POSTING,
+ VectorIndexSettings::VECTOR_TYPE_UINT8, distance);
+ UNIT_ASSERT_VALUES_EQUAL(prefix,
+ "user = user-1, __ydb_id = 40\n"
+
+ "user = user-2, __ydb_id = 43\n"
+ );
UNIT_ASSERT_VALUES_EQUAL(level,
"__ydb_parent = 40, __ydb_id = 41, __ydb_centroid = 11\3\n"
"__ydb_parent = 40, __ydb_id = 42, __ydb_centroid = mm\3\n"
@@ -402,9 +425,14 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
for (auto similarity : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE,
VectorIndexSettings::DISTANCE_COSINE})
{
- auto [level, posting] = DoPrefixKMeans(server, sender, 40, seed, k,
- NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_POSTING,
- VectorIndexSettings::VECTOR_TYPE_UINT8, similarity);
+ auto [prefix, level, posting] = DoPrefixKMeans(server, sender, 40, seed, k,
+ NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_POSTING,
+ VectorIndexSettings::VECTOR_TYPE_UINT8, similarity);
+ UNIT_ASSERT_VALUES_EQUAL(prefix,
+ "user = user-1, __ydb_id = 40\n"
+
+ "user = user-2, __ydb_id = 43\n"
+ );
UNIT_ASSERT_VALUES_EQUAL(level,
"__ydb_parent = 40, __ydb_id = 41, __ydb_centroid = II\3\n"
@@ -464,11 +492,13 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
"(\"user-2\", 25, \"\x75\x75\3\", \"2-five\");");
auto create = [&] {
+ CreatePrefixTable(server, sender, options);
CreateLevelTable(server, sender, options);
CreateBuildTable(server, sender, options, "table-posting");
};
create();
auto recreate = [&] {
+ DropTable(server, sender, "table-prefix");
DropTable(server, sender, "table-level");
DropTable(server, sender, "table-posting");
create();
@@ -479,9 +509,14 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
seed = 0;
for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) {
- auto [level, posting] = DoPrefixKMeans(server, sender, 40, seed, k,
- NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_BUILD,
- VectorIndexSettings::VECTOR_TYPE_UINT8, distance);
+ auto [prefix, level, posting] = DoPrefixKMeans(server, sender, 40, seed, k,
+ NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_BUILD,
+ VectorIndexSettings::VECTOR_TYPE_UINT8, distance);
+ UNIT_ASSERT_VALUES_EQUAL(prefix,
+ "user = user-1, __ydb_id = 40\n"
+
+ "user = user-2, __ydb_id = 43\n"
+ );
UNIT_ASSERT_VALUES_EQUAL(level,
"__ydb_parent = 40, __ydb_id = 41, __ydb_centroid = mm\3\n"
"__ydb_parent = 40, __ydb_id = 42, __ydb_centroid = 11\3\n"
@@ -507,9 +542,14 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
seed = 111;
for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) {
- auto [level, posting] = DoPrefixKMeans(server, sender, 40, seed, k,
- NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_BUILD,
- VectorIndexSettings::VECTOR_TYPE_UINT8, distance);
+ auto [prefix, level, posting] = DoPrefixKMeans(server, sender, 40, seed, k,
+ NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_BUILD,
+ VectorIndexSettings::VECTOR_TYPE_UINT8, distance);
+ UNIT_ASSERT_VALUES_EQUAL(prefix,
+ "user = user-1, __ydb_id = 40\n"
+
+ "user = user-2, __ydb_id = 43\n"
+ );
UNIT_ASSERT_VALUES_EQUAL(level,
"__ydb_parent = 40, __ydb_id = 41, __ydb_centroid = 11\3\n"
"__ydb_parent = 40, __ydb_id = 42, __ydb_centroid = mm\3\n"
@@ -536,9 +576,14 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
for (auto similarity : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE,
VectorIndexSettings::DISTANCE_COSINE})
{
- auto [level, posting] = DoPrefixKMeans(server, sender, 40, seed, k,
- NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_BUILD,
- VectorIndexSettings::VECTOR_TYPE_UINT8, similarity);
+ auto [prefix, level, posting] = DoPrefixKMeans(server, sender, 40, seed, k,
+ NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_BUILD,
+ VectorIndexSettings::VECTOR_TYPE_UINT8, similarity);
+ UNIT_ASSERT_VALUES_EQUAL(prefix,
+ "user = user-1, __ydb_id = 40\n"
+
+ "user = user-2, __ydb_id = 43\n"
+ );
UNIT_ASSERT_VALUES_EQUAL(level,
"__ydb_parent = 40, __ydb_id = 41, __ydb_centroid = II\3\n"
diff --git a/ydb/core/tx/datashard/prefix_kmeans.cpp b/ydb/core/tx/datashard/prefix_kmeans.cpp
index 053ca5ed73..49a9da4c5d 100644
--- a/ydb/core/tx/datashard/prefix_kmeans.cpp
+++ b/ydb/core/tx/datashard/prefix_kmeans.cpp
@@ -67,18 +67,19 @@ protected:
std::vector<ui64> ClusterSizes;
// Upload
- std::shared_ptr<NTxProxy::TUploadTypes> InitTargetTypes;
- std::shared_ptr<NTxProxy::TUploadTypes> InitNextTypes;
+ std::shared_ptr<NTxProxy::TUploadTypes> LevelTypes;
+ std::shared_ptr<NTxProxy::TUploadTypes> PostingTypes;
+ std::shared_ptr<NTxProxy::TUploadTypes> PrefixTypes;
+ std::shared_ptr<NTxProxy::TUploadTypes> UploadTypes;
- std::shared_ptr<NTxProxy::TUploadTypes> TargetTypes;
- std::shared_ptr<NTxProxy::TUploadTypes> NextTypes;
+ const TString LevelTable;
+ const TString PostingTable;
+ const TString PrefixTable;
+ TString UploadTable;
- const TString TargetTable;
- const TString NextTable;
- TString CurrTable;
-
- TBufferData ReadBuf;
- TBufferData WriteBuf;
+ TBufferData PostingBuf;
+ TBufferData PrefixBuf;
+ TBufferData UploadBuf;
NTable::TPos EmbeddingPos = 0;
NTable::TPos DataPos = 1;
@@ -100,7 +101,7 @@ protected:
TActorId ResponseActorId;
TAutoPtr<TEvDataShard::TEvPrefixKMeansResponse> Response;
- ui32 PrefixColulmns;
+ ui32 PrefixColumns;
TSerializedCellVec Key;
bool HasNextKey = false;
@@ -125,30 +126,50 @@ public:
, Lead{std::move(lead)}
, BuildId{request.GetId()}
, Rng{request.GetSeed()}
- , TargetTable{request.GetLevelName()}
- , NextTable{request.GetPostingName()}
+ , LevelTable{request.GetLevelName()}
+ , PostingTable{request.GetPostingName()}
+ , PrefixTable{request.GetPrefixName()}
, ResponseActorId{responseActorId}
, Response{std::move(response)}
- , PrefixColulmns{request.GetPrefixColumns()}
+ , PrefixColumns{request.GetPrefixColumns()}
{
const auto& embedding = request.GetEmbeddingColumn();
const auto& data = request.GetDataColumns();
// scan tags
UploadScan = MakeUploadTags(table, embedding, data, EmbeddingPos, DataPos, KMeansScan);
// upload types
- if (Ydb::Type type; State <= EState::KMEANS) {
- TargetTypes = std::make_shared<NTxProxy::TUploadTypes>(3);
+ {
+ Ydb::Type type;
+ LevelTypes = std::make_shared<NTxProxy::TUploadTypes>(3);
type.set_type_id(NTableIndex::ClusterIdType);
- (*TargetTypes)[0] = {NTableIndex::NTableVectorKmeansTreeIndex::ParentColumn, type};
- (*TargetTypes)[1] = {NTableIndex::NTableVectorKmeansTreeIndex::IdColumn, type};
+ (*LevelTypes)[0] = {NTableIndex::NTableVectorKmeansTreeIndex::ParentColumn, type};
+ (*LevelTypes)[1] = {NTableIndex::NTableVectorKmeansTreeIndex::IdColumn, type};
type.set_type_id(Ydb::Type::STRING);
- (*TargetTypes)[2] = {NTableIndex::NTableVectorKmeansTreeIndex::CentroidColumn, type};
+ (*LevelTypes)[2] = {NTableIndex::NTableVectorKmeansTreeIndex::CentroidColumn, type};
}
- NextTypes = MakeUploadTypes(table, UploadState, embedding, data, PrefixColulmns);
+ PostingTypes = MakeUploadTypes(table, UploadState, embedding, data, PrefixColumns);
+ {
+ auto types = GetAllTypes(table);
+
+ PrefixTypes = std::make_shared<NTxProxy::TUploadTypes>();
+ PrefixTypes->reserve(1 + PrefixColumns);
- InitTargetTypes = TargetTypes;
- InitNextTypes = NextTypes;
- CurrTable = TargetTable;
+ Ydb::Type type;
+ type.set_type_id(NTableIndex::ClusterIdType);
+ PrefixTypes->emplace_back(NTableIndex::NTableVectorKmeansTreeIndex::IdColumn, type);
+
+ auto addType = [&](const auto& column) {
+ auto it = types.find(column);
+ if (it != types.end()) {
+ NScheme::ProtoFromTypeInfo(it->second, type);
+ PrefixTypes->emplace_back(it->first, type);
+ types.erase(it);
+ }
+ };
+ for (const auto& column : table.KeyColumnIds | std::views::take(PrefixColumns)) {
+ addType(table.Columns.at(column).Name);
+ }
+ }
}
TInitialState Prepare(IDriver* driver, TIntrusiveConstPtr<TScheme>) noexcept final
@@ -197,18 +218,25 @@ public:
TString Debug() const // Builds a one-line summary of the scan state; it is appended to trace log messages such as LOG_T("PageFault " << Debug()).
{
return TStringBuilder() << " TPrefixKMeansScan Id: " << BuildId << " Parent: " << Parent << " Child: " << Child
- << " CurrTable: " << CurrTable << " K: " << K << " Clusters: " << Clusters.size()
+ << " UploadTable: " << UploadTable << " K: " << K << " Clusters: " << Clusters.size()
<< " State: " << State << " Round: " << Round << " / " << MaxRounds
- << " ReadBuf size: " << ReadBuf.Size() << " WriteBuf size: " << WriteBuf.Size() << " ";
+ << " PostingBuf size: " << PostingBuf.Size() << " PrefixBuf size: " << PrefixBuf.Size() << " UploadBuf size: " << UploadBuf.Size() << " ";
}
EScan PageFault() noexcept final
{
LOG_T("PageFault " << Debug());
- if (!ReadBuf.IsEmpty() && WriteBuf.IsEmpty()) {
- ReadBuf.FlushTo(WriteBuf);
- Upload(false);
+ if (!UploadBuf.IsEmpty()) {
+ return EScan::Feed;
+ }
+
+ if (!PostingBuf.IsEmpty()) {
+ PostingBuf.FlushTo(UploadBuf);
+ InitUpload(PostingTable, PostingTypes);
+ } else if (!PrefixBuf.IsEmpty()) {
+ PrefixBuf.FlushTo(UploadBuf);
+ InitUpload(PrefixTable, PrefixTypes);
}
return EScan::Feed;
@@ -230,8 +258,8 @@ protected:
{
LOG_T("Retry upload " << Debug());
- if (!WriteBuf.IsEmpty()) {
- Upload(true);
+ if (!UploadBuf.IsEmpty()) {
+ RetryUpload();
}
}
@@ -251,12 +279,15 @@ protected:
UploadStatus.StatusCode = ev->Get()->Status;
UploadStatus.Issues = ev->Get()->Issues;
if (UploadStatus.IsSuccess()) {
- UploadRows += WriteBuf.GetRows();
- UploadBytes += WriteBuf.GetBytes();
- WriteBuf.Clear();
- if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) {
- ReadBuf.FlushTo(WriteBuf);
- Upload(false);
+ UploadRows += UploadBuf.GetRows();
+ UploadBytes += UploadBuf.GetBytes();
+ UploadBuf.Clear();
+ if (PostingBuf.IsReachLimits(Limits)) {
+ PostingBuf.FlushTo(UploadBuf);
+ InitUpload(PostingTable, PostingTypes);
+ } else if (PrefixBuf.IsReachLimits(Limits)) {
+ PrefixBuf.FlushTo(UploadBuf);
+ InitUpload(PrefixTable, PrefixTypes);
}
Driver->Touch(EScan::Feed);
@@ -277,14 +308,20 @@ protected:
EScan FeedUpload() // Starts an upload once PostingBuf or PrefixBuf has reached Limits and returns the scan action for the caller.
{
- if (!ReadBuf.IsReachLimits(Limits)) {
+ if (!PostingBuf.IsReachLimits(Limits) && !PrefixBuf.IsReachLimits(Limits)) { // neither staging buffer is full yet: keep reading rows
return EScan::Feed;
}
- if (!WriteBuf.IsEmpty()) {
+ if (!UploadBuf.IsEmpty()) { // UploadBuf is still occupied by an earlier upload, so nothing can be flushed into it
return EScan::Sleep;
}
- ReadBuf.FlushTo(WriteBuf);
- Upload(false);
+ if (PostingBuf.IsReachLimits(Limits)) { // PostingBuf is checked first, so it wins when both buffers are over the limit
+ PostingBuf.FlushTo(UploadBuf);
+ InitUpload(PostingTable, PostingTypes);
+ } else {
+ Y_ASSERT(PrefixBuf.IsReachLimits(Limits)); // follows from the first guard: at least one buffer reached Limits and it was not PostingBuf
+ PrefixBuf.FlushTo(UploadBuf);
+ InitUpload(PrefixTable, PrefixTypes);
+ }
return EScan::Feed;
}
@@ -293,39 +330,44 @@ protected:
return Rng.GenRand64();
}
- void Upload(bool isRetry)
+ void UploadImpl() // Sends the rows held in UploadBuf to UploadTable with UploadTypes through a newly registered upload actor; it does not touch RetryCount.
{
- if (isRetry) {
- ++RetryCount;
- } else {
- RetryCount = 0;
- if (State != EState::KMEANS && NextTypes) {
- TargetTypes = std::exchange(NextTypes, {});
- CurrTable = NextTable;
- }
- }
-
+ Y_ASSERT(!UploadBuf.IsEmpty()); // callers are expected to fill UploadBuf before calling
auto actor = NTxProxy::CreateUploadRowsInternal(
- this->SelfId(), CurrTable, TargetTypes, WriteBuf.GetRowsData(),
+ this->SelfId(), UploadTable, UploadTypes, UploadBuf.GetRowsData(),
NTxProxy::EUploadRowsMode::WriteToTableShadow, true /*writeToPrivateTable*/);
Uploader = this->Register(actor); // the registered actor's id is kept in Uploader
}
+ void InitUpload(std::string_view table, std::shared_ptr<NTxProxy::TUploadTypes> types) // Begins a first-attempt upload of UploadBuf into `table` using the column `types`.
+ {
+ RetryCount = 0; // a new upload starts with a fresh retry counter
+ UploadTable = table; // stored in a member because UploadImpl() reads the target from UploadTable
+ UploadTypes = std::move(types); // by-value parameter is moved into the member that UploadImpl() reads
+ UploadImpl();
+ }
+
+ void RetryUpload() // Re-sends the current UploadBuf; UploadTable and UploadTypes are left as they were.
+ {
+ ++RetryCount; // count this attempt; InitUpload() resets the counter to zero
+ UploadImpl();
+ }
+
+
void UploadSample() // Writes one row per entry of Clusters into UploadBuf and uploads them to LevelTable.
{
- Y_ASSERT(ReadBuf.IsEmpty());
- Y_ASSERT(WriteBuf.IsEmpty());
+ Y_ASSERT(UploadBuf.IsEmpty()); // rows are added straight to UploadBuf, so no other upload may be pending in it
std::array<TCell, 2> pk;
std::array<TCell, 1> data;
for (NTable::TPos pos = 0; const auto& row : Clusters) {
pk[0] = TCell::Make(Parent); // first key cell: Parent
pk[1] = TCell::Make(Child + pos); // second key cell: Child offset by the entry's position in Clusters
data[0] = TCell{row}; // value cell: the Clusters entry itself
- WriteBuf.AddRow({}, TSerializedCellVec{pk}, TSerializedCellVec::Serialize(data));
+ UploadBuf.AddRow({}, TSerializedCellVec{pk}, TSerializedCellVec::Serialize(data));
++pos;
}
- Upload(false);
+ InitUpload(LevelTable, LevelTypes);
}
};
@@ -356,15 +398,12 @@ class TPrefixKMeansScan final: public TPrefixKMeansScanBase, private TCalculatio
// TODO(mbkkt) Upper or Lower doesn't matter here, because we seek to (prefix, inf)
// so we can choose Lower if it's faster.
// Exact seek with Lower also possible but needs to rewrite some code in Feed
- Lead.To(Key.GetCells().subspan(0, PrefixColulmns), NTable::ESeek::Upper);
+ Lead.To(Key.GetCells().subspan(0, PrefixColumns), NTable::ESeek::Upper);
Key = {};
MaxProbability = std::numeric_limits<ui64>::max();
MaxRows.clear();
Clusters.clear();
ClusterSizes.clear();
- TargetTypes = InitTargetTypes;
- NextTypes = InitNextTypes;
- CurrTable = TargetTable;
HasNextKey = false;
AggregatedClusters.clear();
return true;
@@ -385,18 +424,25 @@ public:
ui64 zeroSeq = 0;
while (true) {
if (State == UploadState) {
- if (!WriteBuf.IsEmpty()) {
- return EScan::Sleep;
- }
- if (!ReadBuf.IsEmpty()) {
- ReadBuf.FlushTo(WriteBuf);
- Upload(false);
+ // TODO: it's a little suboptimal to wait here
+ // better is wait after MoveToNextKey but before UploadSample
+ if (!UploadBuf.IsEmpty()) {
return EScan::Sleep;
}
if (MoveToNextKey()) {
zeroSeq = seq;
continue;
}
+ if (!PostingBuf.IsEmpty()) {
+ PostingBuf.FlushTo(UploadBuf);
+ InitUpload(PostingTable, PostingTypes);
+ return EScan::Sleep;
+ }
+ if (!PrefixBuf.IsEmpty()) {
+ PrefixBuf.FlushTo(UploadBuf);
+ InitUpload(PrefixTable, PrefixTypes);
+ return EScan::Sleep;
+ }
return EScan::Final;
}
@@ -439,7 +485,13 @@ public:
LOG_T("Feed " << Debug());
if (!Key) {
Key = TSerializedCellVec{key};
- } else if (!TCellVectorsEquals{}(Key.GetCells().subspan(0, PrefixColulmns), key.subspan(0, PrefixColulmns))) {
+
+ auto pk = TSerializedCellVec::Serialize(Key.GetCells().subspan(0, PrefixColumns));
+ std::array<TCell, 1> cells;
+ cells[0] = TCell::Make(Parent);
+ TSerializedCellVec::UnsafeAppendCells(cells, pk);
+ PrefixBuf.AddRow({}, TSerializedCellVec{std::move(pk)}, TSerializedCellVec::Serialize({}));
+ } else if (!TCellVectorsEquals{}(Key.GetCells().subspan(0, PrefixColumns), key.subspan(0, PrefixColumns))) {
HasNextKey = true;
return EScan::Reset;
}
@@ -581,7 +633,7 @@ private:
if (pos > K) {
return EScan::Feed;
}
- AddRowBuild2Build(ReadBuf, Child + pos, key, row, PrefixColulmns);
+ AddRowBuild2Build(PostingBuf, Child + pos, key, row, PrefixColumns);
return FeedUpload();
}
@@ -591,7 +643,7 @@ private:
if (pos > K) {
return EScan::Feed;
}
- AddRowBuild2Posting(ReadBuf, Child + pos, key, row, DataPos, PrefixColulmns);
+ AddRowBuild2Posting(PostingBuf, Child + pos, key, row, DataPos, PrefixColumns);
return FeedUpload();
}
};