aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2025-02-11 12:21:16 +0300
committerGitHub <noreply@github.com>2025-02-11 12:21:16 +0300
commitb50302c0a6ab223b71915da080138e15bc220516 (patch)
tree40f77520b2ce4a647c3f59b2297325d7eb75592e
parenta5fc6d865de01f57efb0f42655c5e8032c3b5e5e (diff)
downloadydb-b50302c0a6ab223b71915da080138e15bc220516.tar.gz
`tools restore` optimal defaults (#14398)
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_tools.cpp83
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_tools.h2
-rw-r--r--ydb/public/lib/ydb_cli/dump/dump.h8
-rw-r--r--ydb/public/lib/ydb_cli/dump/restore_compat.cpp2
-rw-r--r--ydb/public/lib/ydb_cli/dump/restore_impl.cpp20
-rw-r--r--ydb/public/lib/ydb_cli/dump/restore_impl.h4
-rw-r--r--ydb/public/lib/ydb_cli/dump/restore_import_data.cpp15
-rw-r--r--ydb/public/lib/ydb_cli/dump/restore_import_data.h1
8 files changed, 91 insertions, 44 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_tools.cpp b/ydb/public/lib/ydb_cli/commands/ydb_tools.cpp
index d079cfbccb..e87a1f4b14 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_tools.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_tools.cpp
@@ -14,6 +14,7 @@
#include <util/generic/serialized_enum.h>
#include <util/stream/format.h>
#include <util/string/split.h>
+#include <util/system/info.h>
namespace NYdb::NConsoleClient {
@@ -142,18 +143,16 @@ void TCommandRestore::Config(TConfig& config) {
NDump::TRestoreSettings defaults;
- config.Opts->AddLongOption("restore-data", "Whether to restore data or not")
+ config.Opts->AddLongOption("restore-data", "Whether to restore data or not.")
.DefaultValue(defaults.RestoreData_).StoreResult(&RestoreData);
- config.Opts->AddLongOption("restore-indexes", "Whether to restore indexes or not")
+ config.Opts->AddLongOption("restore-indexes", "Whether to restore indexes or not.")
.DefaultValue(defaults.RestoreIndexes_).StoreResult(&RestoreIndexes);
- config.Opts->AddLongOption("restore-acl", "Whether to restore ACL and owner or not")
+ config.Opts->AddLongOption("restore-acl", "Whether to restore ACL and owner or not.")
.DefaultValue(defaults.RestoreACL_).StoreResult(&RestoreACL);
- config.Opts->AddLongOption("skip-document-tables", TStringBuilder()
- << "Document API tables cannot be restored for now. "
- << "Specify this option to skip such tables")
+ config.Opts->AddLongOption("skip-document-tables", "Skip Document API tables.")
.DefaultValue(defaults.SkipDocumentTables_).StoreResult(&SkipDocumentTables)
.Hidden(); // Deprecated
@@ -162,37 +161,60 @@ void TCommandRestore::Config(TConfig& config) {
" will be reverted in case of error.")
.StoreTrue(&SavePartialResult);
- config.Opts->AddLongOption("bandwidth", "Limit data upload bandwidth, bytes per second (example: 2MiB)")
- .DefaultValue("0").StoreResult(&UploadBandwidth);
-
- config.Opts->AddLongOption("rps", "Limit requests per second (example: 100)")
- .DefaultValue(defaults.RateLimiterSettings_.GetRps()).StoreResult(&UploadRps);
+ config.Opts->AddLongOption("bandwidth", "Limit data upload bandwidth, bytes per second (example: 2MiB).")
+ .DefaultValue("no limit")
+ .Handler1T<TString>([this](const TString& arg) {
+ UploadBandwidth = (arg == "no limit") ? "0" : arg;
+ })
+ .Hidden();
+
+ config.Opts->AddLongOption("rps", "Limit requests per second (example: 100).")
+ .DefaultValue("no limit")
+ .Handler1T<TString>([this](const TString& arg) {
+ UploadRps = (arg == "no limit") ? "0" : arg;
+ });
- config.Opts->AddLongOption("upload-batch-rows", "Limit upload batch size in rows (example: 1K)")
- .DefaultValue(defaults.RowsPerRequest_).StoreResult(&RowsPerRequest);
+ config.Opts->AddLongOption("upload-batch-rows", "Limit upload batch size in rows (example: 1K)."
+ " Not applicable in ImportData mode.")
+ .DefaultValue("no limit")
+ .Handler1T<TString>([this](const TString& arg) {
+ RowsPerRequest = (arg == "no limit") ? "0" : arg;
+ });
- config.Opts->AddLongOption("upload-batch-bytes", "Limit upload batch size in bytes (example: 1MiB)")
- .DefaultValue(HumanReadableSize(defaults.BytesPerRequest_, SF_BYTES)).StoreResult(&BytesPerRequest);
+ config.Opts->AddLongOption("upload-batch-rus", "Limit upload batch size in request units (example: 100)."
+ " Not applicable in ImportData mode.")
+ .DefaultValue("no limit")
+ .Handler1T<TString>([this](const TString& arg) {
+ RequestUnitsPerRequest = (arg == "no limit") ? "0" : arg;
+ });
- config.Opts->AddLongOption("upload-batch-rus", "Limit upload batch size in request units (example: 100)")
- .DefaultValue(defaults.RequestUnitsPerRequest_).StoreResult(&RequestUnitsPerRequest);
+ config.Opts->AddLongOption("upload-batch-bytes", "Limit upload batch size in bytes (example: 1MiB).")
+ .DefaultValue("auto")
+ .Handler1T<TString>([this](const TString& arg) {
+ BytesPerRequest = (arg == "auto") ? "0" : arg;
+ });
- config.Opts->AddLongOption("in-flight", "Limit in-flight request count")
- .DefaultValue(defaults.InFly_).StoreResult(&InFly);
+ config.Opts->AddLongOption("in-flight", "Limit in-flight request count.")
+ .DefaultValue("auto")
+ .Handler1T<TString>([this](const TString& arg) {
+ InFlight = (arg == "auto") ? 0 : FromString<ui32>(arg);
+ });
config.Opts->AddLongOption("bulk-upsert", "Use BulkUpsert - a more efficient way to upload data with lower consistency level."
- " Global secondary indexes are not supported in this mode.")
+ " Global secondary indexes are not supported in this mode.")
.StoreTrue(&UseBulkUpsert)
.Hidden(); // Deprecated. Using ImportData should be more effective.
config.Opts->AddLongOption("import-data", "Use ImportData - a more efficient way to upload data."
- " ImportData will throw an error if you try to upload data into an existing table that has"
- " secondary indexes or is in the process of building them. If you need to restore a table"
- " with secondary indexes, make sure it's not already present in the scheme.")
+ " ImportData will throw an error if you try to upload data into an existing table that has"
+ " secondary indexes or is in the process of building them. If you need to restore a table"
+ " with secondary indexes, make sure it's not already present in the scheme.")
.StoreTrue(&UseImportData);
config.Opts->MutuallyExclusive("bandwidth", "rps");
config.Opts->MutuallyExclusive("import-data", "bulk-upsert");
+ config.Opts->MutuallyExclusive("import-data", "upload-batch-rows");
+ config.Opts->MutuallyExclusive("import-data", "upload-batch-rus");
}
void TCommandRestore::ExtractParams(TConfig& config) {
@@ -208,17 +230,24 @@ int TCommandRestore::Run(TConfig& config) {
.RestoreACL(RestoreACL)
.SkipDocumentTables(SkipDocumentTables)
.SavePartialResult(SavePartialResult)
- .RowsPerRequest(NYdb::SizeFromString(RowsPerRequest))
- .InFly(InFly);
+ .RowsPerRequest(NYdb::SizeFromString(RowsPerRequest));
+
+ if (InFlight) {
+ settings.MaxInFlight(InFlight);
+ } else if (!UseImportData) {
+ settings.MaxInFlight(NSystemInfo::CachedNumberOfCpus());
+ }
if (auto bytesPerRequest = NYdb::SizeFromString(BytesPerRequest)) {
- if (bytesPerRequest > NDump::TRestoreSettings::MaxBytesPerRequest) {
+ if (UseImportData && bytesPerRequest > NDump::TRestoreSettings::MaxImportDataBytesPerRequest) {
throw TMisuseException()
<< "--upload-batch-bytes cannot be larger than "
- << HumanReadableSize(NDump::TRestoreSettings::MaxBytesPerRequest, SF_BYTES);
+ << HumanReadableSize(NDump::TRestoreSettings::MaxImportDataBytesPerRequest, SF_BYTES);
}
settings.BytesPerRequest(bytesPerRequest);
+ } else if (UseImportData) {
+ settings.BytesPerRequest(NDump::TRestoreSettings::MaxImportDataBytesPerRequest);
}
if (RequestUnitsPerRequest) {
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_tools.h b/ydb/public/lib/ydb_cli/commands/ydb_tools.h
index 6c62fd30d2..d0f28c7784 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_tools.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_tools.h
@@ -66,7 +66,7 @@ private:
TString RowsPerRequest;
TString BytesPerRequest;
TString RequestUnitsPerRequest;
- ui32 InFly;
+ ui32 InFlight;
bool UseBulkUpsert = false;
bool UseImportData = false;
};
diff --git a/ydb/public/lib/ydb_cli/dump/dump.h b/ydb/public/lib/ydb_cli/dump/dump.h
index 1c5374c3f0..76aeacd3c0 100644
--- a/ydb/public/lib/ydb_cli/dump/dump.h
+++ b/ydb/public/lib/ydb_cli/dump/dump.h
@@ -64,7 +64,7 @@ public:
struct TRateLimiterSettings {
using TSelf = TRateLimiterSettings;
- FLUENT_SETTING_DEFAULT(ui32, Rate, 30);
+ FLUENT_SETTING_DEFAULT(ui32, Rate, Max<ui32>());
FLUENT_SETTING_DEFAULT(TDuration, Interval, TDuration::Seconds(1));
FLUENT_SETTING_DEFAULT(TDuration, ReactionTime, TDuration::MilliSeconds(50));
@@ -91,7 +91,7 @@ struct TRestoreSettings: public TOperationRequestSettings<TRestoreSettings> {
ImportData,
};
- static constexpr ui64 MaxBytesPerRequest = 16_MB;
+ static constexpr ui64 MaxImportDataBytesPerRequest = 16_MB;
FLUENT_SETTING_DEFAULT(EMode, Mode, EMode::Yql);
FLUENT_SETTING_DEFAULT(bool, DryRun, false);
@@ -105,9 +105,9 @@ struct TRestoreSettings: public TOperationRequestSettings<TRestoreSettings> {
FLUENT_SETTING_DEFAULT(ui64, MemLimit, 32_MB);
FLUENT_SETTING_DEFAULT(ui64, RowsPerRequest, 0);
FLUENT_SETTING_DEFAULT(ui64, BytesPerRequest, 512_KB);
- FLUENT_SETTING_DEFAULT(ui64, RequestUnitsPerRequest, 30);
+ FLUENT_SETTING_DEFAULT(ui64, RequestUnitsPerRequest, 0);
FLUENT_SETTING_DEFAULT(ui64, FileBufferSize, 2_MB);
- FLUENT_SETTING_DEFAULT(ui32, InFly, 10);
+ FLUENT_SETTING_DEFAULT(ui32, MaxInFlight, 0);
FLUENT_SETTING_DEFAULT(TRateLimiterSettings, RateLimiterSettings, {});
}; // TRestoreSettings
diff --git a/ydb/public/lib/ydb_cli/dump/restore_compat.cpp b/ydb/public/lib/ydb_cli/dump/restore_compat.cpp
index f53295792b..d916e0de62 100644
--- a/ydb/public/lib/ydb_cli/dump/restore_compat.cpp
+++ b/ydb/public/lib/ydb_cli/dump/restore_compat.cpp
@@ -128,7 +128,7 @@ public:
Y_ENSURE(dataAccumulator);
TUploader::TOptions opts;
- opts.InFly = settings.InFly_;
+ opts.InFly = settings.MaxInFlight_;
opts.Rate = settings.RateLimiterSettings_.Rate_;
opts.Interval = settings.RateLimiterSettings_.Interval_;
opts.ReactionTime = settings.RateLimiterSettings_.ReactionTime_;
diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp
index 29d3c3d833..a9ac724677 100644
--- a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp
+++ b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp
@@ -22,6 +22,7 @@
#include <util/generic/vector.h>
#include <util/stream/file.h>
#include <util/string/join.h>
+#include <util/system/info.h>
#include <google/protobuf/text_format.h>
@@ -711,7 +712,8 @@ TRestoreResult TRestoreClient::RestoreTable(
}
if (settings.RestoreData_) {
- auto result = RestoreData(fsPath, dbPath, settings, withoutIndexesDesc);
+ const ui32 partitionCount = scheme.partition_at_keys().split_points().size() + 1;
+ auto result = RestoreData(fsPath, dbPath, settings, withoutIndexesDesc, partitionCount);
if (!result.IsSuccess()) {
return result;
}
@@ -797,6 +799,7 @@ THolder<NPrivate::IDataWriter> TRestoreClient::CreateDataWriter(
const TString& dbPath,
const TRestoreSettings& settings,
const TTableDescription& desc,
+ ui32 partitionCount,
const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators)
{
THolder<NPrivate::IDataWriter> writer;
@@ -809,7 +812,7 @@ THolder<NPrivate::IDataWriter> TRestoreClient::CreateDataWriter(
}
case TRestoreSettings::EMode::ImportData: {
- writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulators, settings, Log));
+ writer.Reset(CreateImportDataWriter(dbPath, desc, partitionCount, ImportClient, TableClient, accumulators, settings, Log));
break;
}
}
@@ -824,7 +827,11 @@ TRestoreResult TRestoreClient::CreateDataAccumulators(
const TTableDescription& desc,
ui32 dataFilesCount)
{
- const ui32 accumulatorsCount = std::min(settings.InFly_, dataFilesCount);
+ size_t accumulatorsCount = settings.MaxInFlight_;
+ if (!accumulatorsCount) {
+ accumulatorsCount = Min<size_t>(dataFilesCount, NSystemInfo::CachedNumberOfCpus());
+ }
+
outAccumulators.resize(accumulatorsCount);
switch (settings.Mode_) {
@@ -855,7 +862,8 @@ TRestoreResult TRestoreClient::RestoreData(
const TFsPath& fsPath,
const TString& dbPath,
const TRestoreSettings& settings,
- const TTableDescription& desc)
+ const TTableDescription& desc,
+ ui32 partitionCount)
{
// Threads can access memory owned by this vector through pointers during restore operation
TVector<TFsPath> dataFiles = CollectDataFiles(fsPath);
@@ -870,7 +878,7 @@ TRestoreResult TRestoreClient::RestoreData(
return res;
}
- THolder<NPrivate::IDataWriter> writer = CreateDataWriter(dbPath, settings, desc, accumulators);
+ THolder<NPrivate::IDataWriter> writer = CreateDataWriter(dbPath, settings, desc, partitionCount, accumulators);
TVector<TFuture<TRestoreResult>> accumulatorResults(Reserve(accumulators.size()));
TThreadPool accumulatorWorkers(TThreadPool::TParams().SetBlocking(true));
@@ -963,7 +971,7 @@ TRestoreResult TRestoreClient::RestoreData(
}
if (dataFound) {
- writer = CreateDataWriter(dbPath, settings, desc, accumulators);
+ writer = CreateDataWriter(dbPath, settings, desc, partitionCount, accumulators);
for (auto& acc : accumulators) {
while (acc->Ready(true)) {
if (!writer->Push(acc->GetData(true))) {
diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.h b/ydb/public/lib/ydb_cli/dump/restore_impl.h
index 51761e3016..79869e03ab 100644
--- a/ydb/public/lib/ydb_cli/dump/restore_impl.h
+++ b/ydb/public/lib/ydb_cli/dump/restore_impl.h
@@ -136,14 +136,14 @@ class TRestoreClient {
TRestoreResult RestoreRateLimiter(const TFsPath& fsPath, const TString& coordinationNodePath, const TString& resourcePath);
TRestoreResult CheckSchema(const TString& dbPath, const NTable::TTableDescription& desc);
- TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc);
+ TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc, ui32 partitionCount);
TRestoreResult RestoreIndexes(const TString& dbPath, const NTable::TTableDescription& desc);
TRestoreResult RestoreChangefeeds(const TFsPath& path, const TString& dbPath);
TRestoreResult RestorePermissions(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting);
TRestoreResult RestoreConsumers(const TString& topicPath, const std::vector<NTopic::TConsumer>& consumers);
THolder<NPrivate::IDataWriter> CreateDataWriter(const TString& dbPath, const TRestoreSettings& settings,
- const NTable::TTableDescription& desc, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators);
+ const NTable::TTableDescription& desc, ui32 partitionCount, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators);
TRestoreResult CreateDataAccumulators(TVector<THolder<NPrivate::IDataAccumulator>>& outAccumulators,
const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc,
ui32 dataFilesCount);
diff --git a/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp b/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp
index da42487559..b1e2b13042 100644
--- a/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp
+++ b/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp
@@ -18,6 +18,7 @@
#include <util/stream/str.h>
#include <util/string/builder.h>
#include <util/string/cast.h>
+#include <util/system/info.h>
#include <util/system/mutex.h>
#include <util/thread/pool.h>
@@ -903,6 +904,7 @@ public:
explicit TDataWriter(
const TString& path,
const TTableDescription& desc,
+ ui32 partitionCount,
const TRestoreSettings& settings,
TImportClient& importClient,
TTableClient& tableClient,
@@ -925,11 +927,17 @@ public:
}
TasksQueue = MakeHolder<TThreadPool>(TThreadPool::TParams().SetBlocking(true).SetCatching(true));
- TasksQueue->Start(settings.InFly_, settings.InFly_ + 1);
+
+ size_t threadCount = settings.MaxInFlight_;
+ if (!threadCount) {
+ threadCount = Min<size_t>(partitionCount, NSystemInfo::CachedNumberOfCpus());
+ }
+
+ TasksQueue->Start(threadCount, threadCount + 1);
}
bool Push(NPrivate::TBatch&& data) override {
- if (data.size() > TRestoreSettings::MaxBytesPerRequest) {
+ if (data.size() > TRestoreSettings::MaxImportDataBytesPerRequest) {
LOG_E("Too much data: " << data.GetLocation());
return false;
}
@@ -989,13 +997,14 @@ NPrivate::IDataAccumulator* CreateImportDataAccumulator(
NPrivate::IDataWriter* CreateImportDataWriter(
const TString& path,
const TTableDescription& desc,
+ ui32 partitionCount,
TImportClient& importClient,
TTableClient& tableClient,
const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators,
const TRestoreSettings& settings,
const std::shared_ptr<TLog>& log)
{
- return new TDataWriter(path, desc, settings, importClient, tableClient, accumulators, log);
+ return new TDataWriter(path, desc, partitionCount, settings, importClient, tableClient, accumulators, log);
}
} // NDump
diff --git a/ydb/public/lib/ydb_cli/dump/restore_import_data.h b/ydb/public/lib/ydb_cli/dump/restore_import_data.h
index f8d5044c7e..8d82b85ff2 100644
--- a/ydb/public/lib/ydb_cli/dump/restore_import_data.h
+++ b/ydb/public/lib/ydb_cli/dump/restore_import_data.h
@@ -16,6 +16,7 @@ NPrivate::IDataAccumulator* CreateImportDataAccumulator(
NPrivate::IDataWriter* CreateImportDataWriter(
const TString& path,
const NTable::TTableDescription& desc,
+ ui32 partitionCount,
NImport::TImportClient& importClient,
NTable::TTableClient& tableClient,
const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators,