diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-12-15 13:48:18 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-12-15 13:48:18 +0300 |
commit | a3a067dc098f870195f22bac6071ab85d6bcb041 (patch) | |
tree | 98f32b82e2ad509a97eb0c0fd7305c8ada5f5907 | |
parent | 64d895f4fc5fadce93faf6e40d90132dd465a669 (diff) | |
download | ydb-a3a067dc098f870195f22bac6071ab85d6bcb041.tar.gz |
additional logging and retries for import methods through ydb-cli
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp | 8 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/import/import.cpp | 18 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/import/import.h | 3 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.cpp | 6 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.h | 1 |
5 files changed, 25 insertions, 11 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp index 42747d12d8..e75cbbeb6f 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp @@ -162,7 +162,7 @@ void TCommandImportFileBase::Config(TConfig& config) { "Use portions of this size in bytes to parse and upload file data") .DefaultValue(HumanReadableSize(defaults.BytesPerRequest_, SF_BYTES)).StoreResult(&BytesPerRequest); config.Opts->AddLongOption("max-in-flight", - "Maximum number of in-flight requests; increase to load big files faster (more memory needed)") + "Maximum number of in-flight requests; increase to load big files faster (more memory needed)") .DefaultValue(defaults.MaxInFlightRequests_).StoreResult(&MaxInFlightRequests); } @@ -220,7 +220,7 @@ int TCommandImportFromCsv::Run(TConfig& config) { settings.Delimiter(Delimiter); } - TImportFileClient client(CreateDriver(config)); + TImportFileClient client(CreateDriver(config), config); ThrowOnError(client.Import(FilePath, Path, settings)); return EXIT_SUCCESS; @@ -250,7 +250,7 @@ int TCommandImportFromJson::Run(TConfig& config) { settings.MaxInFlightRequests(MaxInFlightRequests); settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest)); - TImportFileClient client(CreateDriver(config)); + TImportFileClient client(CreateDriver(config), config); ThrowOnError(client.Import(FilePath, Path, settings)); return EXIT_SUCCESS; @@ -267,7 +267,7 @@ int TCommandImportFromParquet::Run(TConfig& config) { settings.MaxInFlightRequests(MaxInFlightRequests); settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest)); - TImportFileClient client(CreateDriver(config)); + TImportFileClient client(CreateDriver(config), config); ThrowOnError(client.Import(FilePath, Path, settings)); return EXIT_SUCCESS; diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp index 6b61c6bcfd..dce6e5ae5b 100644 --- a/ydb/public/lib/ydb_cli/import/import.cpp +++ b/ydb/public/lib/ydb_cli/import/import.cpp @@ -44,7 +44,7 @@ TStatus MakeStatus(EStatus code = EStatus::SUCCESS, const TString& error = {}) { } -TImportFileClient::TImportFileClient(const TDriver& driver) +TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig) : OperationClient(std::make_shared<NOperation::TOperationClient>(driver)) , SchemeClient(std::make_shared<NScheme::TSchemeClient>(driver)) , TableClient(std::make_shared<NTable::TTableClient>(driver)) @@ -53,11 +53,11 @@ TImportFileClient::TImportFileClient(const TDriver& driver) .OperationTimeout(TDuration::Seconds(30)) .ClientTimeout(TDuration::Seconds(35)); RetrySettings - .MaxRetries(10); + .MaxRetries(100000).Verbose(rootConfig.IsVerbose()); } TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) { - if (! filePath.empty()) { + if (!filePath.empty()) { const TFsPath dataFile(filePath); if (!dataFile.Exists()) { return MakeStatus(EStatus::BAD_REQUEST, @@ -176,10 +176,19 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, // * read serveral lines a time // * support endlines inside quotes // ReadLine() should count quotes for it and stop the line then counter is odd. + ui32 idx = 0; + ui64 readSize = 0; + const ui32 mb100 = 1 << 27; + ui64 nextBorder = mb100; while (size_t sz = input.ReadLine(line)) { buffer += line; buffer += '\n'; // TODO: keep original endline? - + readSize += sz; + ++idx; + if (readSize >= nextBorder && RetrySettings.Verbose_) { + nextBorder += mb100; + Cerr << "Processed " << 1.0 * readSize / (1 << 20) << "Mb and " << idx << " records" << Endl; + } if (buffer.Size() >= settings.BytesPerRequest_) { auto status = WaitForQueue(inFlightRequests, settings.MaxInFlightRequests_); if (!status.IsSuccess()) { @@ -187,7 +196,6 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, } inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer)); - buffer = headerRow; } } diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h index d46ec954b6..3eba2c9751 100644 --- a/ydb/public/lib/ydb_cli/import/import.h +++ b/ydb/public/lib/ydb_cli/import/import.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/public/lib/ydb_cli/common/command.h> #include <ydb/public/lib/ydb_cli/common/formats.h> #include <ydb/public/sdk/cpp/client/ydb_types/status/status.h> #include <ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h> @@ -46,7 +47,7 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting class TImportFileClient { public: - explicit TImportFileClient(const TDriver& driver); + explicit TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig); TImportFileClient(const TImportFileClient&) = delete; // Ingest data from the input file to the database table. diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index ca42ef69ee..b12d65bed3 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -3232,7 +3232,11 @@ protected: if (self->RetryNumber >= self->Settings.MaxRetries_) { return self->Promise.SetValue(status); } - + if (self->Settings.Verbose_) { + Cerr << "Previous query attempt was finished with unsuccessful status: " + << status.GetIssues().ToString() << ". Status is " << status.GetStatus() << "." + << "Send retry attempt " << self->RetryNumber << " of " << self->Settings.MaxRetries_ << Endl; + } self->RetryNumber++; self->TableClient.Impl_->RetryOperationStatCollector.IncAsyncRetryOperation(status.GetStatus()); diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index fd0908e13a..c4e1295f6c 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -859,6 +859,7 @@ struct TRetryOperationSettings { FLUENT_SETTING_DEFAULT(TBackoffSettings, FastBackoffSettings, DefaultFastBackoffSettings()); FLUENT_SETTING_DEFAULT(TBackoffSettings, SlowBackoffSettings, DefaultSlowBackoffSettings()); FLUENT_SETTING_FLAG(Idempotent); + FLUENT_SETTING_FLAG(Verbose); static TBackoffSettings DefaultFastBackoffSettings() { return TBackoffSettings() |