author     Nikolay Perfilov <pnv1@yandex-team.ru>    2024-11-22 13:39:40 +0300
committer  GitHub <noreply@github.com>               2024-11-22 13:39:40 +0300
commit     b13bd4af6109e62064aef918bbf593cc549cafa6 (patch)
tree       e7c6af44ca297c244b561f46f1cf7a8c19e47f56
parent     a18f18d81996ca8e681bb6cabd441b52833d99bf (diff)
Improve csv import, step 3 (#11864)
-rw-r--r--  ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp |  12
-rw-r--r--  ydb/public/lib/ydb_cli/common/csv_parser.cpp           |   4
-rw-r--r--  ydb/public/lib/ydb_cli/import/import.cpp               | 478
-rw-r--r--  ydb/public/lib/ydb_cli/import/import.h                 |  51
4 files changed, 367 insertions, 178 deletions
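
Note on the visible API change: TImportFileSettings now binds at TImportFileClient construction instead of being passed to every Import() call, and the implementation moves behind a TImpl. A minimal sketch of the new call shape, mirroring the updated Run() methods in ydb_service_import.cpp below (the fluent setters and helpers all appear in this patch; the literal values are illustrative only, not defaults):

    // Sketch only -- values are examples, not taken from the patch.
    TImportFileSettings settings;
    settings.Format(EDataFormat::Csv);   // Settings.Format_ is read in import.cpp
    settings.BytesPerRequest(1 << 20);   // bytes batched into one BulkUpsert request
    settings.Threads(8);                 // worker threads that build TValue batches

    // Settings are now constructor state shared by all Upsert* paths...
    TImportFileClient client(CreateDriver(config), config, settings);
    // ...so Import() takes only the input files and the target table path.
    ThrowOnError(client.Import(FilePaths, Path));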
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
index 23ab8e32ea..8ed4aaa660 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
@@ -311,8 +311,8 @@ int TCommandImportFromCsv::Run(TConfig& config) {
         settings.Delimiter(Delimiter);
     }
 
-    TImportFileClient client(CreateDriver(config), config);
-    ThrowOnError(client.Import(FilePaths, Path, settings));
+    TImportFileClient client(CreateDriver(config), config, settings);
+    ThrowOnError(client.Import(FilePaths, Path));
 
     return EXIT_SUCCESS;
 }
@@ -341,8 +341,8 @@ int TCommandImportFromJson::Run(TConfig& config) {
     settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
     settings.Threads(Threads);
 
-    TImportFileClient client(CreateDriver(config), config);
-    ThrowOnError(client.Import(FilePaths, Path, settings));
+    TImportFileClient client(CreateDriver(config), config, settings);
+    ThrowOnError(client.Import(FilePaths, Path));
 
     return EXIT_SUCCESS;
 }
@@ -360,8 +360,8 @@ int TCommandImportFromParquet::Run(TConfig& config) {
     settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
     settings.Threads(Threads);
 
-    TImportFileClient client(CreateDriver(config), config);
-    ThrowOnError(client.Import(FilePaths, Path, settings));
+    TImportFileClient client(CreateDriver(config), config, settings);
+    ThrowOnError(client.Import(FilePaths, Path));
 
     return EXIT_SUCCESS;
 }
diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.cpp b/ydb/public/lib/ydb_cli/common/csv_parser.cpp
index b3d50636b7..aff6db75f7 100644
--- a/ydb/public/lib/ydb_cli/common/csv_parser.cpp
+++ b/ydb/public/lib/ydb_cli/common/csv_parser.cpp
@@ -483,8 +483,8 @@ void TCsvParser::BuildLineType() {
         auto findIt = DestinationTypes->find(colName);
         if (findIt != DestinationTypes->end()) {
             builder.AddMember(colName, findIt->second);
-            ResultLineTypesSorted.emplace_back(&findIt->second);
-            ResultLineNamesSorted.emplace_back(&colName);
+            ResultLineTypesSorted.push_back(&findIt->second);
+            ResultLineNamesSorted.push_back(&colName);
             SkipBitMap.push_back(false);
             ++ResultColumnCount;
         } else {
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index ddfd3ccb61..0e417c3132 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -149,7 +149,7 @@ private:
 };
 
 class TCsvFileReader {
-private:
+public:
     class TFileChunk {
     public:
         TFileChunk(TFile file, THolder<IInputStream>&& stream, ui64 size = std::numeric_limits<ui64>::max())
@@ -180,8 +180,7 @@ private:
     };
 
 public:
-    TCsvFileReader(const TString& filePath, const TImportFileSettings& settings, TString& headerRow,
-                   TMaxInflightGetter& inFlightGetter) {
+    TCsvFileReader(const TString& filePath, const TImportFileSettings& settings, TString& headerRow, ui64 maxThreads) {
         TFile file;
         if (filePath) {
             file = TFile(filePath, RdOnly);
@@ -199,7 +198,6 @@ public:
         }
         i64 skipSize = countInput.Counter();
 
-        MaxInFlight = inFlightGetter.GetCurrentMaxInflight();
         i64 fileSize = file.GetLength();
         if (filePath.empty() || fileSize == -1) {
             SplitCount = 1;
@@ -207,7 +205,7 @@ public:
             return;
         }
 
-        SplitCount = Min(MaxInFlight, (fileSize - skipSize) / settings.BytesPerRequest_ + 1);
+        SplitCount = Min(maxThreads, (fileSize - skipSize) / settings.BytesPerRequest_ + 1);
         i64 chunkSize = (fileSize - skipSize) / SplitCount;
         if (chunkSize == 0) {
             SplitCount = 1;
@@ -260,21 +258,148 @@ private:
     size_t MaxInFlight;
 };
 
+class TJobInFlightManager {
+public:
+    TJobInFlightManager(size_t orderNum, size_t fileCount, size_t maxJobInFlight)
+        : MaxJobInFlight(maxJobInFlight)
+        , CurrentFileCount(fileCount)
+        , CurrentSemaphoreMax(GetSemaphoreMaxValue(orderNum))
+        , JobsSemaphore(GetSemaphoreMaxValue(orderNum)) {
+    }
+
+    // Return value: true if this call was useful (Inflight manager is still working)
+    // Parameter informedSoFar: number of other Inflight managers already informed. Needed to calculate newSemaphoreMax
+    bool OnAnotherFileFinished(size_t informedSoFar) {
+        std::lock_guard<std::mutex> lock(Mutex);
+        if (Finished || CurrentFileCount <= 1) {
+            return false;
+        }
+        --CurrentFileCount;
+        size_t newSemaphoreMax = GetSemaphoreMaxValue(informedSoFar);
+        size_t semaphoreSizeDiff = newSemaphoreMax - CurrentSemaphoreMax;
+        CurrentSemaphoreMax = newSemaphoreMax;
+        if (semaphoreSizeDiff > 0) {
+            JobsSemaphore.release(semaphoreSizeDiff);
+        }
+        return true;
+    }
+
+    void AcquireJob() {
+        JobsSemaphore.acquire();
+    }
+
+    void ReleaseJob() {
+        JobsSemaphore.release();
+    }
+
+    void Finish() {
+        std::lock_guard<std::mutex> lock(Mutex);
+        Finished = true;
+    }
+
+    void WaitForAllJobs() {
+        std::lock_guard<std::mutex> lock(Mutex);
+        for (size_t i = 0; i < CurrentSemaphoreMax; ++i) {
+            JobsSemaphore.acquire();
+        }
+    }
+
+private:
+    size_t GetSemaphoreMaxValue(size_t orderNum) const {
+        return Max(1ul, MaxJobInFlight / CurrentFileCount
+            // One more thread for the first <MaxJobInFlight % CurrentFileCount> managers to utilize max jobs inflight
+            + (orderNum < MaxJobInFlight % CurrentFileCount ? 1 : 0));
+    }
+
+    size_t MaxJobInFlight;
+    size_t CurrentFileCount;
+    size_t CurrentSemaphoreMax;
+    bool Finished = false;
+    std::mutex Mutex;
+    // Jobs starts on launching a worker in a separate thread that builds TValue and sends request
+    // Job ends on receiving final request (after all retries)
+    std::counting_semaphore<> JobsSemaphore;
+}; // TJobInflightManager
+
 } // namespace
 
-TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig)
+class TImportFileClient::TImpl {
+public:
+    explicit TImpl(const TDriver& driver, const TClientCommand::TConfig& rootConfig,
+                   const TImportFileSettings& settings);
+    TStatus Import(const TVector<TString>& filePaths, const TString& dbPath);
+
+private:
+    using ProgressCallbackFunc = std::function<void (ui64, ui64)>;
+
+    TStatus UpsertCsv(IInputStream& input,
+                      const TString& dbPath,
+                      const TString& filePath,
+                      std::optional<ui64> inputSizeHint,
+                      ProgressCallbackFunc & progressCallback,
+                      std::shared_ptr<TJobInFlightManager> jobInflightManager);
+
+    TStatus UpsertCsvByBlocks(const TString& filePath,
+                              const TString& dbPath);
+
+    TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder);
+    TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValue&& rows);
+
+    TStatus UpsertJson(IInputStream &input, const TString &dbPath, std::optional<ui64> inputSizeHint,
+                       ProgressCallbackFunc & progressCallback);
+
+    TStatus UpsertParquet(const TString& filename, const TString& dbPath, ProgressCallbackFunc & progressCallback);
+    TAsyncStatus UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema);
+
+    TType GetTableType();
+    std::map<TString, TType> GetColumnTypes();
+    void ValidateTValueUpsertTable();
+
+    std::shared_ptr<NTable::TTableClient> TableClient;
+    std::shared_ptr<NScheme::TSchemeClient> SchemeClient;
+
+    const TImportFileSettings& Settings;
+    NTable::TBulkUpsertSettings UpsertSettings;
+    NTable::TRetryOperationSettings RetrySettings;
+
+    std::unique_ptr<const NTable::TTableDescription> DbTableInfo;
+
+    std::atomic<ui64> CurrentFileCount;
+    std::atomic<ui64> TotalBytesRead = 0;
+
+    // RequestInflight increases on sending a single request to server
+    // Decreases on receiving any response for its request
+    std::unique_ptr<std::counting_semaphore<>> RequestsInflight;
+    // Common pool between all files for building TValues
+    THolder<IThreadPool> ProcessingPool;
+
+    std::atomic<bool> Failed = false;
+    std::atomic<bool> InformedAboutLimit = false;
+    THolder<TStatus> ErrorStatus;
+
+    static constexpr ui32 VerboseModeStepSize = 1 << 27; // 128 MB
+}; // TImpl
+
+TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig,
+                                     const TImportFileSettings& settings)
+    : Impl_(new TImpl(driver, rootConfig, settings)) {
+}
+
+TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TString& dbPath) {
+    return Impl_->Import(filePaths, dbPath);
+}
+
+TImportFileClient::TImpl::TImpl(const TDriver& driver, const TClientCommand::TConfig& rootConfig,
+                                const TImportFileSettings& settings)
     : TableClient(std::make_shared<NTable::TTableClient>(driver))
     , SchemeClient(std::make_shared<NScheme::TSchemeClient>(driver))
+    , Settings(settings)
 {
     RetrySettings
         .MaxRetries(TImportFileSettings::MaxRetries)
         .Idempotent(true)
         .Verbose(rootConfig.IsVerbose());
+    IThreadPool::TParams poolParams;
+    if (!Settings.NewlineDelimited_) {
+        poolParams.SetBlocking(true);
+    }
+    ProcessingPool = CreateThreadPool(Settings.Threads_, 0, poolParams);
+    RequestsInflight = std::make_unique<std::counting_semaphore<>>(Settings.MaxInFlightRequests_);
 }
 
-TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TString& dbPath, const TImportFileSettings& settings) {
+TStatus TImportFileClient::TImpl::Import(const TVector<TString>& filePaths, const TString& dbPath) {
     CurrentFileCount = filePaths.size();
-    if (settings.Format_ == EDataFormat::Tsv && settings.Delimiter_ != "\t") {
+    if (Settings.Format_ == EDataFormat::Tsv && Settings.Delimiter_ != "\t") {
         return MakeStatus(EStatus::BAD_REQUEST,
             TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed");
     }
@@ -301,8 +426,8 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
     }
 
     UpsertSettings
-        .OperationTimeout(settings.OperationTimeout_)
-        .ClientTimeout(settings.ClientTimeout_);
+        .OperationTimeout(Settings.OperationTimeout_)
+        .ClientTimeout(Settings.ClientTimeout_);
 
     bool isStdoutInteractive = IsStdoutInteractive();
     size_t filePathsSize = filePaths.size();
@@ -324,8 +449,20 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
     }
 
     // If the single empty filename passed, read from stdin, else from the file
 
-    for (const auto& filePath : filePaths) {
-        auto func = [&, this] {
+    std::vector<std::shared_ptr<TJobInFlightManager>> inflightManagers;
+    std::mutex inflightManagersLock;
+    if ((Settings.Format_ == EDataFormat::Tsv || Settings.Format_ == EDataFormat::Csv) && !Settings.NewlineDelimited_) {
+        inflightManagers.reserve(filePathsSize);
+        // <maxInFlight> requests in flight on server and <maxThreads> threads building TValue
+        size_t maxJobInflight = Settings.Threads_ + Settings.MaxInFlightRequests_;
+        for (size_t i = 0; i < filePathsSize; ++i) {
+            inflightManagers.push_back(std::make_shared<TJobInFlightManager>(i, filePathsSize, maxJobInflight));
+        }
+    }
+
+    for (size_t fileOrderNumber = 0; fileOrderNumber < filePathsSize; ++fileOrderNumber) {
+        const auto& filePath = filePaths[fileOrderNumber];
+        auto func = [&, fileOrderNumber, this] {
             std::unique_ptr<TFileInput> fileInput;
             std::optional<ui64> fileSizeHint;
 
@@ -348,13 +485,12 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
                     fileSizeHint = fileLength;
                 }
 
-                fileInput = std::make_unique<TFileInput>(file, settings.FileBufferSize_);
+                fileInput = std::make_unique<TFileInput>(file, Settings.FileBufferSize_);
             }
 
             ProgressCallbackFunc progressCallback;
 
-            if (isStdoutInteractive)
-            {
+            if (isStdoutInteractive) {
                 ui64 oldProgress = 0;
                 progressCallback = [&, oldProgress](ui64 current, ui64 total) mutable {
                     ui64 progress = static_cast<ui64>((static_cast<double>(current) / total) * 100.0);
@@ -367,26 +503,41 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
 
             IInputStream& input = fileInput ? *fileInput : Cin;
 
-            switch (settings.Format_) {
-                case EDataFormat::Default:
-                case EDataFormat::Csv:
-                case EDataFormat::Tsv:
-                    if (settings.NewlineDelimited_) {
-                        return UpsertCsvByBlocks(filePath, dbPath, settings);
-                    } else {
-                        return UpsertCsv(input, dbPath, settings, filePath, fileSizeHint, progressCallback);
-                    }
-                case EDataFormat::Json:
-                case EDataFormat::JsonUnicode:
-                case EDataFormat::JsonBase64:
-                    return UpsertJson(input, dbPath, settings, fileSizeHint, progressCallback);
-                case EDataFormat::Parquet:
-                    return UpsertParquet(filePath, dbPath, settings, progressCallback);
-                default: ;
-            }
+            try {
+                switch (Settings.Format_) {
+                    case EDataFormat::Default:
+                    case EDataFormat::Csv:
+                    case EDataFormat::Tsv:
+                        if (Settings.NewlineDelimited_) {
+                            return UpsertCsvByBlocks(filePath, dbPath);
+                        } else {
+                            auto status = UpsertCsv(input, dbPath, filePath, fileSizeHint, progressCallback,
+                                inflightManagers.at(fileOrderNumber));
+                            std::lock_guard<std::mutex> lock(inflightManagersLock);
+                            inflightManagers[fileOrderNumber]->Finish();
+                            size_t informedManagers = 0;
+                            for (auto& inflightManager : inflightManagers) {
+                                if (inflightManager->OnAnotherFileFinished(informedManagers)) {
+                                    ++informedManagers;
+                                }
+                            }
+                            return status;
+                        }
+                    case EDataFormat::Json:
+                    case EDataFormat::JsonUnicode:
+                    case EDataFormat::JsonBase64:
+                        return UpsertJson(input, dbPath, fileSizeHint, progressCallback);
+                    case EDataFormat::Parquet:
+                        return UpsertParquet(filePath, dbPath, progressCallback);
+                    default: ;
+                }
 
-            return MakeStatus(EStatus::BAD_REQUEST,
-                TStringBuilder() << "Unsupported format #" << (int) settings.Format_);
+                return MakeStatus(EStatus::BAD_REQUEST,
+                    TStringBuilder() << "Unsupported format #" << (int) Settings.Format_);
+            } catch (const std::exception& e) {
+                return MakeStatus(EStatus::INTERNAL_ERROR,
+                    TStringBuilder() << "Error: " << e.what());
+            }
         };
 
         asyncResults.push_back(NThreading::Async(std::move(func), *pool));
@@ -413,8 +564,8 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
 }
 
 inline
-TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder) {
-    auto upsert = [this, dbPath, rows = builder.Build()]
+TAsyncStatus TImportFileClient::TImpl::UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder) {
+    auto retryFunc = [this, dbPath, rows = builder.Build()]
         (NYdb::NTable::TTableClient& tableClient) mutable -> TAsyncStatus {
         NYdb::TValue rowsCopy(rows.GetType(), rows.GetProto());
         return tableClient.BulkUpsert(dbPath, std::move(rowsCopy), UpsertSettings)
@@ -423,12 +574,12 @@ TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValue
                 return NThreading::MakeFuture(status);
             });
     };
-    return TableClient->RetryOperation(upsert, RetrySettings);
+    return TableClient->RetryOperation(retryFunc, RetrySettings);
 }
 
 inline
-TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValue&& rows) {
-    auto upsert = [this, dbPath, rows = std::move(rows)]
+TAsyncStatus TImportFileClient::TImpl::UpsertTValueBuffer(const TString& dbPath, TValue&& rows) {
+    auto retryFunc = [this, dbPath, rows = std::move(rows)]
         (NYdb::NTable::TTableClient& tableClient) mutable -> TAsyncStatus {
         NYdb::TValue rowsCopy(rows.GetType(), rows.GetProto());
         return tableClient.BulkUpsert(dbPath, std::move(rowsCopy), UpsertSettings)
@@ -437,18 +588,36 @@ TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValue
                 return NThreading::MakeFuture(status);
             });
     };
-    return TableClient->RetryOperation(upsert, RetrySettings);
+    if (!RequestsInflight->try_acquire()) {
+        if (Settings.Verbose_ && Settings.NewlineDelimited_) {
+            if (!InformedAboutLimit.exchange(true)) {
+                Cerr << (TStringBuilder() << "@ (each '@' means max request inflight is reached and a worker thread is waiting for "
+                    "any response from database)" << Endl);
+            } else {
+                Cerr << '@';
+            }
+        }
+        RequestsInflight->acquire();
+    }
+    return TableClient->RetryOperation(retryFunc, RetrySettings)
+        .Apply([this](const TAsyncStatus& asyncStatus) {
+            NYdb::TStatus status = asyncStatus.GetValueSync();
+            if (!status.IsSuccess()) {
+                if (!Failed.exchange(true)) {
+                    ErrorStatus = MakeHolder<TStatus>(status);
+                }
+            }
+            RequestsInflight->release();
+            return asyncStatus;
+        });
 }
 
-TStatus TImportFileClient::UpsertCsv(IInputStream& input,
+TStatus TImportFileClient::TImpl::UpsertCsv(IInputStream& input,
                                      const TString& dbPath,
-                                     const TImportFileSettings& settings,
                                      const TString& filePath,
                                      std::optional<ui64> inputSizeHint,
-                                     ProgressCallbackFunc & progressCallback) {
-
-    TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, CurrentFileCount);
-
+                                     ProgressCallbackFunc & progressCallback,
+                                     std::shared_ptr<TJobInFlightManager> jobInflightManager) {
     TCountingInput countInput(&input);
     NCsvFormat::TLinesSplitter splitter(countInput);
 
@@ -458,15 +627,13 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
     TCsvParser parser;
     bool removeLastDelimiter = false;
 
-    InitCsvParser(parser, removeLastDelimiter, splitter, settings, &columnTypes, DbTableInfo.get());
+    InitCsvParser(parser, removeLastDelimiter, splitter, Settings, &columnTypes, DbTableInfo.get());
 
-    for (ui32 i = 0; i < settings.SkipRows_; ++i) {
+    for (ui32 i = 0; i < Settings.SkipRows_; ++i) {
         splitter.ConsumeLine();
     }
 
-    THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
-
-    ui64 row = settings.SkipRows_ + settings.Header_ + 1;
+    ui64 row = Settings.SkipRows_ + Settings.Header_ + 1;
     ui64 batchRows = 0;
     ui64 nextBorder = VerboseModeStepSize;
     ui64 batchBytes = 0;
@@ -476,8 +643,19 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
     std::vector<TAsyncStatus> inFlightRequests;
     std::vector<TString> buffer;
 
-    auto upsertCsvFunc = [&](std::vector<TString>&& buffer, ui64 row) {
-        return UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath, row)).ExtractValueSync();
+    auto upsertCsvFunc = [&, jobInflightManager](std::vector<TString>&& buffer, ui64 row) {
+        try {
+            UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath, row))
+                .Apply([jobInflightManager](const TAsyncStatus& asyncStatus) {
+                    jobInflightManager->ReleaseJob();
+                    return asyncStatus;
+                });
+        } catch (const std::exception& e) {
+            if (!Failed.exchange(true)) {
+                ErrorStatus = MakeHolder<TStatus>(MakeStatus(EStatus::INTERNAL_ERROR, e.what()));
+            }
+            jobInflightManager->ReleaseJob();
+        }
     };
 
     while (TString line = splitter.ConsumeLine()) {
@@ -489,21 +667,21 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
         batchBytes += line.size();
 
         if (removeLastDelimiter) {
-            if (!line.EndsWith(settings.Delimiter_)) {
+            if (!line.EndsWith(Settings.Delimiter_)) {
                 return MakeStatus(EStatus::BAD_REQUEST,
                     "According to the header, lines should end with a delimiter");
             }
-            line.erase(line.size() - settings.Delimiter_.size());
+            line.erase(line.size() - Settings.Delimiter_.size());
         }
 
         buffer.push_back(line);
 
-        if (readBytes >= nextBorder && settings.Verbose_) {
+        if (readBytes >= nextBorder && Settings.Verbose_) {
             nextBorder += VerboseModeStepSize;
-            Cerr << "Processed " << PrettifyBytes(readBytes) << " and " << row + batchRows << " records" << Endl;
+            Cerr << (TStringBuilder() << "Processed " << PrettifyBytes(readBytes) << " and " << row + batchRows << " records" << Endl);
         }
 
-        if (batchBytes < settings.BytesPerRequest_) {
+        if (batchBytes < Settings.BytesPerRequest_) {
             continue;
         }
 
@@ -511,49 +689,60 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
             progressCallback(readBytes, *inputSizeHint);
         }
 
-        auto asyncUpsertCSV = [&upsertCsvFunc, row, buffer = std::move(buffer)]() mutable {
-            return upsertCsvFunc(std::move(buffer), row);
+        auto workerFunc = [&upsertCsvFunc, row, buffer = std::move(buffer)]() mutable {
+            upsertCsvFunc(std::move(buffer), row);
         };
 
         row += batchRows;
         batchRows = 0;
         batchBytes = 0;
         buffer.clear();
 
-        inFlightRequests.push_back(NThreading::Async(asyncUpsertCSV, *pool));
+        jobInflightManager->AcquireJob();
 
-        auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
-        if (!status.IsSuccess()) {
-            return status;
+        if (!ProcessingPool->AddFunc(workerFunc)) {
+            return MakeStatus(EStatus::INTERNAL_ERROR, "Couldn't add worker func");
+        }
+
+        if (Failed) {
+            break;
         }
     }
 
-    if (!buffer.empty() && countInput.Counter() > 0) {
+    // Send the rest if buffer is not empty
+    if (!buffer.empty() && countInput.Counter() > 0 && !Failed) {
+        jobInflightManager->AcquireJob();
         upsertCsvFunc(std::move(buffer), row);
     }
 
+    jobInflightManager->WaitForAllJobs();
+
     TotalBytesRead += readBytes;
 
-    auto waitResult = WaitForQueue(0, inFlightRequests);
-
-    if (settings.Verbose_) {
+    if (Settings.Verbose_) {
         std::stringstream str;
         double fileProcessingTimeSeconds = (TInstant::Now() - fileStartTime).SecondsFloat();
         str << std::endl << "File " << filePath << " of " << PrettifyBytes(readBytes)
-            << " processed in " << std::setprecision(3) << fileProcessingTimeSeconds << " sec";
+            << (Failed ? " failed in " : " processed in ") << std::setprecision(3) << fileProcessingTimeSeconds << " sec";
         if (fileProcessingTimeSeconds > 0) {
             str << ", " << PrettifyBytes((double)readBytes / fileProcessingTimeSeconds) << "/s" << std::endl;
         }
         std::cerr << str.str();
     }
 
-    return waitResult;
+    if (Failed) {
+        return *ErrorStatus;
+    } else {
+        return MakeStatus();
+    }
 }
 
-TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath,
-                                             const TString& dbPath,
-                                             const TImportFileSettings& settings) {
-
-    TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, CurrentFileCount);
+TStatus TImportFileClient::TImpl::UpsertCsvByBlocks(const TString& filePath,
+                                                    const TString& dbPath) {
     TString headerRow;
-    TCsvFileReader splitter(filePath, settings, headerRow, inFlightGetter);
+    ui64 maxThreads = Max(1ul, Settings.Threads_ / CurrentFileCount);
+    TCsvFileReader splitter(filePath, Settings, headerRow, maxThreads);
+    ui64 threadCount = splitter.GetSplitCount();
+    // MaxInFlightRequests_ requests in flight on server and threadCount threads building TValue
+    size_t maxJobInflightTotal = threadCount + Settings.MaxInFlightRequests_;
 
     auto columnTypes = GetColumnTypes();
     ValidateTValueUpsertTable();
@@ -561,83 +750,119 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath,
     TCsvParser parser;
     bool removeLastDelimiter = false;
     TStringInput headerInput(headerRow);
-    NCsvFormat::TLinesSplitter headerSplitter(headerInput, settings.Delimiter_[0]);
-    InitCsvParser(parser, removeLastDelimiter, headerSplitter, settings, &columnTypes, DbTableInfo.get());
-
-    TVector<TAsyncStatus> threadResults(splitter.GetSplitCount());
-    THolder<IThreadPool> pool = CreateThreadPool(splitter.GetSplitCount());
-    for (size_t threadId = 0; threadId < splitter.GetSplitCount(); ++threadId) {
-        auto loadCsv = [&, threadId] () {
+    NCsvFormat::TLinesSplitter headerSplitter(headerInput, Settings.Delimiter_[0]);
+    InitCsvParser(parser, removeLastDelimiter, headerSplitter, Settings, &columnTypes, DbTableInfo.get());
+
+    TVector<NThreading::TFuture<void>> threadResults;
+    threadResults.reserve(threadCount);
+    for (size_t threadId = 0; threadId < threadCount; ++threadId) {
+        auto promise = NThreading::NewPromise<void>();
+        threadResults.push_back(promise.GetFuture());
+        auto workerFunc = [&, threadId, promise] () mutable {
+            TInstant threadStartTime = TInstant::Now();
+            size_t maxJobInflight = maxJobInflightTotal / threadCount + (threadId < maxJobInflightTotal % threadCount ? 1 : 0);
+            // Jobs starts on starting building TValue and sends request
+            // Job ends on receiving final request (after all retries)
+            std::counting_semaphore<> jobsInflight(maxJobInflight);
             auto upsertCsvFunc = [&](std::vector<TString>&& buffer) {
-                return UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath));
+                jobsInflight.acquire();
+                try {
+                    UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath))
+                        .Apply([&jobsInflight](const TAsyncStatus& asyncStatus) {
+                            jobsInflight.release();
+                            return asyncStatus;
+                        });
+                } catch (const std::exception& e) {
+                    if (!Failed.exchange(true)) {
+                        ErrorStatus = MakeHolder<TStatus>(MakeStatus(EStatus::INTERNAL_ERROR, e.what()));
+                    }
+                    jobsInflight.release();
+                }
             };
             std::vector<TAsyncStatus> inFlightRequests;
             std::vector<TString> buffer;
-            ui32 idx = settings.SkipRows_;
+            ui32 idx = Settings.SkipRows_;
             ui64 readBytes = 0;
             ui64 batchBytes = 0;
             ui64 nextBorder = VerboseModeStepSize;
-            TAsyncStatus status;
             TString line;
-            while (splitter.GetChunk(threadId).ConsumeLine(line)) {
+            TCsvFileReader::TFileChunk& chunk = splitter.GetChunk(threadId);
+            while (chunk.ConsumeLine(line)) {
                 if (line.empty()) {
                     continue;
                 }
                 readBytes += line.size();
                 batchBytes += line.size();
                 if (removeLastDelimiter) {
-                    if (!line.EndsWith(settings.Delimiter_)) {
-                        return MakeStatus(EStatus::BAD_REQUEST,
-                            "According to the header, lines should end with a delimiter");
+                    if (!line.EndsWith(Settings.Delimiter_)) {
+                        if (!Failed.exchange(true)) {
+                            ErrorStatus = MakeHolder<TStatus>(MakeStatus(EStatus::BAD_REQUEST,
+                                "According to the header, lines should end with a delimiter"));
+                        }
+                        break;
                     }
-                    line.erase(line.size() - settings.Delimiter_.size());
+                    line.erase(line.size() - Settings.Delimiter_.size());
                 }
                 buffer.push_back(line);
                 ++idx;
-                if (readBytes >= nextBorder && settings.Verbose_) {
+                if (readBytes >= nextBorder && Settings.Verbose_) {
                     nextBorder += VerboseModeStepSize;
-                    TStringBuilder builder;
-                    builder << "Processed " << PrettifyBytes(readBytes) << " and " << idx << " records" << Endl;
-                    Cerr << builder;
+                    Cerr << (TStringBuilder() << "Processed " << PrettifyBytes(readBytes) << " and "
+                        << idx << " records" << Endl);
                 }
-                if (batchBytes >= settings.BytesPerRequest_) {
+                if (batchBytes >= Settings.BytesPerRequest_) {
                     batchBytes = 0;
-                    auto status = WaitForQueue(splitter.GetThreadLimit(threadId), inFlightRequests);
-                    if (!status.IsSuccess()) {
-                        return status;
-                    }
-
-                    inFlightRequests.push_back(upsertCsvFunc(std::move(buffer)));
+                    upsertCsvFunc(std::move(buffer));
                     buffer.clear();
+
+                    if (Failed) {
+                        break;
+                    }
                 }
             }
-            if (!buffer.empty() && splitter.GetChunk(threadId).GetReadCount() != 0) {
-                inFlightRequests.push_back(upsertCsvFunc(std::move(buffer)));
+            if (!buffer.empty() && chunk.GetReadCount() != 0 && !Failed) {
+                upsertCsvFunc(std::move(buffer));
+            }
+
+            // Wait for all jobs for current thread to finish
+            for (size_t i = 0; i < maxJobInflight; ++i) {
+                jobsInflight.acquire();
             }
 
             TotalBytesRead += readBytes;
 
-            return WaitForQueue(0, inFlightRequests);
+            if (Settings.Verbose_) {
+                std::stringstream str;
+                double threadProcessingTimeSeconds = (TInstant::Now() - threadStartTime).SecondsFloat();
+                str << std::endl << "File " << filePath << ", thread " << threadId << " processed " << PrettifyBytes(readBytes) << " and "
+                    << (Failed ? "failed in " : "successfully finished in ") << std::setprecision(3)
"failed in " : "successfully finished in ") << std::setprecision(3) + << threadProcessingTimeSeconds << " sec"; + if (threadProcessingTimeSeconds > 0) { + str << ", " << PrettifyBytes((double)readBytes / threadProcessingTimeSeconds) << "/s" << std::endl; + } + std::cerr << str.str(); + } + promise.SetValue(); }; - threadResults[threadId] = NThreading::Async(loadCsv, *pool); + + if (!ProcessingPool->AddFunc(workerFunc)) { + return MakeStatus(EStatus::INTERNAL_ERROR, "Couldn't add worker func"); + } } NThreading::WaitAll(threadResults).Wait(); - for (size_t i = 0; i < splitter.GetSplitCount(); ++i) { - if (!threadResults[i].GetValueSync().IsSuccess()) { - return threadResults[i].GetValueSync(); - } + if (Failed) { + return *ErrorStatus; } return MakeStatus(); } -TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings, - std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback) { +TStatus TImportFileClient::TImpl::UpsertJson(IInputStream& input, const TString& dbPath, std::optional<ui64> inputSizeHint, + ProgressCallbackFunc & progressCallback) { const TType tableType = GetTableType(); ValidateTValueUpsertTable(); - TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, CurrentFileCount); - THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_); + TMaxInflightGetter inFlightGetter(Settings.MaxInFlightRequests_, CurrentFileCount); ui64 readBytes = 0; ui64 batchBytes = 0; @@ -651,7 +876,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath batch.BeginList(); for (auto &line : batchLines) { - batch.AddListItem(JsonToYdbValue(line, tableType, settings.BinaryStringsEncoding_)); + batch.AddListItem(JsonToYdbValue(line, tableType, Settings.BinaryStringsEncoding_)); } batch.EndList(); @@ -669,7 +894,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath progressCallback(readBytes, *inputSizeHint); } - if (batchBytes < settings.BytesPerRequest_) { + if (batchBytes < Settings.BytesPerRequest_) { continue; } @@ -681,7 +906,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath batchLines.clear(); - inFlightRequests.push_back(NThreading::Async(asyncUpsertJson, *pool)); + inFlightRequests.push_back(NThreading::Async(asyncUpsertJson, *ProcessingPool)); auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests); if (!status.IsSuccess()) { @@ -696,9 +921,8 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath return WaitForQueue(0, inFlightRequests); } -TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filename, +TStatus TImportFileClient::TImpl::UpsertParquet([[maybe_unused]] const TString& filename, [[maybe_unused]] const TString& dbPath, - [[maybe_unused]] const TImportFileSettings& settings, [[maybe_unused]] ProgressCallbackFunc & progressCallback) { #if defined(_WIN64) || defined(_WIN32) || defined(__WIN32__) return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows"); @@ -734,8 +958,6 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while getting RecordBatchReader: " << st.ToString()); } - THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_); - std::atomic<ui64> uploadedRows = 0; auto uploadedRowsCallback = [&](ui64 rows) { ui64 uploadedRowsValue = uploadedRows.fetch_add(rows); @@ -764,7 +986,7 @@ 
         const TString strSchema = NYdb_cli::NArrow::SerializeSchema(*batch->schema());
         const size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(batch);
         const size_t sliceCount =
-            (totalSize / (size_t)settings.BytesPerRequest_) + (totalSize % settings.BytesPerRequest_ != 0 ? 1 : 0);
+            (totalSize / (size_t)Settings.BytesPerRequest_) + (totalSize % Settings.BytesPerRequest_ != 0 ? 1 : 0);
         const i64 rowsInSlice = batch->num_rows() / sliceCount;
 
         for (i64 currentRow = 0; currentRow < batch->num_rows(); currentRow += rowsInSlice) {
@@ -786,7 +1008,7 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
                 }
 
                 // Logarithmic approach to find number of rows fit into the byte limit.
-                if (rowsBatch->num_rows() == 1 || NYdb_cli::NArrow::GetBatchDataSize(rowsBatch) < settings.BytesPerRequest_) {
+                if (rowsBatch->num_rows() == 1 || NYdb_cli::NArrow::GetBatchDataSize(rowsBatch) < Settings.BytesPerRequest_) {
                     // Single row or fits into the byte limit.
                     auto value = UpsertParquetBuffer(dbPath, NYdb_cli::NArrow::SerializeBatchNoCompression(rowsBatch), strSchema);
                     auto status = value.ExtractValueSync();
@@ -806,9 +1028,9 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
             return MakeStatus();
         };
 
-        inFlightRequests.push_back(NThreading::Async(upsertParquetBatch, *pool));
+        inFlightRequests.push_back(NThreading::Async(upsertParquetBatch, *ProcessingPool));
 
-        auto status = WaitForQueue(settings.MaxInFlightRequests_, inFlightRequests);
+        auto status = WaitForQueue(Settings.MaxInFlightRequests_, inFlightRequests);
         if (!status.IsSuccess()) {
             return status;
         }
@@ -819,7 +1041,7 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
 }
 
 inline
-TAsyncStatus TImportFileClient::UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema) {
+TAsyncStatus TImportFileClient::TImpl::UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema) {
     auto upsert = [this, dbPath, buffer, strSchema](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
         return tableClient.BulkUpsert(dbPath, NTable::EDataFormat::ApacheArrow, buffer, strSchema, UpsertSettings)
             .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
@@ -830,7 +1052,7 @@ TAsyncStatus TImportFileClient::UpsertParquetBuffer(const TString& dbPath, const
     return TableClient->RetryOperation(upsert, RetrySettings);
 }
 
-TType TImportFileClient::GetTableType() {
+TType TImportFileClient::TImpl::GetTableType() {
     TTypeBuilder typeBuilder;
     typeBuilder.BeginStruct();
     Y_ENSURE_BT(DbTableInfo);
@@ -842,7 +1064,7 @@ TType TImportFileClient::GetTableType() {
     return typeBuilder.Build();
 }
 
-std::map<TString, TType> TImportFileClient::GetColumnTypes() {
+std::map<TString, TType> TImportFileClient::TImpl::GetColumnTypes() {
     std::map<TString, TType> columnTypes;
     Y_ENSURE_BT(DbTableInfo);
     const auto& columns = DbTableInfo->GetTableColumns();
@@ -852,7 +1074,7 @@ std::map<TString, TType> TImportFileClient::GetColumnTypes() {
     return columnTypes;
 }
 
-void TImportFileClient::ValidateTValueUpsertTable() {
+void TImportFileClient::TImpl::ValidateTValueUpsertTable() {
     auto columnTypes = GetColumnTypes();
     bool hasPgType = false;
     for (const auto& [_, type] : columnTypes) {
diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h
index 949e95ef38..d46780e39b 100644
--- a/ydb/public/lib/ydb_cli/import/import.h
+++ b/ydb/public/lib/ydb_cli/import/import.h
@@ -1,8 +1,10 @@
 #pragma once
 
+#include <semaphore>
 #include <thread>
 #include <functional>
 
+#include <util/thread/pool.h>
 #include <ydb/public/lib/json_value/ydb_json_value.h>
 #include <ydb/public/lib/ydb_cli/common/command.h>
 #include <ydb/public/lib/ydb_cli/common/formats.h>
@@ -46,7 +48,8 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting
     FLUENT_SETTING_DEFAULT(ui64, BytesPerRequest, 1_MB);
     FLUENT_SETTING_DEFAULT(ui64, FileBufferSize, 2_MB);
     FLUENT_SETTING_DEFAULT(ui64, MaxInFlightRequests, 100);
-    FLUENT_SETTING_DEFAULT(ui64, Threads, std::thread::hardware_concurrency());
+    // Main thread that reads input file is CPU intensive so make room for it too
+    FLUENT_SETTING_DEFAULT(ui64, Threads, std::thread::hardware_concurrency() > 1 ? std::thread::hardware_concurrency() - 1 : 1);
     // Settings below are for CSV format only
     FLUENT_SETTING_DEFAULT(ui32, SkipRows, 0);
     FLUENT_SETTING_DEFAULT(bool, Header, false);
@@ -59,55 +62,19 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting
 
 class TImportFileClient {
 public:
-    explicit TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig);
+    explicit TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig,
+                               const TImportFileSettings& settings = {});
     TImportFileClient(const TImportFileClient&) = delete;
 
     // Ingest data from the input files to the database table.
     //   fsPaths: vector of paths to input files
     //   dbPath: full path to the database table, including the database path
     //   settings: input data format and operational settings
-    TStatus Import(const TVector<TString>& fsPaths, const TString& dbPath, const TImportFileSettings& settings = {});
+    TStatus Import(const TVector<TString>& filePaths, const TString& dbPath);
 
 private:
-    std::shared_ptr<NTable::TTableClient> TableClient;
-    std::shared_ptr<NScheme::TSchemeClient> SchemeClient;
-
-    NTable::TBulkUpsertSettings UpsertSettings;
-    NTable::TRetryOperationSettings RetrySettings;
-
-    std::unique_ptr<const NTable::TTableDescription> DbTableInfo;
-
-    std::atomic<ui64> CurrentFileCount;
-    std::atomic<ui64> TotalBytesRead = 0;
-
-    static constexpr ui32 VerboseModeStepSize = 1 << 27; // 128 MB
-
-    using ProgressCallbackFunc = std::function<void (ui64, ui64)>;
-
-    TStatus UpsertCsv(IInputStream& input,
-                      const TString& dbPath,
-                      const TImportFileSettings& settings,
-                      const TString& filePath,
-                      std::optional<ui64> inputSizeHint,
-                      ProgressCallbackFunc & progressCallback);
-
-    TStatus UpsertCsvByBlocks(const TString& filePath,
-                              const TString& dbPath,
-                              const TImportFileSettings& settings);
-
-    TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder);
-    TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValue&& rows);
-
-    TStatus UpsertJson(IInputStream &input, const TString &dbPath, const TImportFileSettings &settings,
-                       std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback);
-
-    TStatus UpsertParquet(const TString& filename, const TString& dbPath, const TImportFileSettings& settings,
-                          ProgressCallbackFunc & progressCallback);
-    TAsyncStatus UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema);
-
-    TType GetTableType();
-    std::map<TString, TType> GetColumnTypes();
-    void ValidateTValueUpsertTable();
+    class TImpl;
+    std::shared_ptr<TImpl> Impl_;
 };
 
 }
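
Background on the concurrency scheme this patch introduces: TJobInFlightManager gives each concurrently imported file a slice of a shared job budget (worker threads plus max in-flight requests) through a C++20 counting semaphore. A job acquires a permit before a worker is scheduled, releases it when the final (post-retry) response arrives, and files that finish donate their permits to the files still running via OnAnotherFileFinished(). The following standalone sketch mirrors only that acquire/release discipline; all names are hypothetical, it is not YDB code, and it assumes a C++20 compiler (-std=c++20):

    #include <algorithm>
    #include <cstddef>
    #include <semaphore>
    #include <thread>
    #include <vector>

    int main() {
        const std::size_t maxJobInFlight = 8; // threads + max in-flight requests
        const std::size_t fileCount = 3;      // concurrently imported files
        const std::size_t orderNum = 0;       // this file's index among them

        // Per-file share, at least 1; the first (maxJobInFlight % fileCount)
        // files get one extra permit -- same formula as GetSemaphoreMaxValue().
        const std::size_t permitCount = std::max<std::size_t>(
            1, maxJobInFlight / fileCount + (orderNum < maxJobInFlight % fileCount ? 1 : 0));
        std::counting_semaphore<> permits(permitCount);

        std::vector<std::jthread> workers;
        for (int job = 0; job < 10; ++job) {
            permits.acquire();                 // blocks once the budget is spent
            workers.emplace_back([&permits] {
                // ... build a batch and send it (BulkUpsert in the patch) ...
                permits.release();             // the permit returns with the response
            });
        }
        // Draining every permit waits for all outstanding jobs, as
        // WaitForAllJobs() does in the patch.
        for (std::size_t i = 0; i < permitCount; ++i) {
            permits.acquire();
        }
        return 0;
    }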