aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2022-12-15 13:48:18 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2022-12-15 13:48:18 +0300
commit a3a067dc098f870195f22bac6071ab85d6bcb041 (patch)
tree 98f32b82e2ad509a97eb0c0fd7305c8ada5f5907
parent 64d895f4fc5fadce93faf6e40d90132dd465a669 (diff)
download ydb-a3a067dc098f870195f22bac6071ab85d6bcb041.tar.gz
additional logging and retries for import methods through ydb-cli
-rw-r--r-- ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp 8
-rw-r--r-- ydb/public/lib/ydb_cli/import/import.cpp 18
-rw-r--r-- ydb/public/lib/ydb_cli/import/import.h 3
-rw-r--r-- ydb/public/sdk/cpp/client/ydb_table/table.cpp 6
-rw-r--r-- ydb/public/sdk/cpp/client/ydb_table/table.h 1
5 files changed, 25 insertions, 11 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
index 42747d12d8..e75cbbeb6f 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
@@ -162,7 +162,7 @@ void TCommandImportFileBase::Config(TConfig& config) {
"Use portions of this size in bytes to parse and upload file data")
.DefaultValue(HumanReadableSize(defaults.BytesPerRequest_, SF_BYTES)).StoreResult(&BytesPerRequest);
config.Opts->AddLongOption("max-in-flight",
- "Maximum number of in-flight requests; increase to load big files faster (more memory needed)")
+ "Maximum number of in-flight requests; increase to load big files faster (more memory needed)")
.DefaultValue(defaults.MaxInFlightRequests_).StoreResult(&MaxInFlightRequests);
}
@@ -220,7 +220,7 @@ int TCommandImportFromCsv::Run(TConfig& config) {
settings.Delimiter(Delimiter);
}
- TImportFileClient client(CreateDriver(config));
+ TImportFileClient client(CreateDriver(config), config);
ThrowOnError(client.Import(FilePath, Path, settings));
return EXIT_SUCCESS;
@@ -250,7 +250,7 @@ int TCommandImportFromJson::Run(TConfig& config) {
settings.MaxInFlightRequests(MaxInFlightRequests);
settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
- TImportFileClient client(CreateDriver(config));
+ TImportFileClient client(CreateDriver(config), config);
ThrowOnError(client.Import(FilePath, Path, settings));
return EXIT_SUCCESS;
@@ -267,7 +267,7 @@ int TCommandImportFromParquet::Run(TConfig& config) {
settings.MaxInFlightRequests(MaxInFlightRequests);
settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
- TImportFileClient client(CreateDriver(config));
+ TImportFileClient client(CreateDriver(config), config);
ThrowOnError(client.Import(FilePath, Path, settings));
return EXIT_SUCCESS;
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index 6b61c6bcfd..dce6e5ae5b 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -44,7 +44,7 @@ TStatus MakeStatus(EStatus code = EStatus::SUCCESS, const TString& error = {}) {
}
-TImportFileClient::TImportFileClient(const TDriver& driver)
+TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig)
: OperationClient(std::make_shared<NOperation::TOperationClient>(driver))
, SchemeClient(std::make_shared<NScheme::TSchemeClient>(driver))
, TableClient(std::make_shared<NTable::TTableClient>(driver))
@@ -53,11 +53,11 @@ TImportFileClient::TImportFileClient(const TDriver& driver)
.OperationTimeout(TDuration::Seconds(30))
.ClientTimeout(TDuration::Seconds(35));
RetrySettings
- .MaxRetries(10);
+ .MaxRetries(100000).Verbose(rootConfig.IsVerbose());
}
TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) {
- if (! filePath.empty()) {
+ if (!filePath.empty()) {
const TFsPath dataFile(filePath);
if (!dataFile.Exists()) {
return MakeStatus(EStatus::BAD_REQUEST,
@@ -176,10 +176,19 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
// * read several lines at a time
// * support endlines inside quotes
// ReadLine() should count quotes for it and stop the line then counter is odd.
+ ui32 idx = 0;
+ ui64 readSize = 0;
+ const ui32 mb100 = 1 << 27;
+ ui64 nextBorder = mb100;
while (size_t sz = input.ReadLine(line)) {
buffer += line;
buffer += '\n'; // TODO: keep original endline?
-
+ readSize += sz;
+ ++idx;
+ if (readSize >= nextBorder && RetrySettings.Verbose_) {
+ nextBorder += mb100;
+ Cerr << "Processed " << 1.0 * readSize / (1 << 20) << "Mb and " << idx << " records" << Endl;
+ }
if (buffer.Size() >= settings.BytesPerRequest_) {
auto status = WaitForQueue(inFlightRequests, settings.MaxInFlightRequests_);
if (!status.IsSuccess()) {
@@ -187,7 +196,6 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
}
inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer));
-
buffer = headerRow;
}
}
diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h
index d46ec954b6..3eba2c9751 100644
--- a/ydb/public/lib/ydb_cli/import/import.h
+++ b/ydb/public/lib/ydb_cli/import/import.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/public/lib/ydb_cli/common/command.h>
#include <ydb/public/lib/ydb_cli/common/formats.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
#include <ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h>
@@ -46,7 +47,7 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting
class TImportFileClient {
public:
- explicit TImportFileClient(const TDriver& driver);
+ explicit TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig);
TImportFileClient(const TImportFileClient&) = delete;
// Ingest data from the input file to the database table.
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index ca42ef69ee..b12d65bed3 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -3232,7 +3232,11 @@ protected:
if (self->RetryNumber >= self->Settings.MaxRetries_) {
return self->Promise.SetValue(status);
}
-
+ if (self->Settings.Verbose_) {
+ Cerr << "Previous query attempt was finished with unsuccessful status: "
+ << status.GetIssues().ToString() << ". Status is " << status.GetStatus() << "."
+ << "Send retry attempt " << self->RetryNumber << " of " << self->Settings.MaxRetries_ << Endl;
+ }
self->RetryNumber++;
self->TableClient.Impl_->RetryOperationStatCollector.IncAsyncRetryOperation(status.GetStatus());
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h
index fd0908e13a..c4e1295f6c 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.h
@@ -859,6 +859,7 @@ struct TRetryOperationSettings {
FLUENT_SETTING_DEFAULT(TBackoffSettings, FastBackoffSettings, DefaultFastBackoffSettings());
FLUENT_SETTING_DEFAULT(TBackoffSettings, SlowBackoffSettings, DefaultSlowBackoffSettings());
FLUENT_SETTING_FLAG(Idempotent);
+ FLUENT_SETTING_FLAG(Verbose);
static TBackoffSettings DefaultFastBackoffSettings() {
return TBackoffSettings()