diff options
author | kungurtsev <kungasc@ydb.tech> | 2025-04-02 13:10:50 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-02 13:10:50 +0200 |
commit | 5616b30ecd97a43eca0cc4f56e783d026ea8458d (patch) | |
tree | 0dd7ddc5719a3b939d121e8724a1326a7722f358 | |
parent | f32156403298934a0694705b2576701edea8046d (diff) | |
download | ydb-5616b30ecd97a43eca0cc4f56e783d026ea8458d.tar.gz |
Extract index scan settings class (#16616)
18 files changed, 144 insertions, 144 deletions
diff --git a/ydb/core/protos/index_builder.proto b/ydb/core/protos/index_builder.proto index 12beb69e56d..e0acece5474 100644 --- a/ydb/core/protos/index_builder.proto +++ b/ydb/core/protos/index_builder.proto @@ -21,6 +21,12 @@ message TColumnBuildSettings { optional string Table = 2; } +message TIndexBuildScanSettings { + optional uint32 MaxBatchRows = 1 [ default = 50000 ]; + optional uint64 MaxBatchBytes = 2 [ default = 8388608 ]; + optional uint32 MaxBatchRetries = 3 [ default = 50 ]; +} + message TIndexBuildSettings { optional string source_path = 1; optional Ydb.Table.TableIndex index = 2; @@ -28,12 +34,15 @@ message TIndexBuildSettings { optional bool pg_mode = 8 [ default = false]; optional bool if_not_exist = 9 [ default = false]; - optional uint32 max_batch_rows = 3 [ default = 50000 ]; - optional uint64 max_batch_bytes = 4 [ default = 8388608 ]; + reserved 3; // max_batch_rows + reserved 4; // max_batch_bytes + optional uint32 max_shards_in_flight = 5 [ default = 32 ]; - optional uint32 max_retries_upload_batch = 6 [ default = 50 ]; + reserved 6; // max_retries_upload_batch optional google.protobuf.Any AlterMainTablePayload = 10; + + optional NKikimrIndexBuilder.TIndexBuildScanSettings ScanSettings = 11; } message TIndexBuild { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index e8a1a16c189..ef87f4859d9 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1436,17 +1436,18 @@ message TEvBuildIndexCreateRequest { optional uint64 SnapshotTxId = 8; optional uint64 SnapshotStep = 9; - optional uint32 MaxBatchRows = 10; - optional uint64 MaxBatchBytes = 11; + reserved 10; // MaxBatchRows + reserved 11; // MaxBatchBytes optional uint64 SeqNoGeneration = 12; // monotonically increasing sequence, first part optional uint64 SeqNoRound = 13; // monotonically increasing sequence, second part - - optional uint64 MaxRetries = 14; + reserved 14; // MaxRetries repeated string DataColumns = 15; optional NKikimrIndexBuilder.TColumnBuildSettings ColumnBuildSettings = 16; + + optional NKikimrIndexBuilder.TIndexBuildScanSettings ScanSettings = 17; } message TEvBuildIndexProgressResponse { @@ -1558,6 +1559,8 @@ message TEvLocalKMeansRequest { optional string EmbeddingColumn = 19; repeated string DataColumns = 20; + + optional NKikimrIndexBuilder.TIndexBuildScanSettings ScanSettings = 22; } message TEvLocalKMeansResponse { @@ -1609,6 +1612,8 @@ message TEvReshuffleKMeansRequest { optional string EmbeddingColumn = 14; repeated string DataColumns = 15; + + optional NKikimrIndexBuilder.TIndexBuildScanSettings ScanSettings = 16; } message TEvReshuffleKMeansResponse { @@ -1660,6 +1665,8 @@ message TEvPrefixKMeansRequest { optional string EmbeddingColumn = 15; repeated string DataColumns = 16; optional uint32 PrefixColumns = 17; + + optional NKikimrIndexBuilder.TIndexBuildScanSettings ScanSettings = 18; } message TEvPrefixKMeansResponse { diff --git a/ydb/core/tx/datashard/buffer_data.h b/ydb/core/tx/datashard/buffer_data.h index f254fe0e9f7..d224ccd3c88 100644 --- a/ydb/core/tx/datashard/buffer_data.h +++ b/ydb/core/tx/datashard/buffer_data.h @@ -52,9 +52,8 @@ public: return Rows->size(); } - bool IsReachLimits(const TUploadLimits& Limits) { - // TODO(mbkkt) why [0..BatchRowsLimit) but [0..BatchBytesLimit] - return Rows->size() >= Limits.BatchRowsLimit || ByteSize > Limits.BatchBytesLimit; + bool HasReachedLimits(size_t rowsLimit, ui64 bytesLimit) const { + return Rows->size() > rowsLimit || ByteSize > bytesLimit; } auto&& ExtractLastKey() { diff --git a/ydb/core/tx/datashard/build_index.cpp b/ydb/core/tx/datashard/build_index.cpp index 9a49d965de9..82fd0880bfb 100644 --- a/ydb/core/tx/datashard/build_index.cpp +++ b/ydb/core/tx/datashard/build_index.cpp @@ -103,7 +103,7 @@ class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable using TBase = TActor<TThis>; protected: - const TUploadLimits Limits; + const TIndexBuildScanSettings ScanSettings; const ui64 BuildIndexId; const TString TargetTable; @@ -129,7 +129,7 @@ protected: TSerializedCellVec LastUploadedKey; TActorId Uploader; - ui64 RetryCount = 0; + ui32 RetryCount = 0; TUploadMonStats Stats = TUploadMonStats("tablets", "build_index_upload"); TUploadStatus UploadStatus; @@ -141,9 +141,9 @@ protected: const TActorId& progressActorId, const TSerializedTableRange& range, const TUserTable& tableInfo, - TUploadLimits limits) + const TIndexBuildScanSettings& scanSettings) : TBase(&TThis::StateWork) - , Limits(limits) + , ScanSettings(scanSettings) , BuildIndexId(buildIndexId) , TargetTable(target) , SeqNo(seqNo) @@ -161,7 +161,7 @@ protected: addRow(); - if (!ReadBuf.IsReachLimits(Limits)) { + if (!HasReachedLimits(ReadBuf, ScanSettings)) { return EScan::Feed; } @@ -354,7 +354,7 @@ private: this->Send(ProgressActorId, progress.Release()); - if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) { + if (HasReachedLimits(ReadBuf, ScanSettings)) { ReadBuf.FlushTo(WriteBuf); Upload(); } @@ -363,10 +363,10 @@ private: return; } - if (RetryCount < Limits.MaxUploadRowsRetryCount && UploadStatus.IsRetriable()) { + if (RetryCount < ScanSettings.GetMaxBatchRetries() && UploadStatus.IsRetriable()) { LOG_N("Got retriable error, " << Debug()); - ctx.Schedule(Limits.GetTimeoutBackoff(RetryCount), new TEvents::TEvWakeup()); + ctx.Schedule(GetRetryWakeupTimeoutBackoff(RetryCount), new TEvents::TEvWakeup()); return; } @@ -412,8 +412,8 @@ public: TProtoColumnsCRef targetIndexColumns, TProtoColumnsCRef targetDataColumns, const TUserTable& tableInfo, - TUploadLimits limits) - : TBuildScanUpload(buildIndexId, target, seqNo, dataShardId, progressActorId, range, tableInfo, limits) + const TIndexBuildScanSettings& scanSettings) + : TBuildScanUpload(buildIndexId, target, seqNo, dataShardId, progressActorId, range, tableInfo, scanSettings) , TargetDataColumnPos(targetIndexColumns.size()) { ScanTags = BuildTags(tableInfo, targetIndexColumns, targetDataColumns); UploadColumnsTypes = BuildTypes(tableInfo, targetIndexColumns, targetDataColumns); @@ -444,8 +444,8 @@ public: const TSerializedTableRange& range, const NKikimrIndexBuilder::TColumnBuildSettings& columnBuildSettings, const TUserTable& tableInfo, - TUploadLimits limits) - : TBuildScanUpload(buildIndexId, target, seqNo, dataShardId, progressActorId, range, tableInfo, limits) { + const TIndexBuildScanSettings& scanSettings) + : TBuildScanUpload(buildIndexId, target, seqNo, dataShardId, progressActorId, range, tableInfo, scanSettings) { Y_ENSURE(columnBuildSettings.columnSize() > 0); UploadColumnsTypes = BuildTypes(tableInfo, columnBuildSettings); UploadMode = NTxProxy::EUploadRowsMode::UpsertIfExists; @@ -481,13 +481,13 @@ TAutoPtr<NTable::IScan> CreateBuildIndexScan( TProtoColumnsCRef targetDataColumns, const NKikimrIndexBuilder::TColumnBuildSettings& columnsToBuild, const TUserTable& tableInfo, - TUploadLimits limits) { + const TIndexBuildScanSettings& scanSettings) { if (columnsToBuild.columnSize() > 0) { return new TBuildColumnsScan( - buildIndexId, target, seqNo, dataShardId, progressActorId, range, columnsToBuild, tableInfo, limits); + buildIndexId, target, seqNo, dataShardId, progressActorId, range, columnsToBuild, tableInfo, scanSettings); } return new TBuildIndexScan( - buildIndexId, target, seqNo, dataShardId, progressActorId, range, targetIndexColumns, targetDataColumns, tableInfo, limits); + buildIndexId, target, seqNo, dataShardId, progressActorId, range, targetIndexColumns, targetDataColumns, tableInfo, scanSettings); } class TDataShard::TTxHandleSafeBuildIndexScan: public NTabletFlatExecutor::TTransactionBase<TDataShard> { @@ -608,17 +608,6 @@ void TDataShard::HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, scanOpts.SetSnapshotRowVersion(rowVersion); scanOpts.SetResourceBroker("build_index", 10); - TUploadLimits limits; - if (record.HasMaxBatchRows()) { - limits.BatchRowsLimit = record.GetMaxBatchRows(); - } - if (record.HasMaxBatchBytes()) { - limits.BatchBytesLimit = record.GetMaxBatchBytes(); - } - if (record.HasMaxRetries()) { - limits.MaxUploadRowsRetryCount = record.GetMaxRetries(); - } - const auto scanId = QueueScan(userTable.LocalTid, CreateBuildIndexScan(buildIndexId, record.GetTargetName(), @@ -630,7 +619,7 @@ void TDataShard::HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, record.GetDataColumns(), record.GetColumnBuildSettings(), userTable, - limits), + record.GetScanSettings()), 0, scanOpts); diff --git a/ydb/core/tx/datashard/datashard_ut_prefix_kmeans.cpp b/ydb/core/tx/datashard/datashard_ut_prefix_kmeans.cpp index a7d8100be1e..30664b9b746 100644 --- a/ydb/core/tx/datashard/datashard_ut_prefix_kmeans.cpp +++ b/ydb/core/tx/datashard/datashard_ut_prefix_kmeans.cpp @@ -87,7 +87,7 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) { static std::tuple<TString, TString, TString> DoPrefixKMeans( Tests::TServer::TPtr server, TActorId sender, NTableIndex::TClusterId parent, ui64 seed, ui64 k, NKikimrTxDataShard::TEvLocalKMeansRequest::EState upload, VectorIndexSettings::VectorType type, - VectorIndexSettings::Metric metric) + VectorIndexSettings::Metric metric, ui32 maxBatchRows) { auto id = sId.fetch_add(1, std::memory_order_relaxed); auto& runtime = *server->GetRuntime(); @@ -131,6 +131,8 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) { rec.SetPrefixName(kPrefixTable); rec.SetLevelName(kLevelTable); rec.SetPostingName(kPostingTable); + + rec.MutableScanSettings()->SetMaxBatchRows(maxBatchRows); }; fill(ev1); fill(ev2); @@ -359,9 +361,10 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) { seed = 0; for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) { + for (ui32 maxBatchRows : {0, 1, 4, 5, 6, 50000}) { auto [prefix, level, posting] = DoPrefixKMeans(server, sender, 40, seed, k, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_POSTING, - VectorIndexSettings::VECTOR_TYPE_UINT8, distance); + VectorIndexSettings::VECTOR_TYPE_UINT8, distance, maxBatchRows); UNIT_ASSERT_VALUES_EQUAL(prefix, "user = user-1, __ydb_id = 40\n" @@ -388,13 +391,14 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) { "__ydb_parent = 45, key = 25, data = 2-five\n" ); recreate(); - } + }} seed = 111; for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) { + for (ui32 maxBatchRows : {0, 1, 4, 5, 6, 50000}) { auto [prefix, level, posting] = DoPrefixKMeans(server, sender, 40, seed, k, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_POSTING, - VectorIndexSettings::VECTOR_TYPE_UINT8, distance); + VectorIndexSettings::VECTOR_TYPE_UINT8, distance, maxBatchRows); UNIT_ASSERT_VALUES_EQUAL(prefix, "user = user-1, __ydb_id = 40\n" @@ -421,14 +425,13 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) { "__ydb_parent = 45, key = 25, data = 2-five\n" ); recreate(); - } + }} seed = 32; - for (auto similarity : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE, - VectorIndexSettings::DISTANCE_COSINE}) - { + for (auto similarity : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE, VectorIndexSettings::DISTANCE_COSINE}) { + for (ui32 maxBatchRows : {0, 1, 4, 5, 6, 50000}) { auto [prefix, level, posting] = DoPrefixKMeans(server, sender, 40, seed, k, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_POSTING, - VectorIndexSettings::VECTOR_TYPE_UINT8, similarity); + VectorIndexSettings::VECTOR_TYPE_UINT8, similarity, maxBatchRows); UNIT_ASSERT_VALUES_EQUAL(prefix, "user = user-1, __ydb_id = 40\n" @@ -453,7 +456,7 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) { "__ydb_parent = 44, key = 25, data = 2-five\n" ); recreate(); - } + }} } Y_UNIT_TEST (BuildToBuild) { @@ -511,9 +514,10 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) { seed = 0; for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) { + for (ui32 maxBatchRows : {0, 1, 4, 5, 6, 50000}) { auto [prefix, level, posting] = DoPrefixKMeans(server, sender, 40, seed, k, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_BUILD, - VectorIndexSettings::VECTOR_TYPE_UINT8, distance); + VectorIndexSettings::VECTOR_TYPE_UINT8, distance, maxBatchRows); UNIT_ASSERT_VALUES_EQUAL(prefix, "user = user-1, __ydb_id = 40\n" @@ -540,13 +544,14 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) { "__ydb_parent = 45, key = 25, embedding = \x75\x75\3, data = 2-five\n" ); recreate(); - } + }} seed = 111; for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) { + for (ui32 maxBatchRows : {0, 1, 4, 5, 6, 50000}) { auto [prefix, level, posting] = DoPrefixKMeans(server, sender, 40, seed, k, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_BUILD, - VectorIndexSettings::VECTOR_TYPE_UINT8, distance); + VectorIndexSettings::VECTOR_TYPE_UINT8, distance, maxBatchRows); UNIT_ASSERT_VALUES_EQUAL(prefix, "user = user-1, __ydb_id = 40\n" @@ -573,14 +578,13 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) { "__ydb_parent = 45, key = 25, embedding = \x75\x75\3, data = 2-five\n" ); recreate(); - } + }} seed = 32; - for (auto similarity : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE, - VectorIndexSettings::DISTANCE_COSINE}) - { + for (auto similarity : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE, VectorIndexSettings::DISTANCE_COSINE}) { + for (ui32 maxBatchRows : {0, 1, 4, 5, 6, 50000}) { auto [prefix, level, posting] = DoPrefixKMeans(server, sender, 40, seed, k, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_BUILD_TO_BUILD, - VectorIndexSettings::VECTOR_TYPE_UINT8, similarity); + VectorIndexSettings::VECTOR_TYPE_UINT8, similarity, maxBatchRows); UNIT_ASSERT_VALUES_EQUAL(prefix, "user = user-1, __ydb_id = 40\n" @@ -605,7 +609,7 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) { "__ydb_parent = 44, key = 25, embedding = \x75\x75\3, data = 2-five\n" ); recreate(); - } + }} } } diff --git a/ydb/core/tx/datashard/local_kmeans.cpp b/ydb/core/tx/datashard/local_kmeans.cpp index 6e867e58f8e..6f00de2f121 100644 --- a/ydb/core/tx/datashard/local_kmeans.cpp +++ b/ydb/core/tx/datashard/local_kmeans.cpp @@ -49,7 +49,7 @@ public: Size = 0; lock.unlock(); - LOG_N("FinishTLocalKMeansScan " << Response->Record.ShortDebugString()); + LOG_N("Finish TLocalKMeansScan " << Response->Record.ShortDebugString()); ctx.Send(ResponseActorId, std::move(Response)); } @@ -139,7 +139,7 @@ protected: ui32 RetryCount = 0; TActorId Uploader; - TUploadLimits Limits; + const TIndexBuildScanSettings ScanSettings; NTable::TTag KMeansScan; TTags UploadScan; @@ -172,6 +172,7 @@ public: , Rng{request.GetSeed()} , TargetTable{request.GetLevelName()} , NextTable{request.GetPostingName()} + , ScanSettings(request.GetScanSettings()) , Result{std::move(result)} { const auto& embedding = request.GetEmbeddingColumn(); @@ -290,7 +291,7 @@ protected: UploadRows += WriteBuf.GetRows(); UploadBytes += WriteBuf.GetBytes(); WriteBuf.Clear(); - if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) { + if (HasReachedLimits(ReadBuf, ScanSettings)) { ReadBuf.FlushTo(WriteBuf); Upload(false); } @@ -299,10 +300,10 @@ protected: return; } - if (RetryCount < Limits.MaxUploadRowsRetryCount && UploadStatus.IsRetriable()) { + if (RetryCount < ScanSettings.GetMaxBatchRetries() && UploadStatus.IsRetriable()) { LOG_N("Got retriable error, " << Debug() << UploadStatus.ToString()); - ctx.Schedule(Limits.GetTimeoutBackoff(RetryCount), new TEvents::TEvWakeup()); + ctx.Schedule(GetRetryWakeupTimeoutBackoff(RetryCount), new TEvents::TEvWakeup()); return; } @@ -313,7 +314,7 @@ protected: EScan FeedUpload() { - if (!ReadBuf.IsReachLimits(Limits)) { + if (!HasReachedLimits(ReadBuf, ScanSettings)) { return EScan::Feed; } if (!WriteBuf.IsEmpty()) { diff --git a/ydb/core/tx/datashard/prefix_kmeans.cpp b/ydb/core/tx/datashard/prefix_kmeans.cpp index f9db2e27066..4f93468fe3d 100644 --- a/ydb/core/tx/datashard/prefix_kmeans.cpp +++ b/ydb/core/tx/datashard/prefix_kmeans.cpp @@ -87,7 +87,7 @@ protected: ui32 RetryCount = 0; TActorId Uploader; - TUploadLimits Limits; + const TIndexBuildScanSettings ScanSettings; NTable::TTag EmbeddingTag; TTags ScanTags; @@ -132,6 +132,7 @@ public: , LevelTable{request.GetLevelName()} , PostingTable{request.GetPostingName()} , PrefixTable{request.GetPrefixName()} + , ScanSettings(request.GetScanSettings()) , ResponseActorId{responseActorId} , Response{std::move(response)} , PrefixColumns{request.GetPrefixColumns()} @@ -206,7 +207,7 @@ public: } NYql::IssuesToMessage(UploadStatus.Issues, record.MutableIssues()); - LOG_N("Finish" << Debug() << " " << Response->Record.ShortDebugString()); + LOG_N("Finish " << Debug() << " " << Response->Record.ShortDebugString()); Send(ResponseActorId, Response.Release()); Driver = nullptr; @@ -286,10 +287,10 @@ protected: return; } - if (RetryCount < Limits.MaxUploadRowsRetryCount && UploadStatus.IsRetriable()) { + if (RetryCount < ScanSettings.GetMaxBatchRetries() && UploadStatus.IsRetriable()) { LOG_N("Got retriable error, " << Debug() << " " << UploadStatus.ToString()); - ctx.Schedule(Limits.GetTimeoutBackoff(RetryCount), new TEvents::TEvWakeup()); + ctx.Schedule(GetRetryWakeupTimeoutBackoff(RetryCount), new TEvents::TEvWakeup()); return; } @@ -305,7 +306,7 @@ protected: bool ShouldWaitUpload() { - if (!LevelBuf.IsReachLimits(Limits) && !PostingBuf.IsReachLimits(Limits) && !PrefixBuf.IsReachLimits(Limits)) { + if (!HasReachedLimits(LevelBuf, ScanSettings) && !HasReachedLimits(PostingBuf, ScanSettings) && !HasReachedLimits(PrefixBuf, ScanSettings)) { return false; } @@ -317,7 +318,7 @@ protected: || TryUpload(PostingBuf, PostingTable, PostingTypes, true) || TryUpload(PrefixBuf, PrefixTable, PrefixTypes, true); - return !LevelBuf.IsReachLimits(Limits) && !PostingBuf.IsReachLimits(Limits) && !PrefixBuf.IsReachLimits(Limits); + return !HasReachedLimits(LevelBuf, ScanSettings) && !HasReachedLimits(PostingBuf, ScanSettings) && !HasReachedLimits(PrefixBuf, ScanSettings); } void UploadImpl() @@ -359,7 +360,7 @@ protected: return true; } - if (!buffer.IsEmpty() && (!byLimit || buffer.IsReachLimits(Limits))) { + if (!buffer.IsEmpty() && (!byLimit || HasReachedLimits(buffer, ScanSettings))) { buffer.FlushTo(UploadBuf); InitUpload(table, types); return true; @@ -471,7 +472,7 @@ public: if (IsFirstPrefixFeed && IsPrefixRowsValid) { PrefixRows.AddRow(TSerializedCellVec{key}, TSerializedCellVec::Serialize(*row)); - if (PrefixRows.IsReachLimits(Limits)) { + if (HasReachedLimits(PrefixRows, ScanSettings)) { PrefixRows.Clear(); IsPrefixRowsValid = false; } diff --git a/ydb/core/tx/datashard/reshuffle_kmeans.cpp b/ydb/core/tx/datashard/reshuffle_kmeans.cpp index 0b61f672893..88071b5f333 100644 --- a/ydb/core/tx/datashard/reshuffle_kmeans.cpp +++ b/ydb/core/tx/datashard/reshuffle_kmeans.cpp @@ -59,7 +59,7 @@ protected: ui32 RetryCount = 0; TActorId Uploader; - TUploadLimits Limits; + const TIndexBuildScanSettings ScanSettings; TTags UploadScan; @@ -91,6 +91,7 @@ public: , BuildId{request.GetId()} , Clusters{request.GetClusters().begin(), request.GetClusters().end()} , TargetTable{request.GetPostingName()} + , ScanSettings(request.GetScanSettings()) , ResponseActorId{responseActorId} , Response{std::move(response)} { @@ -155,7 +156,7 @@ public: } NYql::IssuesToMessage(UploadStatus.Issues, record.MutableIssues()); - LOG_N("Finish" << Debug() << " " << Response->Record.ShortDebugString()); + LOG_N("Finish " << Debug() << " " << Response->Record.ShortDebugString()); Send(ResponseActorId, Response.Release()); Driver = nullptr; @@ -170,9 +171,9 @@ public: TString Debug() const { - return TStringBuilder() << " TReshuffleKMeansScan Id: " << BuildId << " Parent: " << Parent << " Child: " << Child + return TStringBuilder() << "TReshuffleKMeansScan Id: " << BuildId << " Parent: " << Parent << " Child: " << Child << " Target: " << TargetTable << " K: " << K << " Clusters: " << Clusters.size() - << " ReadBuf size: " << ReadBuf.Size() << " WriteBuf size: " << WriteBuf.Size() << " "; + << " ReadBuf size: " << ReadBuf.Size() << " WriteBuf size: " << WriteBuf.Size(); } EScan PageFault() final @@ -208,8 +209,9 @@ protected: << " ev->Sender: " << ev->Sender.ToString()); if (Uploader) { - Y_ENSURE(Uploader == ev->Sender, "Mismatch Uploader: " << Uploader.ToString() << " ev->Sender: " - << ev->Sender.ToString() << Debug()); + Y_ENSURE(Uploader == ev->Sender, "Mismatch" + << " Uploader: " << Uploader.ToString() + << " Sender: " << ev->Sender.ToString()); } else { Y_ENSURE(Driver == nullptr); return; @@ -221,7 +223,7 @@ protected: UploadRows += WriteBuf.GetRows(); UploadBytes += WriteBuf.GetBytes(); WriteBuf.Clear(); - if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) { + if (HasReachedLimits(ReadBuf, ScanSettings)) { ReadBuf.FlushTo(WriteBuf); Upload(false); } @@ -230,21 +232,21 @@ protected: return; } - if (RetryCount < Limits.MaxUploadRowsRetryCount && UploadStatus.IsRetriable()) { - LOG_N("Got retriable error, " << Debug() << UploadStatus.ToString()); + if (RetryCount < ScanSettings.GetMaxBatchRetries() && UploadStatus.IsRetriable()) { + LOG_N("Got retriable error, " << Debug() << " " << UploadStatus.ToString()); - Schedule(Limits.GetTimeoutBackoff(RetryCount), new TEvents::TEvWakeup); + Schedule(GetRetryWakeupTimeoutBackoff(RetryCount), new TEvents::TEvWakeup); return; } - LOG_N("Got error, abort scan, " << Debug() << UploadStatus.ToString()); + LOG_N("Got error, abort scan, " << Debug() << " " << UploadStatus.ToString()); Driver->Touch(EScan::Final); } EScan FeedUpload() { - if (!ReadBuf.IsReachLimits(Limits)) { + if (!HasReachedLimits(ReadBuf, ScanSettings)) { return EScan::Feed; } if (!WriteBuf.IsEmpty()) { diff --git a/ydb/core/tx/datashard/sample_k.cpp b/ydb/core/tx/datashard/sample_k.cpp index 5d04c9c6d69..e8472a4d7ec 100644 --- a/ydb/core/tx/datashard/sample_k.cpp +++ b/ydb/core/tx/datashard/sample_k.cpp @@ -156,7 +156,7 @@ public: } else { Response->Record.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED); } - LOG_N("Finish" << Debug() << " " << Response->Record.ShortDebugString()); + LOG_N("Finish " << Debug() << " " << Response->Record.ShortDebugString()); Send(ResponseActorId, Response.Release()); Driver = nullptr; PassAway(); @@ -172,8 +172,8 @@ public: } TString Debug() const { - return TStringBuilder() << " TSampleKScan Id: " << BuildId - << " K: " << K << " Clusters: " << MaxRows.size() << " "; + return TStringBuilder() << "TSampleKScan Id: " << BuildId + << " K: " << K << " Clusters: " << MaxRows.size(); } private: diff --git a/ydb/core/tx/datashard/scan_common.h b/ydb/core/tx/datashard/scan_common.h index 0ed5942110a..e29bcf4f8b8 100644 --- a/ydb/core/tx/datashard/scan_common.h +++ b/ydb/core/tx/datashard/scan_common.h @@ -1,5 +1,8 @@ #pragma once +#include "buffer_data.h" + +#include <ydb/core/protos/index_builder.pb.h> #include <ydb/public/api/protos/ydb_value.pb.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> #include <ydb/core/util/intrusive_heap.h> @@ -10,6 +13,7 @@ namespace NKikimr::NDataShard { +using TIndexBuildScanSettings = NKikimrIndexBuilder::TIndexBuildScanSettings; class TDataShard; struct TUserTable; @@ -87,4 +91,14 @@ TColumnsTypes GetAllTypes(const TUserTable& tableInfo); // if IScan will provide for us "how much data did we read"? ui64 CountBytes(TArrayRef<const TCell> key, const NTable::TRowState& row); +inline TDuration GetRetryWakeupTimeoutBackoff(ui32 attempt) { + const ui32 maxBackoffExponent = 3; + + return TDuration::Seconds(1u << Min(attempt, maxBackoffExponent)); +} + +inline bool HasReachedLimits(const TBufferData& buffer, const TIndexBuildScanSettings& scanSettings) { + return buffer.HasReachedLimits(scanSettings.GetMaxBatchRows(), scanSettings.GetMaxBatchBytes()); +} + } diff --git a/ydb/core/tx/datashard/upload_stats.h b/ydb/core/tx/datashard/upload_stats.h index 8011b24d6e5..9899fce85c5 100644 --- a/ydb/core/tx/datashard/upload_stats.h +++ b/ydb/core/tx/datashard/upload_stats.h @@ -71,18 +71,4 @@ struct TUploadStatus { } }; -struct TUploadRetryLimits { - ui32 MaxUploadRowsRetryCount = 50; - ui32 BackoffCeiling = 3; - - TDuration GetTimeoutBackoff(ui32 retryNo) const { - return TDuration::Seconds(1u << Max(retryNo, BackoffCeiling)); - } -}; - -struct TUploadLimits: TUploadRetryLimits { - ui64 BatchRowsLimit = 50000; - ui64 BatchBytesLimit = 8388608; // 8MB -}; - } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp index 9071b11c5b7..06d6318acbd 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index.cpp @@ -63,10 +63,10 @@ void TSchemeShard::PersistCreateBuildIndex(NIceDb::TNiceDb& db, const TIndexBuil NIceDb::TUpdate<Schema::IndexBuild::TableLocalId>(info.TablePathId.LocalPathId), NIceDb::TUpdate<Schema::IndexBuild::IndexName>(info.IndexName), NIceDb::TUpdate<Schema::IndexBuild::IndexType>(info.IndexType), - NIceDb::TUpdate<Schema::IndexBuild::MaxBatchRows>(info.Limits.MaxBatchRows), - NIceDb::TUpdate<Schema::IndexBuild::MaxBatchBytes>(info.Limits.MaxBatchBytes), - NIceDb::TUpdate<Schema::IndexBuild::MaxShards>(info.Limits.MaxShards), - NIceDb::TUpdate<Schema::IndexBuild::MaxRetries>(info.Limits.MaxRetries), + NIceDb::TUpdate<Schema::IndexBuild::MaxBatchRows>(info.ScanSettings.GetMaxBatchRows()), + NIceDb::TUpdate<Schema::IndexBuild::MaxBatchBytes>(info.ScanSettings.GetMaxBatchBytes()), + NIceDb::TUpdate<Schema::IndexBuild::MaxShards>(info.MaxInProgressShards), + NIceDb::TUpdate<Schema::IndexBuild::MaxRetries>(info.ScanSettings.GetMaxBatchRetries()), NIceDb::TUpdate<Schema::IndexBuild::BuildKind>(ui32(info.BuildKind)) ); // Persist details of the index build operation: ImplTableDescriptions and SpecializedIndexDescription. diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp index 73f4d9fd021..8790950ff4e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp @@ -177,10 +177,8 @@ public: return makeReply("missing index or column to build"); } - buildInfo->Limits.MaxBatchRows = settings.max_batch_rows(); - buildInfo->Limits.MaxBatchBytes = settings.max_batch_bytes(); - buildInfo->Limits.MaxShards = settings.max_shards_in_flight(); - buildInfo->Limits.MaxRetries = settings.max_retries_upload_batch(); + buildInfo->ScanSettings.CopyFrom(settings.GetScanSettings()); + buildInfo->MaxInProgressShards = settings.max_shards_in_flight(); buildInfo->CreateSender = Request->Sender; buildInfo->SenderCookie = Request->Cookie; diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index 918d9f3c30c..a733accc4a9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -8,6 +8,7 @@ #include <ydb/core/scheme/scheme_types_proto.h> #include <ydb/core/tx/datashard/range_ops.h> #include <ydb/core/tx/datashard/upload_stats.h> +#include <ydb/core/tx/datashard/scan_common.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/tx/tx_proxy/upload_rows.h> @@ -85,7 +86,7 @@ protected: TString LogPrefix; TString TargetTable; - NDataShard::TUploadRetryLimits Limits; + const NKikimrIndexBuilder::TIndexBuildScanSettings ScanSettings; TActorId ResponseActorId; ui64 BuildIndexId = 0; @@ -104,13 +105,14 @@ protected: public: TUploadSampleK(TString targetTable, - const TIndexBuildInfo::TLimits& limits, + const NKikimrIndexBuilder::TIndexBuildScanSettings& scanSettings, const TActorId& responseActorId, ui64 buildIndexId, TIndexBuildInfo::TSample::TRows init, NTableIndex::TClusterId parent, NTableIndex::TClusterId child) : TargetTable(std::move(targetTable)) + , ScanSettings(scanSettings) , ResponseActorId(responseActorId) , BuildIndexId(buildIndexId) , Init(std::move(init)) @@ -120,7 +122,6 @@ public: LogPrefix = TStringBuilder() << "TUploadSampleK: BuildIndexId: " << BuildIndexId << " ResponseActorId: " << ResponseActorId; - Limits.MaxUploadRowsRetryCount = limits.MaxRetries; Y_ASSERT(!Init.empty()); Y_ASSERT(Parent < Child); Y_ASSERT(Child != 0); @@ -206,10 +207,10 @@ private: UploadStatus.StatusCode = ev->Get()->Status; UploadStatus.Issues = std::move(ev->Get()->Issues); - if (UploadStatus.IsRetriable() && RetryCount < Limits.MaxUploadRowsRetryCount) { + if (UploadStatus.IsRetriable() && RetryCount < ScanSettings.GetMaxBatchRetries()) { LOG_N("Got retriable error, " << Debug() << " RetryCount: " << RetryCount); - this->Schedule(Limits.GetTimeoutBackoff(RetryCount), new TEvents::TEvWakeup()); + this->Schedule(NDataShard::GetRetryWakeupTimeoutBackoff(RetryCount), new TEvents::TEvWakeup()); return; } TAutoPtr<TEvIndexBuilder::TEvUploadSampleKResponse> response = new TEvIndexBuilder::TEvUploadSampleKResponse; @@ -742,9 +743,7 @@ private: ev->Record.SetTargetName(buildInfo.TargetName); - ev->Record.SetMaxBatchRows(buildInfo.Limits.MaxBatchRows); - ev->Record.SetMaxBatchBytes(buildInfo.Limits.MaxBatchBytes); - ev->Record.SetMaxRetries(buildInfo.Limits.MaxRetries); + ev->Record.MutableScanSettings()->CopyFrom(buildInfo.ScanSettings); auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo); @@ -760,7 +759,7 @@ private: .Dive(NTableIndex::NTableVectorKmeansTreeIndex::LevelTable); Y_ASSERT(buildInfo.Sample.Rows.size() <= buildInfo.KMeans.K); auto actor = new TUploadSampleK(path.PathString(), - buildInfo.Limits, Self->SelfId(), ui64(BuildId), + buildInfo.ScanSettings, Self->SelfId(), ui64(BuildId), buildInfo.Sample.Rows, buildInfo.KMeans.Parent, buildInfo.KMeans.Child); TActivationContext::AsActorContext().MakeFor(Self->SelfId()).Register(actor); @@ -777,7 +776,7 @@ private: template<typename Send> bool SendToShards(TIndexBuildInfo& buildInfo, Send&& send) { - while (!buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.size() < buildInfo.Limits.MaxShards) { + while (!buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.size() < buildInfo.MaxInProgressShards) { auto shardIdx = buildInfo.ToUploadShards.front(); buildInfo.ToUploadShards.pop_front(); buildInfo.InProgressShards.emplace(shardIdx); diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp index dec6877e3ff..e07d231c38e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp @@ -296,10 +296,8 @@ void TSchemeShard::TIndexBuilder::TTxBase::Fill(NKikimrIndexBuilder::TIndexBuild } } - settings.set_max_batch_bytes(info.Limits.MaxBatchBytes); - settings.set_max_batch_rows(info.Limits.MaxBatchRows); - settings.set_max_shards_in_flight(info.Limits.MaxShards); - settings.set_max_retries_upload_batch(info.Limits.MaxRetries); + settings.MutableScanSettings()->CopyFrom(info.ScanSettings); + settings.set_max_shards_in_flight(info.MaxInProgressShards); } void TSchemeShard::TIndexBuilder::TTxBase::AddIssue(::google::protobuf::RepeatedPtrField<::Ydb::Issue::IssueMessage>* issues, diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 7e0879ea53d..56ca4f05aad 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -3026,14 +3026,6 @@ private: struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { using TPtr = TIntrusivePtr<TIndexBuildInfo>; - struct TLimits { - ui32 MaxBatchRows = 100; - ui32 MaxBatchBytes = 1 << 20; - ui32 MaxShards = 100; - ui32 MaxRetries = 50; - }; - TLimits Limits; - enum class EState: ui32 { Invalid = 0, AlterMainTable = 5, @@ -3112,6 +3104,8 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { TVector<TString> FillIndexColumns; TVector<TString> FillDataColumns; + NKikimrIndexBuilder::TIndexBuildScanSettings ScanSettings; + TVector<TColumnBuildInfo> BuildColumns; TString TargetName; @@ -3384,13 +3378,12 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { return result; } }; + TMap<TShardIdx, TShardStatus> Shards; - TDeque<TShardIdx> ToUploadShards; - THashSet<TShardIdx> InProgressShards; - std::vector<TShardIdx> DoneShards; + ui32 MaxInProgressShards = 32; TBillingStats Processed; TBillingStats Billed; @@ -3609,15 +3602,15 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> { row.template GetValueOrDefault<Schema::IndexBuild::InitiateTxDone>( indexInfo->InitiateTxDone); - indexInfo->Limits.MaxBatchRows = - row.template GetValue<Schema::IndexBuild::MaxBatchRows>(); - indexInfo->Limits.MaxBatchBytes = - row.template GetValue<Schema::IndexBuild::MaxBatchBytes>(); - indexInfo->Limits.MaxShards = + indexInfo->ScanSettings.SetMaxBatchRows( + row.template GetValue<Schema::IndexBuild::MaxBatchRows>()); + indexInfo->ScanSettings.SetMaxBatchBytes( + row.template GetValue<Schema::IndexBuild::MaxBatchBytes>()); + indexInfo->MaxInProgressShards = row.template GetValue<Schema::IndexBuild::MaxShards>(); - indexInfo->Limits.MaxRetries = + indexInfo->ScanSettings.SetMaxBatchRetries( row.template GetValueOrDefault<Schema::IndexBuild::MaxRetries>( - indexInfo->Limits.MaxRetries); + indexInfo->ScanSettings.GetMaxBatchRetries())); indexInfo->ApplyTxId = row.template GetValueOrDefault<Schema::IndexBuild::ApplyTxId>( diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index 9c32ecf1b32..e1a7ce6deea 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -1741,7 +1741,7 @@ namespace NSchemeShardUT_Private { TEvIndexBuilder::TEvCreateRequest* CreateBuildIndexRequest(ui64 id, const TString& dbName, const TString& src, const TBuildIndexConfig& cfg) { NKikimrIndexBuilder::TIndexBuildSettings settings; settings.set_source_path(src); - settings.set_max_batch_rows(2); + settings.MutableScanSettings()->SetMaxBatchRows(1); settings.set_max_shards_in_flight(2); Ydb::Table::TableIndex& index = *settings.mutable_index(); @@ -1795,7 +1795,7 @@ namespace NSchemeShardUT_Private { std::unique_ptr<TEvIndexBuilder::TEvCreateRequest> CreateBuildColumnRequest(ui64 id, const TString& dbName, const TString& src, const TString& columnName, const Ydb::TypedValue& literal) { NKikimrIndexBuilder::TIndexBuildSettings settings; settings.set_source_path(src); - settings.set_max_batch_rows(2); + settings.MutableScanSettings()->SetMaxBatchRows(1); settings.set_max_shards_in_flight(2); auto* col = settings.mutable_column_build_operation()->add_column(); diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp index 34cf342f84e..2f8482a9fd1 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp @@ -497,10 +497,10 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) { { NKikimrIndexBuilder::TIndexBuildSettings settings; settings.set_source_path("/MyRoot/Table"); - settings.set_max_batch_rows(1); - settings.set_max_batch_bytes(1<<10); + settings.MutableScanSettings()->SetMaxBatchRows(0); // row by row + settings.MutableScanSettings()->SetMaxBatchBytes(1<<10); + settings.MutableScanSettings()->SetMaxBatchRetries(0); settings.set_max_shards_in_flight(1); - settings.set_max_retries_upload_batch(0); Ydb::Table::TableIndex& index = *settings.mutable_index(); index.set_name("index1"); |