diff options
| author | Vitaliy Filippov <[email protected]> | 2025-05-30 20:35:01 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-05-30 17:35:01 +0000 |
| commit | 2d478db57f419728634d2e69413ba70a2e4e1d79 (patch) | |
| tree | c63608fb96ff908d74356a964b1a285a1ce7b4a4 | |
| parent | a24d978b419f41c0217efae94e9bf3f690762563 (diff) | |
Fix a bug where KMeans clusters could change on schemeshard restart (#18979) (#19004)
5 files changed, 44 insertions, 39 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index 152d4fe8054..96e461a275a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -1021,7 +1021,8 @@ private: return false; } - if (!buildInfo.Sample.Rows.empty()) { + if (buildInfo.KMeans.State == TIndexBuildInfo::TKMeans::Sample && + !buildInfo.Sample.Rows.empty()) { if (buildInfo.Sample.State == TIndexBuildInfo::TSample::EState::Collect) { LOG_D("FillVectorIndex SendUploadSampleKRequest " << buildInfo.DebugString()); SendUploadSampleKRequest(buildInfo); @@ -1033,7 +1034,8 @@ private: ClearDoneShards(txc, buildInfo); if (!buildInfo.Sample.Rows.empty()) { - if (buildInfo.KMeans.NextState()) { + if (buildInfo.KMeans.State == TIndexBuildInfo::TKMeans::Sample) { + buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::Reshuffle; LOG_D("FillVectorIndex NextState " << buildInfo.DebugString()); PersistKMeansState(txc, buildInfo); Progress(BuildId); @@ -1603,29 +1605,6 @@ public: } NIceDb::TNiceDb db(txc.DB); - if (record.ProbabilitiesSize()) { - Y_ENSURE(record.RowsSize()); - auto& probabilities = record.GetProbabilities(); - auto& rows = *record.MutableRows(); - Y_ENSURE(probabilities.size() == rows.size()); - auto& sample = buildInfo.Sample.Rows; - auto from = sample.size(); - for (int i = 0; i != probabilities.size(); ++i) { - if (probabilities[i] >= buildInfo.Sample.MaxProbability) { - break; - } - sample.emplace_back(probabilities[i], std::move(rows[i])); - } - if (buildInfo.Sample.MakeWeakTop(buildInfo.KMeans.K)) { - from = 0; - } - for (; from < sample.size(); ++from) { - db.Table<Schema::KMeansTreeSample>().Key(buildInfo.Id, from).Update( - NIceDb::TUpdate<Schema::KMeansTreeSample::Probability>(sample[from].P), - NIceDb::TUpdate<Schema::KMeansTreeSample::Data>(sample[from].Row) - ); - } - } TBillingStats stats{0, 0, record.GetReadRows(), record.GetReadBytes()}; shardStatus.Processed += stats; @@ -1639,6 +1618,32 @@ public: switch (shardStatus.Status) { case NKikimrIndexBuilder::EBuildStatus::DONE: if (buildInfo.InProgressShards.erase(shardIdx)) { + if (record.ProbabilitiesSize()) { + Y_ENSURE(record.RowsSize()); + auto& probabilities = record.GetProbabilities(); + auto& rows = *record.MutableRows(); + Y_ENSURE(probabilities.size() == rows.size()); + auto& sample = buildInfo.Sample.Rows; + auto from = sample.size(); + for (int i = 0; i != probabilities.size(); ++i) { + if (probabilities[i] >= buildInfo.Sample.MaxProbability) { + break; + } + sample.emplace_back(probabilities[i], std::move(rows[i])); + } + if (buildInfo.Sample.MakeWeakTop(buildInfo.KMeans.K)) { + from = 0; + } + for (; from < sample.size(); ++from) { + db.Table<Schema::KMeansTreeSample>().Key(buildInfo.Id, from).Update( + NIceDb::TUpdate<Schema::KMeansTreeSample::Probability>(sample[from].P), + NIceDb::TUpdate<Schema::KMeansTreeSample::Data>(sample[from].Row) + ); + } + for (; from < 2*buildInfo.KMeans.K; ++from) { + db.Table<Schema::KMeansTreeSample>().Key(buildInfo.Id, from).Delete(); + } + } buildInfo.DoneShards.emplace_back(shardIdx); } break; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 376d737c643..741bab942ce 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -3207,17 +3207,6 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { bool NeedsAnotherParent() const noexcept { return Parent < ParentEnd(); } - bool NeedsAnotherState() const noexcept { - return State == Sample /*|| State == Recompute*/; - } - - bool NextState() noexcept { - if (!NeedsAnotherState()) { - return false; - } - State = static_cast<EState>(static_cast<ui32>(State) + 1); - return true; - } bool NextParent() noexcept { if (!NeedsAnotherParent()) { diff --git a/ydb/core/tx/schemeshard/ut_index/ut_vector_index.cpp b/ydb/core/tx/schemeshard/ut_index/ut_vector_index.cpp index 956bf14eec6..a0f59f9ab24 100644 --- a/ydb/core/tx/schemeshard/ut_index/ut_vector_index.cpp +++ b/ydb/core/tx/schemeshard/ut_index/ut_vector_index.cpp @@ -6,7 +6,6 @@ #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> #include <ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h> #include <ydb/core/testlib/tablet_helpers.h> -#include <ydb/public/lib/deprecated/kicli/kicli.h> using namespace NKikimr; diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp index f2d8530bd57..f9e1e7b3d05 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp @@ -187,7 +187,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) { auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", buildIndexId); UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE); - const TString meteringData = R"({"usage":{"start":0,"quantity":431,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"109-72075186233409549-2-0-0-0-0-619-605-11328-10960","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n"; + const TString meteringData = R"({"usage":{"start":0,"quantity":433,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"109-72075186233409549-2-0-0-0-0-611-609-11032-11108","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n"; UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData); diff --git a/ydb/core/tx/schemeshard/ut_vector_index_build_reboots/ut_vector_index_build_reboots.cpp b/ydb/core/tx/schemeshard/ut_vector_index_build_reboots/ut_vector_index_build_reboots.cpp index 8db63022a1c..ef38844ef98 100644 --- a/ydb/core/tx/schemeshard/ut_vector_index_build_reboots/ut_vector_index_build_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_vector_index_build_reboots/ut_vector_index_build_reboots.cpp @@ -26,12 +26,16 @@ Y_UNIT_TEST_SUITE(VectorIndexBuildTestReboots) { KeyColumnNames: ["key"] SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 50 } } } } SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 150 } } } } + SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 250 } } } } + SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 350 } } } } )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 0, 0, 50); WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 1, 50, 150); - WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 2, 150, 200); + WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 2, 150, 250); + WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 3, 250, 350); + WriteVectorTableRows(runtime, TTestTxConfig::SchemeShard, ++t.TxId, "/MyRoot/dir/Table", true, 4, 350, 400); } AsyncBuildVectorIndex(runtime, ++t.TxId, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/dir/Table", "index1", "embedding", {"value"}); @@ -68,6 +72,14 @@ Y_UNIT_TEST_SUITE(VectorIndexBuildTestReboots) { {NLs::PathNotExist}); TestDescribeResult(DescribePath(runtime, indexPath + "/" + PostingTable + BuildSuffix1, true, true, true), {NLs::PathNotExist}); + + // Check row count in the posting table + { + auto rows = CountRows(runtime, TTestTxConfig::SchemeShard, "/MyRoot/dir/Table/index1/indexImplPostingTable"); + Cerr << "... posting table contains " << rows << " rows" << Endl; + UNIT_ASSERT_VALUES_EQUAL(rows, 400); + } + } }); } |
