diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2025-02-11 12:21:16 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-11 12:21:16 +0300 |
commit | b50302c0a6ab223b71915da080138e15bc220516 (patch) | |
tree | 40f77520b2ce4a647c3f59b2297325d7eb75592e | |
parent | a5fc6d865de01f57efb0f42655c5e8032c3b5e5e (diff) | |
download | ydb-b50302c0a6ab223b71915da080138e15bc220516.tar.gz |
`tools restore` optimal defaults (#14398)
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_tools.cpp | 83 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_tools.h | 2 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/dump/dump.h | 8 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/dump/restore_compat.cpp | 2 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/dump/restore_impl.cpp | 20 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/dump/restore_impl.h | 4 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/dump/restore_import_data.cpp | 15 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/dump/restore_import_data.h | 1 |
8 files changed, 91 insertions, 44 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_tools.cpp b/ydb/public/lib/ydb_cli/commands/ydb_tools.cpp index d079cfbccb..e87a1f4b14 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_tools.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_tools.cpp @@ -14,6 +14,7 @@ #include <util/generic/serialized_enum.h> #include <util/stream/format.h> #include <util/string/split.h> +#include <util/system/info.h> namespace NYdb::NConsoleClient { @@ -142,18 +143,16 @@ void TCommandRestore::Config(TConfig& config) { NDump::TRestoreSettings defaults; - config.Opts->AddLongOption("restore-data", "Whether to restore data or not") + config.Opts->AddLongOption("restore-data", "Whether to restore data or not.") .DefaultValue(defaults.RestoreData_).StoreResult(&RestoreData); - config.Opts->AddLongOption("restore-indexes", "Whether to restore indexes or not") + config.Opts->AddLongOption("restore-indexes", "Whether to restore indexes or not.") .DefaultValue(defaults.RestoreIndexes_).StoreResult(&RestoreIndexes); - config.Opts->AddLongOption("restore-acl", "Whether to restore ACL and owner or not") + config.Opts->AddLongOption("restore-acl", "Whether to restore ACL and owner or not.") .DefaultValue(defaults.RestoreACL_).StoreResult(&RestoreACL); - config.Opts->AddLongOption("skip-document-tables", TStringBuilder() - << "Document API tables cannot be restored for now. " - << "Specify this option to skip such tables") + config.Opts->AddLongOption("skip-document-tables", "Skip Document API tables.") .DefaultValue(defaults.SkipDocumentTables_).StoreResult(&SkipDocumentTables) .Hidden(); // Deprecated @@ -162,37 +161,60 @@ void TCommandRestore::Config(TConfig& config) { " will be reverted in case of error.") .StoreTrue(&SavePartialResult); - config.Opts->AddLongOption("bandwidth", "Limit data upload bandwidth, bytes per second (example: 2MiB)") - .DefaultValue("0").StoreResult(&UploadBandwidth); - - config.Opts->AddLongOption("rps", "Limit requests per second (example: 100)") - .DefaultValue(defaults.RateLimiterSettings_.GetRps()).StoreResult(&UploadRps); + config.Opts->AddLongOption("bandwidth", "Limit data upload bandwidth, bytes per second (example: 2MiB).") + .DefaultValue("no limit") + .Handler1T<TString>([this](const TString& arg) { + UploadBandwidth = (arg == "no limit") ? "0" : arg; + }) + .Hidden(); + + config.Opts->AddLongOption("rps", "Limit requests per second (example: 100).") + .DefaultValue("no limit") + .Handler1T<TString>([this](const TString& arg) { + UploadRps = (arg == "no limit") ? "0" : arg; + }); - config.Opts->AddLongOption("upload-batch-rows", "Limit upload batch size in rows (example: 1K)") - .DefaultValue(defaults.RowsPerRequest_).StoreResult(&RowsPerRequest); + config.Opts->AddLongOption("upload-batch-rows", "Limit upload batch size in rows (example: 1K)." + " Not applicable in ImportData mode.") + .DefaultValue("no limit") + .Handler1T<TString>([this](const TString& arg) { + RowsPerRequest = (arg == "no limit") ? "0" : arg; + }); - config.Opts->AddLongOption("upload-batch-bytes", "Limit upload batch size in bytes (example: 1MiB)") - .DefaultValue(HumanReadableSize(defaults.BytesPerRequest_, SF_BYTES)).StoreResult(&BytesPerRequest); + config.Opts->AddLongOption("upload-batch-rus", "Limit upload batch size in request units (example: 100)." + " Not applicable in ImportData mode.") + .DefaultValue("no limit") + .Handler1T<TString>([this](const TString& arg) { + RequestUnitsPerRequest = (arg == "no limit") ? "0" : arg; + }); - config.Opts->AddLongOption("upload-batch-rus", "Limit upload batch size in request units (example: 100)") - .DefaultValue(defaults.RequestUnitsPerRequest_).StoreResult(&RequestUnitsPerRequest); + config.Opts->AddLongOption("upload-batch-bytes", "Limit upload batch size in bytes (example: 1MiB).") + .DefaultValue("auto") + .Handler1T<TString>([this](const TString& arg) { + BytesPerRequest = (arg == "auto") ? "0" : arg; + }); - config.Opts->AddLongOption("in-flight", "Limit in-flight request count") - .DefaultValue(defaults.InFly_).StoreResult(&InFly); + config.Opts->AddLongOption("in-flight", "Limit in-flight request count.") + .DefaultValue("auto") + .Handler1T<TString>([this](const TString& arg) { + InFlight = (arg == "auto") ? 0 : FromString<ui32>(arg); + }); config.Opts->AddLongOption("bulk-upsert", "Use BulkUpsert - a more efficient way to upload data with lower consistency level." - " Global secondary indexes are not supported in this mode.") + " Global secondary indexes are not supported in this mode.") .StoreTrue(&UseBulkUpsert) .Hidden(); // Deprecated. Using ImportData should be more effective. config.Opts->AddLongOption("import-data", "Use ImportData - a more efficient way to upload data." - " ImportData will throw an error if you try to upload data into an existing table that has" - " secondary indexes or is in the process of building them. If you need to restore a table" - " with secondary indexes, make sure it's not already present in the scheme.") + " ImportData will throw an error if you try to upload data into an existing table that has" + " secondary indexes or is in the process of building them. If you need to restore a table" + " with secondary indexes, make sure it's not already present in the scheme.") .StoreTrue(&UseImportData); config.Opts->MutuallyExclusive("bandwidth", "rps"); config.Opts->MutuallyExclusive("import-data", "bulk-upsert"); + config.Opts->MutuallyExclusive("import-data", "upload-batch-rows"); + config.Opts->MutuallyExclusive("import-data", "upload-batch-rus"); } void TCommandRestore::ExtractParams(TConfig& config) { @@ -208,17 +230,24 @@ int TCommandRestore::Run(TConfig& config) { .RestoreACL(RestoreACL) .SkipDocumentTables(SkipDocumentTables) .SavePartialResult(SavePartialResult) - .RowsPerRequest(NYdb::SizeFromString(RowsPerRequest)) - .InFly(InFly); + .RowsPerRequest(NYdb::SizeFromString(RowsPerRequest)); + + if (InFlight) { + settings.MaxInFlight(InFlight); + } else if (!UseImportData) { + settings.MaxInFlight(NSystemInfo::CachedNumberOfCpus()); + } if (auto bytesPerRequest = NYdb::SizeFromString(BytesPerRequest)) { - if (bytesPerRequest > NDump::TRestoreSettings::MaxBytesPerRequest) { + if (UseImportData && bytesPerRequest > NDump::TRestoreSettings::MaxImportDataBytesPerRequest) { throw TMisuseException() << "--upload-batch-bytes cannot be larger than " - << HumanReadableSize(NDump::TRestoreSettings::MaxBytesPerRequest, SF_BYTES); + << HumanReadableSize(NDump::TRestoreSettings::MaxImportDataBytesPerRequest, SF_BYTES); } settings.BytesPerRequest(bytesPerRequest); + } else if (UseImportData) { + settings.BytesPerRequest(NDump::TRestoreSettings::MaxImportDataBytesPerRequest); } if (RequestUnitsPerRequest) { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_tools.h b/ydb/public/lib/ydb_cli/commands/ydb_tools.h index 6c62fd30d2..d0f28c7784 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_tools.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_tools.h @@ -66,7 +66,7 @@ private: TString RowsPerRequest; TString BytesPerRequest; TString RequestUnitsPerRequest; - ui32 InFly; + ui32 InFlight; bool UseBulkUpsert = false; bool UseImportData = false; }; diff --git a/ydb/public/lib/ydb_cli/dump/dump.h b/ydb/public/lib/ydb_cli/dump/dump.h index 1c5374c3f0..76aeacd3c0 100644 --- a/ydb/public/lib/ydb_cli/dump/dump.h +++ b/ydb/public/lib/ydb_cli/dump/dump.h @@ -64,7 +64,7 @@ public: struct TRateLimiterSettings { using TSelf = TRateLimiterSettings; - FLUENT_SETTING_DEFAULT(ui32, Rate, 30); + FLUENT_SETTING_DEFAULT(ui32, Rate, Max<ui32>()); FLUENT_SETTING_DEFAULT(TDuration, Interval, TDuration::Seconds(1)); FLUENT_SETTING_DEFAULT(TDuration, ReactionTime, TDuration::MilliSeconds(50)); @@ -91,7 +91,7 @@ struct TRestoreSettings: public TOperationRequestSettings<TRestoreSettings> { ImportData, }; - static constexpr ui64 MaxBytesPerRequest = 16_MB; + static constexpr ui64 MaxImportDataBytesPerRequest = 16_MB; FLUENT_SETTING_DEFAULT(EMode, Mode, EMode::Yql); FLUENT_SETTING_DEFAULT(bool, DryRun, false); @@ -105,9 +105,9 @@ struct TRestoreSettings: public TOperationRequestSettings<TRestoreSettings> { FLUENT_SETTING_DEFAULT(ui64, MemLimit, 32_MB); FLUENT_SETTING_DEFAULT(ui64, RowsPerRequest, 0); FLUENT_SETTING_DEFAULT(ui64, BytesPerRequest, 512_KB); - FLUENT_SETTING_DEFAULT(ui64, RequestUnitsPerRequest, 30); + FLUENT_SETTING_DEFAULT(ui64, RequestUnitsPerRequest, 0); FLUENT_SETTING_DEFAULT(ui64, FileBufferSize, 2_MB); - FLUENT_SETTING_DEFAULT(ui32, InFly, 10); + FLUENT_SETTING_DEFAULT(ui32, MaxInFlight, 0); FLUENT_SETTING_DEFAULT(TRateLimiterSettings, RateLimiterSettings, {}); }; // TRestoreSettings diff --git a/ydb/public/lib/ydb_cli/dump/restore_compat.cpp b/ydb/public/lib/ydb_cli/dump/restore_compat.cpp index f53295792b..d916e0de62 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_compat.cpp +++ b/ydb/public/lib/ydb_cli/dump/restore_compat.cpp @@ -128,7 +128,7 @@ public: Y_ENSURE(dataAccumulator); TUploader::TOptions opts; - opts.InFly = settings.InFly_; + opts.InFly = settings.MaxInFlight_; opts.Rate = settings.RateLimiterSettings_.Rate_; opts.Interval = settings.RateLimiterSettings_.Interval_; opts.ReactionTime = settings.RateLimiterSettings_.ReactionTime_; diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp index 29d3c3d833..a9ac724677 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp +++ b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp @@ -22,6 +22,7 @@ #include <util/generic/vector.h> #include <util/stream/file.h> #include <util/string/join.h> +#include <util/system/info.h> #include <google/protobuf/text_format.h> @@ -711,7 +712,8 @@ TRestoreResult TRestoreClient::RestoreTable( } if (settings.RestoreData_) { - auto result = RestoreData(fsPath, dbPath, settings, withoutIndexesDesc); + const ui32 partitionCount = scheme.partition_at_keys().split_points().size() + 1; + auto result = RestoreData(fsPath, dbPath, settings, withoutIndexesDesc, partitionCount); if (!result.IsSuccess()) { return result; } @@ -797,6 +799,7 @@ THolder<NPrivate::IDataWriter> TRestoreClient::CreateDataWriter( const TString& dbPath, const TRestoreSettings& settings, const TTableDescription& desc, + ui32 partitionCount, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators) { THolder<NPrivate::IDataWriter> writer; @@ -809,7 +812,7 @@ THolder<NPrivate::IDataWriter> TRestoreClient::CreateDataWriter( } case TRestoreSettings::EMode::ImportData: { - writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulators, settings, Log)); + writer.Reset(CreateImportDataWriter(dbPath, desc, partitionCount, ImportClient, TableClient, accumulators, settings, Log)); break; } } @@ -824,7 +827,11 @@ TRestoreResult TRestoreClient::CreateDataAccumulators( const TTableDescription& desc, ui32 dataFilesCount) { - const ui32 accumulatorsCount = std::min(settings.InFly_, dataFilesCount); + size_t accumulatorsCount = settings.MaxInFlight_; + if (!accumulatorsCount) { + accumulatorsCount = Min<size_t>(dataFilesCount, NSystemInfo::CachedNumberOfCpus()); + } + outAccumulators.resize(accumulatorsCount); switch (settings.Mode_) { @@ -855,7 +862,8 @@ TRestoreResult TRestoreClient::RestoreData( const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, - const TTableDescription& desc) + const TTableDescription& desc, + ui32 partitionCount) { // Threads can access memory owned by this vector through pointers during restore operation TVector<TFsPath> dataFiles = CollectDataFiles(fsPath); @@ -870,7 +878,7 @@ TRestoreResult TRestoreClient::RestoreData( return res; } - THolder<NPrivate::IDataWriter> writer = CreateDataWriter(dbPath, settings, desc, accumulators); + THolder<NPrivate::IDataWriter> writer = CreateDataWriter(dbPath, settings, desc, partitionCount, accumulators); TVector<TFuture<TRestoreResult>> accumulatorResults(Reserve(accumulators.size())); TThreadPool accumulatorWorkers(TThreadPool::TParams().SetBlocking(true)); @@ -963,7 +971,7 @@ TRestoreResult TRestoreClient::RestoreData( } if (dataFound) { - writer = CreateDataWriter(dbPath, settings, desc, accumulators); + writer = CreateDataWriter(dbPath, settings, desc, partitionCount, accumulators); for (auto& acc : accumulators) { while (acc->Ready(true)) { if (!writer->Push(acc->GetData(true))) { diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.h b/ydb/public/lib/ydb_cli/dump/restore_impl.h index 51761e3016..79869e03ab 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_impl.h +++ b/ydb/public/lib/ydb_cli/dump/restore_impl.h @@ -136,14 +136,14 @@ class TRestoreClient { TRestoreResult RestoreRateLimiter(const TFsPath& fsPath, const TString& coordinationNodePath, const TString& resourcePath); TRestoreResult CheckSchema(const TString& dbPath, const NTable::TTableDescription& desc); - TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc); + TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc, ui32 partitionCount); TRestoreResult RestoreIndexes(const TString& dbPath, const NTable::TTableDescription& desc); TRestoreResult RestoreChangefeeds(const TFsPath& path, const TString& dbPath); TRestoreResult RestorePermissions(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting); TRestoreResult RestoreConsumers(const TString& topicPath, const std::vector<NTopic::TConsumer>& consumers); THolder<NPrivate::IDataWriter> CreateDataWriter(const TString& dbPath, const TRestoreSettings& settings, - const NTable::TTableDescription& desc, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators); + const NTable::TTableDescription& desc, ui32 partitionCount, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators); TRestoreResult CreateDataAccumulators(TVector<THolder<NPrivate::IDataAccumulator>>& outAccumulators, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc, ui32 dataFilesCount); diff --git a/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp b/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp index da42487559..b1e2b13042 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp +++ b/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp @@ -18,6 +18,7 @@ #include <util/stream/str.h> #include <util/string/builder.h> #include <util/string/cast.h> +#include <util/system/info.h> #include <util/system/mutex.h> #include <util/thread/pool.h> @@ -903,6 +904,7 @@ public: explicit TDataWriter( const TString& path, const TTableDescription& desc, + ui32 partitionCount, const TRestoreSettings& settings, TImportClient& importClient, TTableClient& tableClient, @@ -925,11 +927,17 @@ public: } TasksQueue = MakeHolder<TThreadPool>(TThreadPool::TParams().SetBlocking(true).SetCatching(true)); - TasksQueue->Start(settings.InFly_, settings.InFly_ + 1); + + size_t threadCount = settings.MaxInFlight_; + if (!threadCount) { + threadCount = Min<size_t>(partitionCount, NSystemInfo::CachedNumberOfCpus()); + } + + TasksQueue->Start(threadCount, threadCount + 1); } bool Push(NPrivate::TBatch&& data) override { - if (data.size() > TRestoreSettings::MaxBytesPerRequest) { + if (data.size() > TRestoreSettings::MaxImportDataBytesPerRequest) { LOG_E("Too much data: " << data.GetLocation()); return false; } @@ -989,13 +997,14 @@ NPrivate::IDataAccumulator* CreateImportDataAccumulator( NPrivate::IDataWriter* CreateImportDataWriter( const TString& path, const TTableDescription& desc, + ui32 partitionCount, TImportClient& importClient, TTableClient& tableClient, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators, const TRestoreSettings& settings, const std::shared_ptr<TLog>& log) { - return new TDataWriter(path, desc, settings, importClient, tableClient, accumulators, log); + return new TDataWriter(path, desc, partitionCount, settings, importClient, tableClient, accumulators, log); } } // NDump diff --git a/ydb/public/lib/ydb_cli/dump/restore_import_data.h b/ydb/public/lib/ydb_cli/dump/restore_import_data.h index f8d5044c7e..8d82b85ff2 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_import_data.h +++ b/ydb/public/lib/ydb_cli/dump/restore_import_data.h @@ -16,6 +16,7 @@ NPrivate::IDataAccumulator* CreateImportDataAccumulator( NPrivate::IDataWriter* CreateImportDataWriter( const TString& path, const NTable::TTableDescription& desc, + ui32 partitionCount, NImport::TImportClient& importClient, NTable::TTableClient& tableClient, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators, |