author     Maksim Kita <[email protected]>       2023-05-26 15:31:36 +0000
committer  maksim-kita <[email protected]>  2023-05-26 18:31:36 +0300
commit     6e44fe97c0f37d89e2944af60916b507b0e9a27b (patch)
tree       d5944f3d499eb98169270dfd7c3bc637a7d0bab0
parent     0fc8c71685d1d603d420ff40bcf1b0916940c99f (diff)
Improve performance of import
Pull Request resolved: #189
-rw-r--r--  ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp |  15
-rw-r--r--  ydb/public/lib/ydb_cli/commands/ydb_service_import.h   |   3
-rw-r--r--  ydb/public/lib/ydb_cli/import/import.cpp               | 344
-rw-r--r--  ydb/public/lib/ydb_cli/import/import.h                 |  15
4 files changed, 260 insertions(+), 117 deletions(-)
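At a glance, the commit below moves the TSV-delimiter and scheme-path checks out of the per-file lambda so they run once, adds a --threads option (defaulting to the number of available processors), reads each input through a buffered TFileInput, accumulates rows up to BytesPerRequest, hands each full batch to a thread pool via NThreading::Async, and reports progress when stdout is a terminal. A minimal sketch of that batch-and-dispatch pattern in portable C++ (std::async stands in for the Arcadia IThreadPool used in the diff; names and limits here are illustrative, not the commit's code):

    #include <cstddef>
    #include <future>
    #include <iostream>
    #include <string>
    #include <vector>

    // Stand-in for UpsertCsvBuffer(): one bulk-upsert request per batch.
    static bool UpsertBatch(std::string rows) {
        return !rows.empty();
    }

    int main() {
        const std::size_t bytesPerRequest = 1 << 20; // mirrors the 1_MB BytesPerRequest default
        const std::size_t maxInFlight = 100;         // mirrors the MaxInFlightRequests default

        std::vector<std::future<bool>> inFlight;
        std::string line, batch;

        while (std::getline(std::cin, line)) {
            batch += line;
            batch += '\n';
            if (batch.size() < bytesPerRequest) {
                continue; // keep filling the current batch
            }
            // Move the full batch into a worker so reading can continue immediately.
            inFlight.push_back(std::async(std::launch::async, UpsertBatch, std::move(batch)));
            batch.clear();
            // Analogue of WaitForQueue(): cap the number of outstanding requests.
            if (inFlight.size() >= maxInFlight) {
                inFlight.front().get();
                inFlight.erase(inFlight.begin());
            }
        }
        if (!batch.empty()) {
            UpsertBatch(std::move(batch)); // flush the tail batch, as the diff does
        }
        for (auto& f : inFlight) {
            f.get(); // drain remaining requests
        }
    }

Bounding the queue keeps memory roughly proportional to maxInFlight times bytesPerRequest, which is why the reader stalls instead of buffering the whole file.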
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
index 05faf20f6c4..d9e2ac6041a 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
@@ -168,11 +168,10 @@ TCommandImportFromFile::TCommandImportFromFile()
 void TCommandImportFileBase::Config(TConfig& config) {
     TYdbCommand::Config(config);
-    config.Opts->SetTrailingArgTitle("<input files...>", 
+    config.Opts->SetTrailingArgTitle("<input files...>",
         "One or more file paths to import from");
     config.Opts->AddLongOption('p', "path", "Database path to table")
         .Required().RequiredArgument("STRING").StoreResult(&Path);
-
     config.Opts->AddLongOption('i', "input-file").AppendTo(&FilePaths).Hidden();
 
     const TImportFileSettings defaults;
@@ -182,6 +181,9 @@ void TCommandImportFileBase::Config(TConfig& config) {
     config.Opts->AddLongOption("max-in-flight",
         "Maximum number of in-flight requests; increase to load big files faster (more memory needed)")
         .DefaultValue(defaults.MaxInFlightRequests_).StoreResult(&MaxInFlightRequests);
+    config.Opts->AddLongOption("threads",
+        "Maximum number of threads; number of available processors if not specified")
+        .DefaultValue(defaults.Threads_).StoreResult(&Threads);
 }
 
 void TCommandImportFileBase::Parse(TConfig& config) {
@@ -226,7 +228,7 @@ void TCommandImportFromCsv::Config(TConfig& config) {
     config.Opts->AddLongOption("header",
         "Set if file contains column names at the first not skipped row")
         .StoreTrue(&Header);
-    config.Opts->AddLongOption("columns", 
+    config.Opts->AddLongOption("columns",
         "String with column names that replaces header")
         .RequiredArgument("STR").StoreResult(&HeaderRow);
     config.Opts->AddLongOption("newline-delimited",
@@ -246,6 +248,7 @@ int TCommandImportFromCsv::Run(TConfig& config) {
     settings.Format(InputFormat);
     settings.MaxInFlightRequests(MaxInFlightRequests);
     settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
+    settings.Threads(Threads);
     settings.SkipRows(SkipRows);
     settings.Header(Header);
     settings.NewlineDelimited(NewlineDelimited);
@@ -288,6 +291,7 @@ int TCommandImportFromJson::Run(TConfig& config) {
     settings.Format(InputFormat);
     settings.MaxInFlightRequests(MaxInFlightRequests);
     settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
+    settings.Threads(Threads);
 
     TImportFileClient client(CreateDriver(config), config);
     ThrowOnError(client.Import(FilePaths, Path, settings));
@@ -295,16 +299,17 @@ int TCommandImportFromJson::Run(TConfig& config) {
     return EXIT_SUCCESS;
 }
 
-/// Import Parquet 
+/// Import Parquet
 void TCommandImportFromParquet::Config(TConfig& config) {
     TCommandImportFileBase::Config(config);
 }
 
 int TCommandImportFromParquet::Run(TConfig& config) {
-    TImportFileSettings settings; 
+    TImportFileSettings settings;
     settings.Format(InputFormat);
     settings.MaxInFlightRequests(MaxInFlightRequests);
     settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
+    settings.Threads(Threads);
 
     TImportFileClient client(CreateDriver(config), config);
     ThrowOnError(client.Import(FilePaths, Path, settings));
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
index 450688a9f8d..501a851eaf1 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
@@ -61,6 +61,7 @@ protected:
     TVector<TString> FilePaths;
     TString BytesPerRequest;
     ui64 MaxInFlightRequests = 1;
+    ui64 Threads = 0;
 };
 
 class TCommandImportFromCsv : public TCommandImportFileBase {
@@ -108,7 +109,7 @@ public:
 class TCommandImportFromParquet : public TCommandImportFileBase {
 public:
     TCommandImportFromParquet(const TString& cmd = "parquet", const TString& cmdDescription = "Import data from Parquet file")
-        : TCommandImportFileBase(cmd, cmdDescription) 
+        : TCommandImportFileBase(cmd, cmdDescription)
     {
         InputFormat = EOutputFormat::Parquet;
     }
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index b337930a88f..44e9c8d572a 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -21,6 +21,7 @@
 #include <util/stream/file.h>
 #include <util/stream/length.h>
 #include <util/string/builder.h>
+#include <util/system/thread.h>
 
 #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
 #include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>
@@ -34,6 +35,7 @@
 
 #if defined(_win32_)
 #include <windows.h>
+#include <io.h>
 #elif defined(_unix_)
 #include <unistd.h>
 #endif
@@ -82,9 +84,18 @@ FHANDLE GetStdinFileno() {
 #endif
 }
 
+bool IsStdoutInteractive() {
+#if defined(_win32_)
+    return _isatty(_fileno(stdout));
+#elif defined(_unix_)
+    return isatty(fileno(stdout));
+#endif
+    return true;
+}
+
 class TMaxInflightGetter {
 public:
-    TMaxInflightGetter(ui64 totalMaxInFlight, std::atomic<ui64>& filesCount) 
+    TMaxInflightGetter(ui64 totalMaxInFlight, std::atomic<ui64>& filesCount)
         : TotalMaxInFlight(totalMaxInFlight)
         , FilesCount(filesCount) { }
@@ -134,7 +145,7 @@ private:
     };
 
 public:
-    TCsvFileReader(const TString& filePath, const TImportFileSettings& settings, TString& headerRow, 
+    TCsvFileReader(const TString& filePath, const TImportFileSettings& settings, TString& headerRow,
                    TMaxInflightGetter& inFlightGetter) {
         TFile file;
         if (filePath) {
@@ -232,8 +243,18 @@ TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand
 TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TString& dbPath, const TImportFileSettings& settings) {
     FilesCount = filePaths.size();
-    auto pool = CreateThreadPool(FilesCount);
-    TVector<NThreading::TFuture<TStatus>> asyncResults;
+    if (settings.Format_ == EOutputFormat::Tsv && settings.Delimiter_ != "\t") {
+        return MakeStatus(EStatus::BAD_REQUEST,
+            TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed");
+    }
+
+    auto result = NDump::DescribePath(*SchemeClient, dbPath);
+    auto resultStatus = result.GetStatus();
+    if (resultStatus != EStatus::SUCCESS) {
+        return MakeStatus(EStatus::SCHEME_ERROR,
+            TStringBuilder() << result.GetIssues().ToString() << dbPath);
+    }
+
     switch (settings.Format_) {
     case EOutputFormat::Default:
     case EOutputFormat::Csv:
@@ -245,43 +266,73 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
     case EOutputFormat::JsonBase64:
     case EOutputFormat::Parquet:
         break;
-    default: 
+    default:
        return MakeStatus(EStatus::BAD_REQUEST,
            TStringBuilder() << "Unsupported format #" << (int) settings.Format_);
     }
 
+    bool isStdoutInteractive = IsStdoutInteractive();
+    size_t filePathsSize = filePaths.size();
+    std::mutex progressWriteLock;
+    std::atomic<ui64> globalProgress{0};
+
+    auto writeProgress = [&]() {
+        ui64 globalProgressValue = globalProgress.load();
+        std::lock_guard<std::mutex> lock(progressWriteLock);
+        Cout << "Progress " << (globalProgressValue / filePathsSize) << '%' << "\r";
+        Cout.Flush();
+    };
+
+    auto start = TInstant::Now();
+
+    auto pool = CreateThreadPool(filePathsSize);
+    TVector<NThreading::TFuture<TStatus>> asyncResults;
+
+    // If the single empty filename passed, read from stdin, else from the file
+
     for (const auto& filePath : filePaths) {
-        auto func = [this, &filePath, &dbPath, &settings] {
+        auto func = [&, this] {
+            std::unique_ptr<TFileInput> fileInput;
+            std::optional<ui64> fileSizeHint;
+
             if (!filePath.empty()) {
                 const TFsPath dataFile(filePath);
+
                 if (!dataFile.Exists()) {
                     return MakeStatus(EStatus::BAD_REQUEST,
                         TStringBuilder() << "File does not exist: " << filePath);
                 }
+
                 if (!dataFile.IsFile()) {
                     return MakeStatus(EStatus::BAD_REQUEST,
                         TStringBuilder() << "Not a file: " << filePath);
                 }
-            }
 
-            if (settings.Format_ == EOutputFormat::Tsv) {
-                if (settings.Delimiter_ != "\t") {
-                    return MakeStatus(EStatus::BAD_REQUEST,
-                        TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed");
+                TFile file(filePath, OpenExisting | RdOnly | Seq);
+                i64 fileLength = file.GetLength();
+                if (fileLength && fileLength >= 0) {
+                    fileSizeHint = fileLength;
                 }
-            }
 
-            auto result = NDump::DescribePath(*SchemeClient, dbPath);
-            auto resultStatus = result.GetStatus();
+                fileInput = std::make_unique<TFileInput>(file, settings.FileBufferSize_);
+            }
 
-            if (resultStatus != EStatus::SUCCESS) {
-                return MakeStatus(EStatus::SCHEME_ERROR,
-                    TStringBuilder() << result.GetIssues().ToString() << dbPath);
+            ProgressCallbackFunc progressCallback;
+
+            if (isStdoutInteractive)
+            {
+                ui64 oldProgress = 0;
+                progressCallback = [&, oldProgress](ui64 current, ui64 total) mutable {
+                    ui64 progress = static_cast<ui64>((static_cast<double>(current) / total) * 100.0);
+                    ui64 progressDiff = progress - oldProgress;
+                    globalProgress.fetch_add(progressDiff);
+                    oldProgress = progress;
+                    writeProgress();
+                };
             }
 
-            std::unique_ptr<TFileInput> fileInput = filePath.empty() ? nullptr
-                : std::make_unique<TFileInput>(filePath, settings.FileBufferSize_);
             IInputStream& input = fileInput ? *fileInput : Cin;
+
             switch (settings.Format_) {
             case EOutputFormat::Default:
             case EOutputFormat::Csv:
@@ -289,16 +340,17 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
                 if (settings.NewlineDelimited_) {
                     return UpsertCsvByBlocks(filePath, dbPath, settings);
                 } else {
-                    return UpsertCsv(input, dbPath, settings);
+                    return UpsertCsv(input, dbPath, settings, fileSizeHint, progressCallback);
                 }
             case EOutputFormat::Json:
             case EOutputFormat::JsonUnicode:
             case EOutputFormat::JsonBase64:
-                return UpsertJson(input, dbPath, settings);
+                return UpsertJson(input, dbPath, settings, fileSizeHint, progressCallback);
             case EOutputFormat::Parquet:
-                return UpsertParquet(filePath, dbPath, settings);
+                return UpsertParquet(filePath, dbPath, settings, progressCallback);
             default: ;
             }
+
             return MakeStatus(EStatus::BAD_REQUEST,
                 TStringBuilder() << "Unsupported format #" << (int) settings.Format_);
         };
@@ -313,6 +365,11 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
             return result;
         }
     }
+
+    auto finish = TInstant::Now();
+    auto duration = finish - start;
+    Cout << "Elapsed: " << duration.SecondsFloat() << " sec\n";
+
     return MakeStatus(EStatus::SUCCESS);
 }
@@ -353,12 +410,12 @@ void TImportFileClient::SetupUpsertSettingsCsv(const TImportFileSettings& settin
     }
 }
 
-TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
-    const TImportFileSettings& settings) {
+TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings,
+                                     std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback) {
     TString buffer;
 
     TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
-    
+
     TCountingInput countInput(&input);
     NCsvFormat::TLinesSplitter splitter(countInput);
     TString headerRow;
@@ -383,35 +440,58 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
         splitter.ConsumeLine();
     }
 
-    std::vector<TAsyncStatus> inFlightRequests;
+    THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
 
-    ui32 idx = settings.SkipRows_;
-    ui64 readSize = 0;
+    ui32 row = settings.SkipRows_;
     ui64 nextBorder = VerboseModeReadSize;
+    ui64 batchBytes = 0;
+    ui64 readBytes = 0;
+
+    TString line;
+    std::vector<TAsyncStatus> inFlightRequests;
+
     while (TString line = splitter.ConsumeLine()) {
-        readSize += line.size();
+        ++row;
+        readBytes += line.Size();
+        batchBytes += line.Size();
+
         if (RemoveLastDelimiter) {
             if (!line.EndsWith(settings.Delimiter_)) {
-                return MakeStatus(EStatus::BAD_REQUEST, 
+                return MakeStatus(EStatus::BAD_REQUEST,
                     "According to the header, lines should end with a delimiter");
             }
             line.erase(line.Size() - settings.Delimiter_.Size());
        }
+
         buffer += line;
         buffer += '\n';
-        ++idx;
-        if (readSize >= nextBorder && RetrySettings.Verbose_) {
+
+        if (readBytes >= nextBorder && RetrySettings.Verbose_) {
             nextBorder += VerboseModeReadSize;
-            Cerr << "Processed " << 1.0 * readSize / (1 << 20) << "Mb and " << idx << " records" << Endl;
+            Cerr << "Processed " << 1.0 * readBytes / (1 << 20) << "Mb and " << row << " records" << Endl;
         }
 
-        if (buffer.Size() >= settings.BytesPerRequest_) {
-            auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
-            if (!status.IsSuccess()) {
-                return status;
-            }
-            inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer));
-            buffer = headerRow;
+        if (batchBytes < settings.BytesPerRequest_) {
+            continue;
+        }
+
+        if (inputSizeHint && progressCallback) {
+            progressCallback(readBytes, *inputSizeHint);
+        }
+
+        auto asyncUpsertCSV = [&, buffer = std::move(buffer)]() {
+            auto value = UpsertCsvBuffer(dbPath, buffer);
+            return value.ExtractValueSync();
+        };
+
+        batchBytes = 0;
+        buffer = {};
+
+        inFlightRequests.push_back(NThreading::Async(asyncUpsertCSV, *pool));
+
+        auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
+        if (!status.IsSuccess()) {
+            return status;
         }
     }
@@ -455,7 +535,7 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TStr
             readSize += line.size();
             if (RemoveLastDelimiter) {
                 if (!line.EndsWith(settings.Delimiter_)) {
-                    return MakeStatus(EStatus::BAD_REQUEST, 
+                    return MakeStatus(EStatus::BAD_REQUEST,
                         "According to the header, lines should end with a delimiter");
                 }
                 line.erase(line.Size() - settings.Delimiter_.Size());
             }
@@ -500,9 +580,8 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TStr
 inline
 TAsyncStatus TImportFileClient::UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder) {
     auto upsert = [this, dbPath, rows = builder.Build()]
-            (NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
-        NYdb::TValue r = rows;
-        return tableClient.BulkUpsert(dbPath, std::move(r), UpsertSettings)
+            (NYdb::NTable::TTableClient& tableClient) mutable -> TAsyncStatus {
+        return tableClient.BulkUpsert(dbPath, std::move(rows), UpsertSettings)
             .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
                 NYdb::TStatus status = bulkUpsertResult.GetValueSync();
                 return NThreading::MakeFuture(status);
@@ -511,54 +590,76 @@ TAsyncStatus TImportFileClient::UpsertJsonBuffer(const TString& dbPath, TValueBu
     return TableClient->RetryOperation(upsert, RetrySettings);
 }
 
-TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath,
-    const TImportFileSettings& settings) {
+
+TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings,
+                                      std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback) {
     NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync();
-    if (! sessionResult.IsSuccess())
+    if (!sessionResult.IsSuccess())
         return sessionResult;
     NTable::TDescribeTableResult tableResult = sessionResult.GetSession().DescribeTable(dbPath).GetValueSync();
-    if (! tableResult.IsSuccess())
+    if (!tableResult.IsSuccess())
         return tableResult;
 
     const TType tableType = GetTableType(tableResult.GetTableDescription());
     const NYdb::EBinaryStringEncoding stringEncoding =
-        (settings.Format_==EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 :
+        (settings.Format_ == EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 :
             NYdb::EBinaryStringEncoding::Unicode;
 
-    std::vector<TAsyncStatus> inFlightRequests;
     TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
+    THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
 
-    size_t currentSize = 0;
-    size_t currentRows = 0;
-    auto currentBatch = std::make_unique<TValueBuilder>();
-    currentBatch->BeginList();
+    ui64 readBytes = 0;
+    ui64 batchBytes = 0;
 
     TString line;
-    while (size_t sz = input.ReadLine(line)) {
-        currentBatch->AddListItem(JsonToYdbValue(line, tableType, stringEncoding));
-        currentSize += line.Size();
-        currentRows += 1;
+    std::vector<TString> batchLines;
+    std::vector<TAsyncStatus> inFlightRequests;
 
-        if (currentSize >= settings.BytesPerRequest_) {
-            currentBatch->EndList();
+    auto upsertJson = [&](std::vector<TString> batchLines) {
+        TValueBuilder batch;
+        batch.BeginList();
 
-            auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
-            if (!status.IsSuccess()) {
-                return status;
-            }
+        for (auto &line : batchLines) {
+            batch.AddListItem(JsonToYdbValue(line, tableType, stringEncoding));
+        }
+
+        batch.EndList();
+
+        auto value = UpsertJsonBuffer(dbPath, batch);
+        return value.ExtractValueSync();
+    };
 
-            inFlightRequests.push_back(UpsertJsonBuffer(dbPath, *currentBatch));
+    while (size_t size = input.ReadLine(line)) {
+        batchLines.push_back(line);
+        batchBytes += size;
+        readBytes += size;
 
-            currentBatch = std::make_unique<TValueBuilder>();
-            currentBatch->BeginList();
-            currentSize = 0;
-            currentRows = 0;
+        if (inputSizeHint && progressCallback) {
+            progressCallback(readBytes, *inputSizeHint);
+        }
+
+        if (batchBytes < settings.BytesPerRequest_) {
+            continue;
+        }
+
+        batchBytes = 0;
+
+        auto asyncUpsertJson = [&, batchLines = std::move(batchLines)]() {
+            return upsertJson(batchLines);
+        };
+
+        batchLines.clear();
+
+        inFlightRequests.push_back(NThreading::Async(asyncUpsertJson, *pool));
+
+        auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
+        if (!status.IsSuccess()) {
+            return status;
         }
     }
 
-    if (currentRows > 0) {
-        currentBatch->EndList();
-        inFlightRequests.push_back(UpsertJsonBuffer(dbPath, *currentBatch));
+    if (!batchLines.empty()) {
+        upsertJson(std::move(batchLines));
     }
 
     return WaitForQueue(0, inFlightRequests);
@@ -566,7 +667,8 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
 
 TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filename,
                                          [[maybe_unused]] const TString& dbPath,
-                                         [[maybe_unused]] const TImportFileSettings& settings) {
+                                         [[maybe_unused]] const TImportFileSettings& settings,
+                                         [[maybe_unused]] ProgressCallbackFunc & progressCallback) {
 #if defined(_WIN64) || defined(_WIN32) || defined(__WIN32__)
     return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows");
 #else
@@ -575,7 +677,7 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
     if (!fileResult.ok()) {
         return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Unable to open parquet file:" << fileResult.status().ToString());
     }
-    std::shared_ptr<arrow::io::ReadableFile> readableFile = fileResult.ValueOrDie();
+    std::shared_ptr<arrow::io::ReadableFile> readableFile = *fileResult;
 
     std::unique_ptr<parquet::arrow::FileReader> fileReader;
@@ -585,7 +687,9 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
         return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while initializing arrow FileReader: " << st.ToString());
     }
 
-    const i64 numRowGroups = parquet::ReadMetaData(readableFile)->num_row_groups();
+    auto metadata = parquet::ReadMetaData(readableFile);
+    const i64 numRows = metadata->num_rows();
+    const i64 numRowGroups = metadata->num_row_groups();
 
     std::vector<int> row_group_indices(numRowGroups);
     for (i64 i = 0; i < numRowGroups; i++) {
@@ -599,8 +703,18 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
         return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while getting RecordBatchReader: " << st.ToString());
     }
 
+    THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
+
+    std::atomic<ui64> uploadedRows = 0;
+    auto uploadedRowsCallback = [&](ui64 rows) {
+        ui64 uploadedRowsValue = uploadedRows.fetch_add(rows);
+
+        if (progressCallback) {
+            progressCallback(uploadedRowsValue + rows, numRows);
+        }
+    };
+
     std::vector<TAsyncStatus> inFlightRequests;
-    TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
 
     while (true) {
         std::shared_ptr<arrow::RecordBatch> batch;
@@ -609,49 +723,63 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
         if (!st.ok()) {
             return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
         }
+
         // The read function will return null at the end of data stream.
         if (!batch) {
             break;
         }
 
-        const TString strSchema = NYdb_cli::NArrow::SerializeSchema(*batch->schema());
-        const size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(batch);
-        const size_t sliceCount =
-            (totalSize / (size_t)settings.BytesPerRequest_) + (totalSize % settings.BytesPerRequest_ != 0 ? 1 : 0);
-        const i64 rowsInSlice = batch->num_rows() / sliceCount;
+        auto upsertParquetBatch = [&, batch = std::move(batch)]() {
+            const TString strSchema = NYdb_cli::NArrow::SerializeSchema(*batch->schema());
+            const size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(batch);
+            const size_t sliceCount =
+                (totalSize / (size_t)settings.BytesPerRequest_) + (totalSize % settings.BytesPerRequest_ != 0 ? 1 : 0);
+            const i64 rowsInSlice = batch->num_rows() / sliceCount;
 
-        for (i64 currentRow = 0; currentRow < batch->num_rows(); currentRow += rowsInSlice) {
-            std::stack<std::shared_ptr<arrow::RecordBatch>> rowsToSend;
+            for (i64 currentRow = 0; currentRow < batch->num_rows(); currentRow += rowsInSlice) {
+                std::stack<std::shared_ptr<arrow::RecordBatch>> rowsToSendBatches;
 
-            if (currentRow + rowsInSlice < batch->num_rows()) {
-                rowsToSend.push(batch->Slice(currentRow, rowsInSlice));
-            } else {
-                rowsToSend.push(batch->Slice(currentRow));
-            }
+                if (currentRow + rowsInSlice < batch->num_rows()) {
+                    rowsToSendBatches.push(batch->Slice(currentRow, rowsInSlice));
+                } else {
+                    rowsToSendBatches.push(batch->Slice(currentRow));
+                }
 
-            do {
-                const auto rows = rowsToSend.top();
+                do {
+                    auto rowsBatch = std::move(rowsToSendBatches.top());
+                    rowsToSendBatches.pop();
 
-                rowsToSend.pop();
-                // Nothing to send. Continue.
-                if (rows->num_rows() == 0) {
-                    continue;
-                }
-                // Logarithmic approach to find number of rows fit into the byte limit.
-                if (rows->num_rows() == 1 || NYdb_cli::NArrow::GetBatchDataSize(rows) < settings.BytesPerRequest_) {
-                    // Single row or fits into the byte limit.
-                    auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
-                    if (!status.IsSuccess()) {
-                        return status;
+                    // Nothing to send. Continue.
+                    if (rowsBatch->num_rows() == 0) {
+                        continue;
                     }
-                    inFlightRequests.push_back(UpsertParquetBuffer(dbPath, NYdb_cli::NArrow::SerializeBatchNoCompression(rows), strSchema));
-                } else {
-                    // Split current slice.
-                    rowsToSend.push(rows->Slice(rows->num_rows() / 2));
-                    rowsToSend.push(rows->Slice(0, rows->num_rows() / 2));
-                }
-            } while (!rowsToSend.empty());
+
+                    // Logarithmic approach to find number of rows fit into the byte limit.
+                    if (rowsBatch->num_rows() == 1 || NYdb_cli::NArrow::GetBatchDataSize(rowsBatch) < settings.BytesPerRequest_) {
+                        // Single row or fits into the byte limit.
+                        auto value = UpsertParquetBuffer(dbPath, NYdb_cli::NArrow::SerializeBatchNoCompression(rowsBatch), strSchema);
+                        auto status = value.ExtractValueSync();
+                        if (!status.IsSuccess())
+                            return status;
+
+                        uploadedRowsCallback(rowsBatch->num_rows());
+                    } else {
+                        // Split current slice.
+                        i64 halfLen = rowsBatch->num_rows() / 2;
+                        rowsToSendBatches.push(rowsBatch->Slice(halfLen));
+                        rowsToSendBatches.push(rowsBatch->Slice(0, halfLen));
+                    }
+                } while (!rowsToSendBatches.empty());
+            }
+
+            return MakeStatus();
+        };
+
+        inFlightRequests.push_back(NThreading::Async(upsertParquetBatch, *pool));
+
+        auto status = WaitForQueue(settings.MaxInFlightRequests_, inFlightRequests);
+        if (!status.IsSuccess()) {
+            return status;
         }
     }
diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h
index 262602e29b5..a45ac02dc30 100644
--- a/ydb/public/lib/ydb_cli/import/import.h
+++ b/ydb/public/lib/ydb_cli/import/import.h
@@ -1,5 +1,8 @@
 #pragma once
 
+#include <thread>
+#include <functional>
+
 #include <ydb/public/lib/ydb_cli/common/command.h>
 #include <ydb/public/lib/ydb_cli/common/formats.h>
 #include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
@@ -41,6 +44,7 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting
     FLUENT_SETTING_DEFAULT(ui64, BytesPerRequest, 1_MB);
     FLUENT_SETTING_DEFAULT(ui64, FileBufferSize, 2_MB);
     FLUENT_SETTING_DEFAULT(ui64, MaxInFlightRequests, 100);
+    FLUENT_SETTING_DEFAULT(ui64, Threads, std::thread::hardware_concurrency());
     // Settings below are for CSV format only
     FLUENT_SETTING_DEFAULT(ui32, SkipRows, 0);
     FLUENT_SETTING_DEFAULT(bool, Header, false);
@@ -73,16 +77,21 @@ private:
     static constexpr ui32 VerboseModeReadSize = 1 << 27; // 100 MB
 
+    using ProgressCallbackFunc = std::function<void (ui64, ui64)>;
+
     void SetupUpsertSettingsCsv(const TImportFileSettings& settings);
 
-    TStatus UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings);
+    TStatus UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings,
+                      std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback);
     TStatus UpsertCsvByBlocks(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings);
     TAsyncStatus UpsertCsvBuffer(const TString& dbPath, const TString& buffer);
-    TStatus UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings);
+    TStatus UpsertJson(IInputStream &input, const TString &dbPath, const TImportFileSettings &settings,
+                       std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback);
     TAsyncStatus UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder);
     TType GetTableType(const NTable::TTableDescription& tableDescription);
 
-    TStatus UpsertParquet(const TString& filename, const TString& dbPath, const TImportFileSettings& settings);
+    TStatus UpsertParquet(const TString& filename, const TString& dbPath, const TImportFileSettings& settings,
+                          ProgressCallbackFunc & progressCallback);
     TAsyncStatus UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema);
 };
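Taken together, the new surface is TImportFileSettings::Threads_ (defaulting to std::thread::hardware_concurrency()), a ProgressCallbackFunc threaded through the upsert paths, and the --threads CLI flag. An illustrative invocation, assuming the usual ydb import file csv command path (table and file names hypothetical):

    ydb import file csv -p /Root/mytable --threads 8 --max-in-flight 100 data.csv

Omitting --threads falls back to the number of available processors, per the option's help text above.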