summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitaliy Filippov <[email protected]>2025-05-30 20:35:01 +0300
committerGitHub <[email protected]>2025-05-30 17:35:01 +0000
commit2d478db57f419728634d2e69413ba70a2e4e1d79 (patch)
treec63608fb96ff908d74356a964b1a285a1ce7b4a4
parenta24d978b419f41c0217efae94e9bf3f690762563 (diff)
Fix a bug where KMeans clusters could change on schemeshard restart (#18979) (#19004)
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp55
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h11
-rw-r--r--ydb/core/tx/schemeshard/ut_index/ut_vector_index.cpp1
-rw-r--r--ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp2
-rw-r--r--ydb/core/tx/schemeshard/ut_vector_index_build_reboots/ut_vector_index_build_reboots.cpp14
5 files changed, 44 insertions, 39 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
index 152d4fe8054..96e461a275a 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
@@ -1021,7 +1021,8 @@ private:
return false;
}
- if (!buildInfo.Sample.Rows.empty()) {
+ if (buildInfo.KMeans.State == TIndexBuildInfo::TKMeans::Sample &&
+ !buildInfo.Sample.Rows.empty()) {
if (buildInfo.Sample.State == TIndexBuildInfo::TSample::EState::Collect) {
LOG_D("FillVectorIndex SendUploadSampleKRequest " << buildInfo.DebugString());
SendUploadSampleKRequest(buildInfo);
@@ -1033,7 +1034,8 @@ private:
ClearDoneShards(txc, buildInfo);
if (!buildInfo.Sample.Rows.empty()) {
- if (buildInfo.KMeans.NextState()) {
+ if (buildInfo.KMeans.State == TIndexBuildInfo::TKMeans::Sample) {
+ buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::Reshuffle;
LOG_D("FillVectorIndex NextState " << buildInfo.DebugString());
PersistKMeansState(txc, buildInfo);
Progress(BuildId);
@@ -1603,29 +1605,6 @@ public:
}
NIceDb::TNiceDb db(txc.DB);
- if (record.ProbabilitiesSize()) {
- Y_ENSURE(record.RowsSize());
- auto& probabilities = record.GetProbabilities();
- auto& rows = *record.MutableRows();
- Y_ENSURE(probabilities.size() == rows.size());
- auto& sample = buildInfo.Sample.Rows;
- auto from = sample.size();
- for (int i = 0; i != probabilities.size(); ++i) {
- if (probabilities[i] >= buildInfo.Sample.MaxProbability) {
- break;
- }
- sample.emplace_back(probabilities[i], std::move(rows[i]));
- }
- if (buildInfo.Sample.MakeWeakTop(buildInfo.KMeans.K)) {
- from = 0;
- }
- for (; from < sample.size(); ++from) {
- db.Table<Schema::KMeansTreeSample>().Key(buildInfo.Id, from).Update(
- NIceDb::TUpdate<Schema::KMeansTreeSample::Probability>(sample[from].P),
- NIceDb::TUpdate<Schema::KMeansTreeSample::Data>(sample[from].Row)
- );
- }
- }
TBillingStats stats{0, 0, record.GetReadRows(), record.GetReadBytes()};
shardStatus.Processed += stats;
@@ -1639,6 +1618,32 @@ public:
switch (shardStatus.Status) {
case NKikimrIndexBuilder::EBuildStatus::DONE:
if (buildInfo.InProgressShards.erase(shardIdx)) {
+ if (record.ProbabilitiesSize()) {
+ Y_ENSURE(record.RowsSize());
+ auto& probabilities = record.GetProbabilities();
+ auto& rows = *record.MutableRows();
+ Y_ENSURE(probabilities.size() == rows.size());
+ auto& sample = buildInfo.Sample.Rows;
+ auto from = sample.size();
+ for (int i = 0; i != probabilities.size(); ++i) {
+ if (probabilities[i] >= buildInfo.Sample.MaxProbability) {
+ break;
+ }
+ sample.emplace_back(probabilities[i], std::move(rows[i]));
+ }
+ if (buildInfo.Sample.MakeWeakTop(buildInfo.KMeans.K)) {
+ from = 0;
+ }
+ for (; from < sample.size(); ++from) {
+ db.Table<Schema::KMeansTreeSample>().Key(buildInfo.Id, from).Update(
+ NIceDb::TUpdate<Schema::KMeansTreeSample::Probability>(sample[from].P),
+ NIceDb::TUpdate<Schema::KMeansTreeSample::Data>(sample[from].Row)
+ );
+ }
+ for (; from < 2*buildInfo.KMeans.K; ++from) {
+ db.Table<Schema::KMeansTreeSample>().Key(buildInfo.Id, from).Delete();
+ }
+ }
buildInfo.DoneShards.emplace_back(shardIdx);
}
break;
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 376d737c643..741bab942ce 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -3207,17 +3207,6 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
bool NeedsAnotherParent() const noexcept {
return Parent < ParentEnd();
}
- bool NeedsAnotherState() const noexcept {
- return State == Sample /*|| State == Recompute*/;
- }
-
- bool NextState() noexcept {
- if (!NeedsAnotherState()) {
- return false;
- }
- State = static_cast<EState>(static_cast<ui32>(State) + 1);
- return true;
- }
bool NextParent() noexcept {
if (!NeedsAnotherParent()) {
diff --git a/ydb/core/tx/schemeshard/ut_index/ut_vector_index.cpp b/ydb/core/tx/schemeshard/ut_index/ut_vector_index.cpp
index 956bf14eec6..a0f59f9ab24 100644
--- a/ydb/core/tx/schemeshard/ut_index/ut_vector_index.cpp
+++ b/ydb/core/tx/schemeshard/ut_index/ut_vector_index.cpp
@@ -6,7 +6,6 @@
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h>
#include <ydb/core/testlib/tablet_helpers.h>
-#include <ydb/public/lib/deprecated/kicli/kicli.h>
using namespace NKikimr;
diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp
index f2d8530bd57..f9e1e7b3d05 100644
--- a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp
+++ b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp
@@ -187,7 +187,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) {
auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", buildIndexId);
UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE);
- const TString meteringData = R"({"usage":{"start":0,"quantity":431,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"109-72075186233409549-2-0-0-0-0-619-605-11328-10960","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n";
+ const TString meteringData = R"({"usage":{"start":0,"quantity":433,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"109-72075186233409549-2-0-0-0-0-611-609-11032-11108","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n";
UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData);
diff --git a/ydb/core/tx/schemeshard/ut_vector_index_build_reboots/ut_vector_index_build_reboots.cpp b/ydb/core/tx/schemeshard/ut_vector_index_build_reboots/ut_vector_index_build_reboots.cpp
index 8db63022a1c..ef38844ef98 100644
--- a/ydb/core/tx/schemeshard/ut_vector_index_build_reboots/ut_vector_index_build_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_vector_index_build_reboots/ut_vector_index_build_reboots.cpp
@@ -26,12 +26,16 @@ Y_UNIT_TEST_SUITE(VectorIndexBuildTestReboots) {
KeyColumnNames: ["key"]
SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 50 } } } }
SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 150 } } } }
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 250 } } } }
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 350 } } } }
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);
WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 0, 0, 50);
WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 1, 50, 150);
- WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 2, 150, 200);
+ WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 2, 150, 250);
+ WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 3, 250, 350);
+ WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 4, 350, 400);
}
AsyncBuildVectorIndex(runtime, ++t.TxId, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/dir/Table", "index1", "embedding", {"value"});
@@ -68,6 +72,14 @@ Y_UNIT_TEST_SUITE(VectorIndexBuildTestReboots) {
{NLs::PathNotExist});
TestDescribeResult(DescribePath(runtime, indexPath + "/" + PostingTable + BuildSuffix1, true, true, true),
{NLs::PathNotExist});
+
+ // Check row count in the posting table
+ {
+ auto rows = CountRows(runtime, TTestTxConfig::SchemeShard, "/MyRoot/dir/Table/index1/indexImplPostingTable");
+ Cerr << "... posting table contains " << rows << " rows" << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(rows, 400);
+ }
+
}
});
}