author     Maksim Kita <[email protected]>  2023-05-26 15:31:36 +0000
committer  maksim-kita <[email protected]>  2023-05-26 18:31:36 +0300
commit     6e44fe97c0f37d89e2944af60916b507b0e9a27b (patch)
tree       d5944f3d499eb98169270dfd7c3bc637a7d0bab0
parent     0fc8c71685d1d603d420ff40bcf1b0916940c99f (diff)
Improve performance of import

Pull Request resolved: #189
-rw-r--r--  ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp   15
-rw-r--r--  ydb/public/lib/ydb_cli/commands/ydb_service_import.h      3
-rw-r--r--  ydb/public/lib/ydb_cli/import/import.cpp                344
-rw-r--r--  ydb/public/lib/ydb_cli/import/import.h                   15
4 files changed, 260 insertions, 117 deletions
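
At its core, the change moves each filled request buffer off the reader thread and onto a thread pool, keeping a bounded number of requests in flight. A minimal standalone sketch of that pattern, using std::async in place of the repo's CreateThreadPool/NThreading::Async helpers; UpsertBuffer here is a hypothetical stand-in for one BulkUpsert call:

    #include <cstddef>
    #include <future>
    #include <string>
    #include <utility>
    #include <vector>

    // Hypothetical stand-in for one bulk-upsert request.
    static bool UpsertBuffer(std::string buffer) {
        return !buffer.empty();
    }

    // The reader moves each full buffer into an async task and keeps at most
    // maxInFlight tasks outstanding, mirroring WaitForQueue() in the diff.
    static bool ImportAll(std::vector<std::string> buffers, std::size_t maxInFlight) {
        std::vector<std::future<bool>> inFlight;
        for (auto& buffer : buffers) {
            inFlight.push_back(
                std::async(std::launch::async, UpsertBuffer, std::move(buffer)));
            while (inFlight.size() >= maxInFlight) {  // bounded in-flight queue
                bool ok = inFlight.front().get();     // wait for the oldest request
                inFlight.erase(inFlight.begin());
                if (!ok) {
                    return false;
                }
            }
        }
        for (auto& request : inFlight) {              // drain the tail
            if (!request.get()) {
                return false;
            }
        }
        return true;
    }
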
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
index 05faf20f6c4..d9e2ac6041a 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
@@ -168,11 +168,10 @@ TCommandImportFromFile::TCommandImportFromFile()
void TCommandImportFileBase::Config(TConfig& config) {
TYdbCommand::Config(config);
- config.Opts->SetTrailingArgTitle("<input files...>",
+ config.Opts->SetTrailingArgTitle("<input files...>",
"One or more file paths to import from");
config.Opts->AddLongOption('p', "path", "Database path to table")
.Required().RequiredArgument("STRING").StoreResult(&Path);
-
config.Opts->AddLongOption('i', "input-file").AppendTo(&FilePaths).Hidden();
const TImportFileSettings defaults;
@@ -182,6 +181,9 @@ void TCommandImportFileBase::Config(TConfig& config) {
config.Opts->AddLongOption("max-in-flight",
"Maximum number of in-flight requests; increase to load big files faster (more memory needed)")
.DefaultValue(defaults.MaxInFlightRequests_).StoreResult(&MaxInFlightRequests);
+ config.Opts->AddLongOption("threads",
+ "Maximum number of threads; number of available processors if not specified")
+ .DefaultValue(defaults.Threads_).StoreResult(&Threads);
}
void TCommandImportFileBase::Parse(TConfig& config) {
@@ -226,7 +228,7 @@ void TCommandImportFromCsv::Config(TConfig& config) {
config.Opts->AddLongOption("header",
"Set if file contains column names at the first not skipped row")
.StoreTrue(&Header);
- config.Opts->AddLongOption("columns",
+ config.Opts->AddLongOption("columns",
"String with column names that replaces header")
.RequiredArgument("STR").StoreResult(&HeaderRow);
config.Opts->AddLongOption("newline-delimited",
@@ -246,6 +248,7 @@ int TCommandImportFromCsv::Run(TConfig& config) {
settings.Format(InputFormat);
settings.MaxInFlightRequests(MaxInFlightRequests);
settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
+ settings.Threads(Threads);
settings.SkipRows(SkipRows);
settings.Header(Header);
settings.NewlineDelimited(NewlineDelimited);
@@ -288,6 +291,7 @@ int TCommandImportFromJson::Run(TConfig& config) {
settings.Format(InputFormat);
settings.MaxInFlightRequests(MaxInFlightRequests);
settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
+ settings.Threads(Threads);
TImportFileClient client(CreateDriver(config), config);
ThrowOnError(client.Import(FilePaths, Path, settings));
@@ -295,16 +299,17 @@ int TCommandImportFromJson::Run(TConfig& config) {
return EXIT_SUCCESS;
}
-/// Import Parquet
+/// Import Parquet
void TCommandImportFromParquet::Config(TConfig& config) {
TCommandImportFileBase::Config(config);
}
int TCommandImportFromParquet::Run(TConfig& config) {
- TImportFileSettings settings;
+ TImportFileSettings settings;
settings.Format(InputFormat);
settings.MaxInFlightRequests(MaxInFlightRequests);
settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
+ settings.Threads(Threads);
TImportFileClient client(CreateDriver(config), config);
ThrowOnError(client.Import(FilePaths, Path, settings));
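
Each Run() override above forwards the new Threads value alongside the existing settings. Assuming the FLUENT_SETTING_DEFAULT fields declared in import.h generate chainable setters (an assumption; the diff calls them one per statement), the per-format setup reduces to:

    // Sketch only: setter names come from the diff, chaining is assumed.
    TImportFileSettings settings;
    settings
        .Format(EOutputFormat::Csv)
        .MaxInFlightRequests(100)   // default from import.h
        .BytesPerRequest(1_MB)      // default from import.h
        .Threads(8);                // new: worker threads for upsert batches
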
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
index 450688a9f8d..501a851eaf1 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
@@ -61,6 +61,7 @@ protected:
TVector<TString> FilePaths;
TString BytesPerRequest;
ui64 MaxInFlightRequests = 1;
+ ui64 Threads = 0;
};
class TCommandImportFromCsv : public TCommandImportFileBase {
@@ -108,7 +109,7 @@ public:
class TCommandImportFromParquet : public TCommandImportFileBase {
public:
TCommandImportFromParquet(const TString& cmd = "parquet", const TString& cmdDescription = "Import data from Parquet file")
- : TCommandImportFileBase(cmd, cmdDescription)
+ : TCommandImportFileBase(cmd, cmdDescription)
{
InputFormat = EOutputFormat::Parquet;
}
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index b337930a88f..44e9c8d572a 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -21,6 +21,7 @@
#include <util/stream/file.h>
#include <util/stream/length.h>
#include <util/string/builder.h>
+#include <util/system/thread.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>
@@ -34,6 +35,7 @@
#if defined(_win32_)
#include <windows.h>
+#include <io.h>
#elif defined(_unix_)
#include <unistd.h>
#endif
@@ -82,9 +84,18 @@ FHANDLE GetStdinFileno() {
#endif
}
+bool IsStdoutInteractive() {
+#if defined(_win32_)
+ return _isatty(_fileno(stdout));
+#elif defined(_unix_)
+ return isatty(fileno(stdout));
+#endif
+ return true;
+}
+
class TMaxInflightGetter {
public:
- TMaxInflightGetter(ui64 totalMaxInFlight, std::atomic<ui64>& filesCount)
+ TMaxInflightGetter(ui64 totalMaxInFlight, std::atomic<ui64>& filesCount)
: TotalMaxInFlight(totalMaxInFlight)
, FilesCount(filesCount) {
}
@@ -134,7 +145,7 @@ private:
};
public:
- TCsvFileReader(const TString& filePath, const TImportFileSettings& settings, TString& headerRow,
+ TCsvFileReader(const TString& filePath, const TImportFileSettings& settings, TString& headerRow,
TMaxInflightGetter& inFlightGetter) {
TFile file;
if (filePath) {
@@ -232,8 +243,18 @@ TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand
TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TString& dbPath, const TImportFileSettings& settings) {
FilesCount = filePaths.size();
- auto pool = CreateThreadPool(FilesCount);
- TVector<NThreading::TFuture<TStatus>> asyncResults;
+ if (settings.Format_ == EOutputFormat::Tsv && settings.Delimiter_ != "\t") {
+ return MakeStatus(EStatus::BAD_REQUEST,
+ TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed");
+ }
+
+ auto result = NDump::DescribePath(*SchemeClient, dbPath);
+ auto resultStatus = result.GetStatus();
+ if (resultStatus != EStatus::SUCCESS) {
+ return MakeStatus(EStatus::SCHEME_ERROR,
+ TStringBuilder() << result.GetIssues().ToString() << dbPath);
+ }
+
switch (settings.Format_) {
case EOutputFormat::Default:
case EOutputFormat::Csv:
@@ -245,43 +266,73 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
case EOutputFormat::JsonBase64:
case EOutputFormat::Parquet:
break;
- default:
+ default:
return MakeStatus(EStatus::BAD_REQUEST,
TStringBuilder() << "Unsupported format #" << (int) settings.Format_);
}
+ bool isStdoutInteractive = IsStdoutInteractive();
+ size_t filePathsSize = filePaths.size();
+ std::mutex progressWriteLock;
+ std::atomic<ui64> globalProgress{0};
+
+ auto writeProgress = [&]() {
+ ui64 globalProgressValue = globalProgress.load();
+ std::lock_guard<std::mutex> lock(progressWriteLock);
+ Cout << "Progress " << (globalProgressValue / filePathsSize) << '%' << "\r";
+ Cout.Flush();
+ };
+
+ auto start = TInstant::Now();
+
+ auto pool = CreateThreadPool(filePathsSize);
+ TVector<NThreading::TFuture<TStatus>> asyncResults;
+
+ // If a single empty filename is passed, read from stdin; otherwise read from the file
+
for (const auto& filePath : filePaths) {
- auto func = [this, &filePath, &dbPath, &settings] {
+ auto func = [&, this] {
+ std::unique_ptr<TFileInput> fileInput;
+ std::optional<ui64> fileSizeHint;
+
if (!filePath.empty()) {
const TFsPath dataFile(filePath);
+
if (!dataFile.Exists()) {
return MakeStatus(EStatus::BAD_REQUEST,
TStringBuilder() << "File does not exist: " << filePath);
}
+
if (!dataFile.IsFile()) {
return MakeStatus(EStatus::BAD_REQUEST,
TStringBuilder() << "Not a file: " << filePath);
}
- }
- if (settings.Format_ == EOutputFormat::Tsv) {
- if (settings.Delimiter_ != "\t") {
- return MakeStatus(EStatus::BAD_REQUEST,
- TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed");
+ TFile file(filePath, OpenExisting | RdOnly | Seq);
+ i64 fileLength = file.GetLength();
+ if (fileLength && fileLength >= 0) {
+ fileSizeHint = fileLength;
}
- }
- auto result = NDump::DescribePath(*SchemeClient, dbPath);
- auto resultStatus = result.GetStatus();
+ fileInput = std::make_unique<TFileInput>(file, settings.FileBufferSize_);
+ }
- if (resultStatus != EStatus::SUCCESS) {
- return MakeStatus(EStatus::SCHEME_ERROR,
- TStringBuilder() << result.GetIssues().ToString() << dbPath);
+ ProgressCallbackFunc progressCallback;
+
+ if (isStdoutInteractive)
+ {
+ ui64 oldProgress = 0;
+ progressCallback = [&, oldProgress](ui64 current, ui64 total) mutable {
+ ui64 progress = static_cast<ui64>((static_cast<double>(current) / total) * 100.0);
+ ui64 progressDiff = progress - oldProgress;
+ globalProgress.fetch_add(progressDiff);
+ oldProgress = progress;
+ writeProgress();
+ };
}
- std::unique_ptr<TFileInput> fileInput = filePath.empty() ? nullptr
- : std::make_unique<TFileInput>(filePath, settings.FileBufferSize_);
IInputStream& input = fileInput ? *fileInput : Cin;
+
switch (settings.Format_) {
case EOutputFormat::Default:
case EOutputFormat::Csv:
@@ -289,16 +340,17 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
if (settings.NewlineDelimited_) {
return UpsertCsvByBlocks(filePath, dbPath, settings);
} else {
- return UpsertCsv(input, dbPath, settings);
+ return UpsertCsv(input, dbPath, settings, fileSizeHint, progressCallback);
}
case EOutputFormat::Json:
case EOutputFormat::JsonUnicode:
case EOutputFormat::JsonBase64:
- return UpsertJson(input, dbPath, settings);
+ return UpsertJson(input, dbPath, settings, fileSizeHint, progressCallback);
case EOutputFormat::Parquet:
- return UpsertParquet(filePath, dbPath, settings);
+ return UpsertParquet(filePath, dbPath, settings, progressCallback);
default: ;
}
+
return MakeStatus(EStatus::BAD_REQUEST,
TStringBuilder() << "Unsupported format #" << (int) settings.Format_);
};
@@ -313,6 +365,11 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
return result;
}
}
+
+ auto finish = TInstant::Now();
+ auto duration = finish - start;
+ Cout << "Elapsed: " << duration.SecondsFloat() << " sec\n";
+
return MakeStatus(EStatus::SUCCESS);
}
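
A note on the progress arithmetic in Import() above: every file reports its own 0 to 100 percentage, each callback publishes only the delta since its previous report into globalProgress, and the printer divides by the file count to get the overall percentage. The same bookkeeping in isolation (integer math used here for brevity):

    #include <atomic>
    #include <cstdint>

    std::atomic<uint64_t> globalProgress{0};

    // Each file pushes only the delta since its last report, so the sum stays
    // within 0..100 * fileCount and the average is the overall percentage.
    void onFileProgress(uint64_t current, uint64_t total, uint64_t& lastPercent) {
        uint64_t percent = current * 100 / total;
        globalProgress.fetch_add(percent - lastPercent);
        lastPercent = percent;
    }

    uint64_t overallPercent(uint64_t fileCount) {
        return globalProgress.load() / fileCount;
    }
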
@@ -353,12 +410,12 @@ void TImportFileClient::SetupUpsertSettingsCsv(const TImportFileSettings& settin
}
}
-TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
- const TImportFileSettings& settings) {
+TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings,
+ std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback) {
TString buffer;
TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
-
+
TCountingInput countInput(&input);
NCsvFormat::TLinesSplitter splitter(countInput);
TString headerRow;
@@ -383,35 +440,58 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
splitter.ConsumeLine();
}
- std::vector<TAsyncStatus> inFlightRequests;
+ THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
- ui32 idx = settings.SkipRows_;
- ui64 readSize = 0;
+ ui32 row = settings.SkipRows_;
ui64 nextBorder = VerboseModeReadSize;
+ ui64 batchBytes = 0;
+ ui64 readBytes = 0;
+
+ TString line;
+ std::vector<TAsyncStatus> inFlightRequests;
+
while (TString line = splitter.ConsumeLine()) {
- readSize += line.size();
+ ++row;
+ readBytes += line.Size();
+ batchBytes += line.Size();
+
if (RemoveLastDelimiter) {
if (!line.EndsWith(settings.Delimiter_)) {
- return MakeStatus(EStatus::BAD_REQUEST,
+ return MakeStatus(EStatus::BAD_REQUEST,
"According to the header, lines should end with a delimiter");
}
line.erase(line.Size() - settings.Delimiter_.Size());
}
+
buffer += line;
buffer += '\n';
- ++idx;
- if (readSize >= nextBorder && RetrySettings.Verbose_) {
+
+ if (readBytes >= nextBorder && RetrySettings.Verbose_) {
nextBorder += VerboseModeReadSize;
- Cerr << "Processed " << 1.0 * readSize / (1 << 20) << "Mb and " << idx << " records" << Endl;
+ Cerr << "Processed " << 1.0 * readBytes / (1 << 20) << "Mb and " << row << " records" << Endl;
}
- if (buffer.Size() >= settings.BytesPerRequest_) {
- auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
- if (!status.IsSuccess()) {
- return status;
- }
- inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer));
- buffer = headerRow;
+ if (batchBytes < settings.BytesPerRequest_) {
+ continue;
+ }
+
+ if (inputSizeHint && progressCallback) {
+ progressCallback(readBytes, *inputSizeHint);
+ }
+
+ auto asyncUpsertCSV = [&, buffer = std::move(buffer)]() {
+ auto value = UpsertCsvBuffer(dbPath, buffer);
+ return value.ExtractValueSync();
+ };
+
+ batchBytes = 0;
+ buffer = {};
+
+ inFlightRequests.push_back(NThreading::Async(asyncUpsertCSV, *pool));
+
+ auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
+ if (!status.IsSuccess()) {
+ return status;
}
}
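
The detail that makes UpsertCsv's handoff safe is the init-capture: buffer is moved into the task before the reader resets it with "buffer = {}", so the worker owns its batch and the reader keeps appending without synchronization. In miniature:

    #include <string>
    #include <utility>

    // After the init-capture, the outer string is valid but unspecified, so it
    // is reassigned before being reused, exactly as the diff does.
    void handoff() {
        std::string buffer = "row1\nrow2\n";
        auto task = [batch = std::move(buffer)]() {
            return batch.size();   // the worker reads its private batch
        };
        buffer = {};               // reader resets and continues filling
        buffer += "row3\n";
        task();
    }
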
@@ -455,7 +535,7 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TStr
readSize += line.size();
if (RemoveLastDelimiter) {
if (!line.EndsWith(settings.Delimiter_)) {
- return MakeStatus(EStatus::BAD_REQUEST,
+ return MakeStatus(EStatus::BAD_REQUEST,
"According to the header, lines should end with a delimiter");
}
line.erase(line.Size() - settings.Delimiter_.Size());
@@ -500,9 +580,8 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TStr
inline
TAsyncStatus TImportFileClient::UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder) {
auto upsert = [this, dbPath, rows = builder.Build()]
- (NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
- NYdb::TValue r = rows;
- return tableClient.BulkUpsert(dbPath, std::move(r), UpsertSettings)
+ (NYdb::NTable::TTableClient& tableClient) mutable -> TAsyncStatus {
+ return tableClient.BulkUpsert(dbPath, std::move(rows), UpsertSettings)
.Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
NYdb::TStatus status = bulkUpsertResult.GetValueSync();
return NThreading::MakeFuture(status);
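
The "mutable" added to this lambda is what lets std::move(rows) actually move: without it, the captured rows is const inside the call operator, and the move would silently degrade to a copy (or fail to bind to rvalue-only overloads). A reduced example:

    #include <utility>
    #include <vector>

    std::vector<int> build() {
        return {1, 2, 3};
    }

    // 'mutable' makes the captured value non-const inside operator(), so its
    // payload can be handed off exactly once.
    auto makeUpsert() {
        return [rows = build()]() mutable {
            std::vector<int> sent = std::move(rows);  // real move, no copy
            return sent.size();
        };
    }
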
@@ -511,54 +590,76 @@ TAsyncStatus TImportFileClient::UpsertJsonBuffer(const TString& dbPath, TValueBu
return TableClient->RetryOperation(upsert, RetrySettings);
}
-TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath,
- const TImportFileSettings& settings) {
+
+TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings,
+ std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback) {
NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync();
- if (! sessionResult.IsSuccess())
+ if (!sessionResult.IsSuccess())
return sessionResult;
NTable::TDescribeTableResult tableResult = sessionResult.GetSession().DescribeTable(dbPath).GetValueSync();
- if (! tableResult.IsSuccess())
+ if (!tableResult.IsSuccess())
return tableResult;
const TType tableType = GetTableType(tableResult.GetTableDescription());
const NYdb::EBinaryStringEncoding stringEncoding =
- (settings.Format_==EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 :
+ (settings.Format_ == EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 :
NYdb::EBinaryStringEncoding::Unicode;
- std::vector<TAsyncStatus> inFlightRequests;
TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
+ THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
- size_t currentSize = 0;
- size_t currentRows = 0;
- auto currentBatch = std::make_unique<TValueBuilder>();
- currentBatch->BeginList();
+ ui64 readBytes = 0;
+ ui64 batchBytes = 0;
TString line;
- while (size_t sz = input.ReadLine(line)) {
- currentBatch->AddListItem(JsonToYdbValue(line, tableType, stringEncoding));
- currentSize += line.Size();
- currentRows += 1;
+ std::vector<TString> batchLines;
+ std::vector<TAsyncStatus> inFlightRequests;
- if (currentSize >= settings.BytesPerRequest_) {
- currentBatch->EndList();
+ auto upsertJson = [&](std::vector<TString> batchLines) {
+ TValueBuilder batch;
+ batch.BeginList();
- auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
- if (!status.IsSuccess()) {
- return status;
- }
+ for (auto &line : batchLines) {
+ batch.AddListItem(JsonToYdbValue(line, tableType, stringEncoding));
+ }
+
+ batch.EndList();
+
+ auto value = UpsertJsonBuffer(dbPath, batch);
+ return value.ExtractValueSync();
+ };
- inFlightRequests.push_back(UpsertJsonBuffer(dbPath, *currentBatch));
+ while (size_t size = input.ReadLine(line)) {
+ batchLines.push_back(line);
+ batchBytes += size;
+ readBytes += size;
- currentBatch = std::make_unique<TValueBuilder>();
- currentBatch->BeginList();
- currentSize = 0;
- currentRows = 0;
+ if (inputSizeHint && progressCallback) {
+ progressCallback(readBytes, *inputSizeHint);
+ }
+
+ if (batchBytes < settings.BytesPerRequest_) {
+ continue;
+ }
+
+ batchBytes = 0;
+
+ auto asyncUpsertJson = [&, batchLines = std::move(batchLines)]() {
+ return upsertJson(batchLines);
+ };
+
+ batchLines.clear();
+
+ inFlightRequests.push_back(NThreading::Async(asyncUpsertJson, *pool));
+
+ auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
+ if (!status.IsSuccess()) {
+ return status;
}
}
- if (currentRows > 0) {
- currentBatch->EndList();
- inFlightRequests.push_back(UpsertJsonBuffer(dbPath, *currentBatch));
+ if (!batchLines.empty()) {
+ upsertJson(std::move(batchLines));
}
return WaitForQueue(0, inFlightRequests);
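
Structurally, the JSON path's win is that JsonToYdbValue and the TValueBuilder work now run inside the pooled task, so parsing parallelizes while the reader thread only collects raw lines. The shape of that split, with a placeholder parse:

    #include <cstddef>
    #include <future>
    #include <string>
    #include <utility>
    #include <vector>

    // Placeholder for JsonToYdbValue + TValueBuilder + BulkUpsert.
    static std::size_t parseAndUpsert(std::vector<std::string> lines) {
        std::size_t bytes = 0;
        for (const auto& line : lines) {
            bytes += line.size();       // the expensive per-line work lives here
        }
        return bytes;
    }

    // The reader moves the batch into the task, then reuses the vector.
    static std::future<std::size_t> schedule(std::vector<std::string>& batchLines) {
        auto task = [batch = std::move(batchLines)]() mutable {
            return parseAndUpsert(std::move(batch));
        };
        batchLines.clear();             // moved-from vector is reset for reuse
        return std::async(std::launch::async, std::move(task));
    }
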
@@ -566,7 +667,8 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filename,
[[maybe_unused]] const TString& dbPath,
- [[maybe_unused]] const TImportFileSettings& settings) {
+ [[maybe_unused]] const TImportFileSettings& settings,
+ [[maybe_unused]] ProgressCallbackFunc & progressCallback) {
#if defined(_WIN64) || defined(_WIN32) || defined(__WIN32__)
return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows");
#else
@@ -575,7 +677,7 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
if (!fileResult.ok()) {
return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Unable to open parquet file:" << fileResult.status().ToString());
}
- std::shared_ptr<arrow::io::ReadableFile> readableFile = fileResult.ValueOrDie();
+ std::shared_ptr<arrow::io::ReadableFile> readableFile = *fileResult;
std::unique_ptr<parquet::arrow::FileReader> fileReader;
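
*fileResult and the old ValueOrDie() yield the same contained value once ok() has been verified; the difference is that ValueOrDie() aborts the process if the arrow::Result holds an error, while dereference leaves error handling to the caller. The checked pattern, against the Arrow API:

    #include <memory>
    #include <string>
    #include <arrow/io/api.h>
    #include <arrow/result.h>

    // Check ok() first, then dereference; nullptr signals the error path that
    // the surrounding code converts into a TStatus.
    std::shared_ptr<arrow::io::ReadableFile> openParquet(const std::string& path) {
        arrow::Result<std::shared_ptr<arrow::io::ReadableFile>> result =
            arrow::io::ReadableFile::Open(path);
        if (!result.ok()) {
            return nullptr;
        }
        return *result;  // same value ValueOrDie() would have produced
    }
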
@@ -585,7 +687,9 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while initializing arrow FileReader: " << st.ToString());
}
- const i64 numRowGroups = parquet::ReadMetaData(readableFile)->num_row_groups();
+ auto metadata = parquet::ReadMetaData(readableFile);
+ const i64 numRows = metadata->num_rows();
+ const i64 numRowGroups = metadata->num_row_groups();
std::vector<int> row_group_indices(numRowGroups);
for (i64 i = 0; i < numRowGroups; i++) {
@@ -599,8 +703,18 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while getting RecordBatchReader: " << st.ToString());
}
+ THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
+
+ std::atomic<ui64> uploadedRows = 0;
+ auto uploadedRowsCallback = [&](ui64 rows) {
+ ui64 uploadedRowsValue = uploadedRows.fetch_add(rows);
+
+ if (progressCallback) {
+ progressCallback(uploadedRowsValue + rows, numRows);
+ }
+ };
+
std::vector<TAsyncStatus> inFlightRequests;
- TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
while (true) {
std::shared_ptr<arrow::RecordBatch> batch;
@@ -609,49 +723,63 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
if (!st.ok()) {
return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
}
+
// The read function will return null at the end of data stream.
if (!batch) {
break;
}
- const TString strSchema = NYdb_cli::NArrow::SerializeSchema(*batch->schema());
- const size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(batch);
- const size_t sliceCount =
- (totalSize / (size_t)settings.BytesPerRequest_) + (totalSize % settings.BytesPerRequest_ != 0 ? 1 : 0);
- const i64 rowsInSlice = batch->num_rows() / sliceCount;
+ auto upsertParquetBatch = [&, batch = std::move(batch)]() {
+ const TString strSchema = NYdb_cli::NArrow::SerializeSchema(*batch->schema());
+ const size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(batch);
+ const size_t sliceCount =
+ (totalSize / (size_t)settings.BytesPerRequest_) + (totalSize % settings.BytesPerRequest_ != 0 ? 1 : 0);
+ const i64 rowsInSlice = batch->num_rows() / sliceCount;
- for (i64 currentRow = 0; currentRow < batch->num_rows(); currentRow += rowsInSlice) {
- std::stack<std::shared_ptr<arrow::RecordBatch>> rowsToSend;
+ for (i64 currentRow = 0; currentRow < batch->num_rows(); currentRow += rowsInSlice) {
+ std::stack<std::shared_ptr<arrow::RecordBatch>> rowsToSendBatches;
- if (currentRow + rowsInSlice < batch->num_rows()) {
- rowsToSend.push(batch->Slice(currentRow, rowsInSlice));
- } else {
- rowsToSend.push(batch->Slice(currentRow));
- }
+ if (currentRow + rowsInSlice < batch->num_rows()) {
+ rowsToSendBatches.push(batch->Slice(currentRow, rowsInSlice));
+ } else {
+ rowsToSendBatches.push(batch->Slice(currentRow));
+ }
- do {
- const auto rows = rowsToSend.top();
+ do {
+ auto rowsBatch = std::move(rowsToSendBatches.top());
+ rowsToSendBatches.pop();
- rowsToSend.pop();
- // Nothing to send. Continue.
- if (rows->num_rows() == 0) {
- continue;
- }
- // Logarithmic approach to find number of rows fit into the byte limit.
- if (rows->num_rows() == 1 || NYdb_cli::NArrow::GetBatchDataSize(rows) < settings.BytesPerRequest_) {
- // Single row or fits into the byte limit.
- auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
- if (!status.IsSuccess()) {
- return status;
+ // Nothing to send. Continue.
+ if (rowsBatch->num_rows() == 0) {
+ continue;
}
- inFlightRequests.push_back(UpsertParquetBuffer(dbPath, NYdb_cli::NArrow::SerializeBatchNoCompression(rows), strSchema));
- } else {
- // Split current slice.
- rowsToSend.push(rows->Slice(rows->num_rows() / 2));
- rowsToSend.push(rows->Slice(0, rows->num_rows() / 2));
- }
- } while (!rowsToSend.empty());
+ // Logarithmic approach to find the number of rows that fit into the byte limit.
+ if (rowsBatch->num_rows() == 1 || NYdb_cli::NArrow::GetBatchDataSize(rowsBatch) < settings.BytesPerRequest_) {
+ // Single row or fits into the byte limit.
+ auto value = UpsertParquetBuffer(dbPath, NYdb_cli::NArrow::SerializeBatchNoCompression(rowsBatch), strSchema);
+ auto status = value.ExtractValueSync();
+ if (!status.IsSuccess())
+ return status;
+
+ uploadedRowsCallback(rowsBatch->num_rows());
+ } else {
+ // Split current slice.
+ i64 halfLen = rowsBatch->num_rows() / 2;
+ rowsToSendBatches.push(rowsBatch->Slice(halfLen));
+ rowsToSendBatches.push(rowsBatch->Slice(0, halfLen));
+ }
+ } while (!rowsToSendBatches.empty());
+ };
+
+ return MakeStatus();
+ };
+
+ inFlightRequests.push_back(NThreading::Async(upsertParquetBatch, *pool));
+
+ auto status = WaitForQueue(settings.MaxInFlightRequests_, inFlightRequests);
+ if (!status.IsSuccess()) {
+ return status;
}
}
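
The slicing loop in UpsertParquet is a binary split: a slice over the byte budget is halved and both halves are requeued, until each piece serializes under BytesPerRequest or is a single row. The same strategy on a plain byte vector:

    #include <cstddef>
    #include <stack>
    #include <utility>
    #include <vector>

    // Halve any slice that exceeds the per-request byte budget, sending pieces
    // as soon as they fit (or consist of a single row), as the Parquet path does.
    static std::size_t sendInSlices(std::vector<char> rows,
                                    std::size_t bytesPerRequest) {
        std::size_t requests = 0;
        std::stack<std::vector<char>> pending;
        pending.push(std::move(rows));
        while (!pending.empty()) {
            std::vector<char> slice = std::move(pending.top());
            pending.pop();
            if (slice.empty()) {
                continue;                        // nothing to send
            }
            if (slice.size() == 1 || slice.size() < bytesPerRequest) {
                ++requests;                      // stand-in for UpsertParquetBuffer
                continue;
            }
            std::size_t half = slice.size() / 2; // split and requeue both halves
            pending.push(std::vector<char>(slice.begin() + half, slice.end()));
            pending.push(std::vector<char>(slice.begin(), slice.begin() + half));
        }
        return requests;
    }
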
diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h
index 262602e29b5..a45ac02dc30 100644
--- a/ydb/public/lib/ydb_cli/import/import.h
+++ b/ydb/public/lib/ydb_cli/import/import.h
@@ -1,5 +1,8 @@
#pragma once
+#include <thread>
+#include <functional>
+
#include <ydb/public/lib/ydb_cli/common/command.h>
#include <ydb/public/lib/ydb_cli/common/formats.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
@@ -41,6 +44,7 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting
FLUENT_SETTING_DEFAULT(ui64, BytesPerRequest, 1_MB);
FLUENT_SETTING_DEFAULT(ui64, FileBufferSize, 2_MB);
FLUENT_SETTING_DEFAULT(ui64, MaxInFlightRequests, 100);
+ FLUENT_SETTING_DEFAULT(ui64, Threads, std::thread::hardware_concurrency());
// Settings below are for CSV format only
FLUENT_SETTING_DEFAULT(ui32, SkipRows, 0);
FLUENT_SETTING_DEFAULT(bool, Header, false);
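
One caveat on the Threads default above: the standard permits std::thread::hardware_concurrency() to return 0 when the processor count cannot be determined, so the "number of available processors" help text holds only when the value is known. A defensive caller might clamp it; this guard is hypothetical and not part of the diff:

    #include <algorithm>
    #include <cstdint>
    #include <thread>

    // Hypothetical guard (not in the diff): hardware_concurrency() may return 0
    // when the processor count is unknown, so clamp to at least one worker.
    uint64_t resolveThreads(uint64_t requested) {
        if (requested != 0) {
            return requested;
        }
        return std::max<uint64_t>(1, std::thread::hardware_concurrency());
    }
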
@@ -73,16 +77,21 @@ private:
static constexpr ui32 VerboseModeReadSize = 1 << 27; // 128 MB
+ using ProgressCallbackFunc = std::function<void (ui64, ui64)>;
+
void SetupUpsertSettingsCsv(const TImportFileSettings& settings);
- TStatus UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings);
+ TStatus UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings,
+ std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback);
TStatus UpsertCsvByBlocks(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings);
TAsyncStatus UpsertCsvBuffer(const TString& dbPath, const TString& buffer);
- TStatus UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings);
+ TStatus UpsertJson(IInputStream &input, const TString &dbPath, const TImportFileSettings &settings,
+ std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback);
TAsyncStatus UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder);
TType GetTableType(const NTable::TTableDescription& tableDescription);
- TStatus UpsertParquet(const TString& filename, const TString& dbPath, const TImportFileSettings& settings);
+ TStatus UpsertParquet(const TString& filename, const TString& dbPath, const TImportFileSettings& settings,
+ ProgressCallbackFunc & progressCallback);
TAsyncStatus UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema);
};
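
ProgressCallbackFunc above is a plain std::function<void(ui64, ui64)> invoked as (processed, total), and the import code tests it for emptiness before calling. A caller-side sketch with standard types:

    #include <cstdint>
    #include <cstdio>
    #include <functional>

    using ProgressCallback = std::function<void(uint64_t, uint64_t)>;

    // Callbacks may be left empty, so check before each call, as the import
    // code does.
    void consume(uint64_t total, const ProgressCallback& progress) {
        for (uint64_t done = 0; done <= total; done += 25) {
            if (progress) {
                progress(done, total);
            }
        }
    }

    int main() {
        consume(100, [](uint64_t done, uint64_t total) {
            std::printf("\rProgress %llu%%",
                        static_cast<unsigned long long>(done * 100 / total));
        });
        std::printf("\n");
        return 0;
    }
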