aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Perfilov <pnv1@yandex-team.ru>2024-11-22 13:39:40 +0300
committerGitHub <noreply@github.com>2024-11-22 13:39:40 +0300
commitb13bd4af6109e62064aef918bbf593cc549cafa6 (patch)
treee7c6af44ca297c244b561f46f1cf7a8c19e47f56
parenta18f18d81996ca8e681bb6cabd441b52833d99bf (diff)
downloadydb-b13bd4af6109e62064aef918bbf593cc549cafa6.tar.gz
Improve csv import, step 3 (#11864)
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp12
-rw-r--r--ydb/public/lib/ydb_cli/common/csv_parser.cpp4
-rw-r--r--ydb/public/lib/ydb_cli/import/import.cpp478
-rw-r--r--ydb/public/lib/ydb_cli/import/import.h51
4 files changed, 367 insertions, 178 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
index 23ab8e32ea..8ed4aaa660 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
@@ -311,8 +311,8 @@ int TCommandImportFromCsv::Run(TConfig& config) {
settings.Delimiter(Delimiter);
}
- TImportFileClient client(CreateDriver(config), config);
- ThrowOnError(client.Import(FilePaths, Path, settings));
+ TImportFileClient client(CreateDriver(config), config, settings);
+ ThrowOnError(client.Import(FilePaths, Path));
return EXIT_SUCCESS;
}
@@ -341,8 +341,8 @@ int TCommandImportFromJson::Run(TConfig& config) {
settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
settings.Threads(Threads);
- TImportFileClient client(CreateDriver(config), config);
- ThrowOnError(client.Import(FilePaths, Path, settings));
+ TImportFileClient client(CreateDriver(config), config, settings);
+ ThrowOnError(client.Import(FilePaths, Path));
return EXIT_SUCCESS;
}
@@ -360,8 +360,8 @@ int TCommandImportFromParquet::Run(TConfig& config) {
settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
settings.Threads(Threads);
- TImportFileClient client(CreateDriver(config), config);
- ThrowOnError(client.Import(FilePaths, Path, settings));
+ TImportFileClient client(CreateDriver(config), config, settings);
+ ThrowOnError(client.Import(FilePaths, Path));
return EXIT_SUCCESS;
}
diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.cpp b/ydb/public/lib/ydb_cli/common/csv_parser.cpp
index b3d50636b7..aff6db75f7 100644
--- a/ydb/public/lib/ydb_cli/common/csv_parser.cpp
+++ b/ydb/public/lib/ydb_cli/common/csv_parser.cpp
@@ -483,8 +483,8 @@ void TCsvParser::BuildLineType() {
auto findIt = DestinationTypes->find(colName);
if (findIt != DestinationTypes->end()) {
builder.AddMember(colName, findIt->second);
- ResultLineTypesSorted.emplace_back(&findIt->second);
- ResultLineNamesSorted.emplace_back(&colName);
+ ResultLineTypesSorted.push_back(&findIt->second);
+ ResultLineNamesSorted.push_back(&colName);
SkipBitMap.push_back(false);
++ResultColumnCount;
} else {
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index ddfd3ccb61..0e417c3132 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -149,7 +149,7 @@ private:
};
class TCsvFileReader {
-private:
+public:
class TFileChunk {
public:
TFileChunk(TFile file, THolder<IInputStream>&& stream, ui64 size = std::numeric_limits<ui64>::max())
@@ -180,8 +180,7 @@ private:
};
public:
- TCsvFileReader(const TString& filePath, const TImportFileSettings& settings, TString& headerRow,
- TMaxInflightGetter& inFlightGetter) {
+ TCsvFileReader(const TString& filePath, const TImportFileSettings& settings, TString& headerRow, ui64 maxThreads) {
TFile file;
if (filePath) {
file = TFile(filePath, RdOnly);
@@ -199,7 +198,6 @@ public:
}
i64 skipSize = countInput.Counter();
- MaxInFlight = inFlightGetter.GetCurrentMaxInflight();
i64 fileSize = file.GetLength();
if (filePath.empty() || fileSize == -1) {
SplitCount = 1;
@@ -207,7 +205,7 @@ public:
return;
}
- SplitCount = Min(MaxInFlight, (fileSize - skipSize) / settings.BytesPerRequest_ + 1);
+ SplitCount = Min(maxThreads, (fileSize - skipSize) / settings.BytesPerRequest_ + 1);
i64 chunkSize = (fileSize - skipSize) / SplitCount;
if (chunkSize == 0) {
SplitCount = 1;
@@ -260,21 +258,148 @@ private:
size_t MaxInFlight;
};
+class TJobInFlightManager {
+public:
+ TJobInFlightManager(size_t orderNum, size_t fileCount, size_t maxJobInFlight)
+ : MaxJobInFlight(maxJobInFlight)
+ , CurrentFileCount(fileCount)
+ , CurrentSemaphoreMax(GetSemaphoreMaxValue(orderNum))
+ , JobsSemaphore(GetSemaphoreMaxValue(orderNum)) {
+ }
+
+ // Return value: true if this call was useful (Inflight manager is still working)
+ // Parameter informedSoFar: number of other Inflight managers already informed. Needed to calculate newSemaphoreMax
+ bool OnAnotherFileFinished(size_t informedSoFar) {
+ std::lock_guard<std::mutex> lock(Mutex);
+ if (Finished || CurrentFileCount <= 1) {
+ return false;
+ }
+ --CurrentFileCount;
+ size_t newSemaphoreMax = GetSemaphoreMaxValue(informedSoFar);
+ size_t semaphoreSizeDiff = newSemaphoreMax - CurrentSemaphoreMax;
+ CurrentSemaphoreMax = newSemaphoreMax;
+ if (semaphoreSizeDiff > 0) {
+ JobsSemaphore.release(semaphoreSizeDiff);
+ }
+ return true;
+ }
+
+ void AcquireJob() {
+ JobsSemaphore.acquire();
+ }
+
+ void ReleaseJob() {
+ JobsSemaphore.release();
+ }
+
+ void Finish() {
+ std::lock_guard<std::mutex> lock(Mutex);
+ Finished = true;
+ }
+
+ void WaitForAllJobs() {
+ std::lock_guard<std::mutex> lock(Mutex);
+ for (size_t i = 0; i < CurrentSemaphoreMax; ++i) {
+ JobsSemaphore.acquire();
+ }
+ }
+
+private:
+ size_t GetSemaphoreMaxValue(size_t orderNum) const {
+ return Max(1ul, MaxJobInFlight / CurrentFileCount
+ // One more thread for the first <MaxJobInFlight % CurrentFileCount> managers to utilize max jobs inflight
+ + (orderNum < MaxJobInFlight % CurrentFileCount ? 1 : 0));
+ }
+ size_t MaxJobInFlight;
+ size_t CurrentFileCount;
+ size_t CurrentSemaphoreMax;
+ bool Finished = false;
+ std::mutex Mutex;
+ // Jobs starts on launching a worker in a separate thread that builds TValue and sends request
+ // Job ends on receiving final request (after all retries)
+ std::counting_semaphore<> JobsSemaphore;
+}; // TJobInflightManager
+
} // namespace
-TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig)
+class TImportFileClient::TImpl {
+public:
+ explicit TImpl(const TDriver& driver, const TClientCommand::TConfig& rootConfig,
+ const TImportFileSettings& settings);
+ TStatus Import(const TVector<TString>& filePaths, const TString& dbPath);
+
+private:
+ using ProgressCallbackFunc = std::function<void (ui64, ui64)>;
+
+ TStatus UpsertCsv(IInputStream& input,
+ const TString& dbPath,
+ const TString& filePath,
+ std::optional<ui64> inputSizeHint,
+ ProgressCallbackFunc & progressCallback,
+ std::shared_ptr<TJobInFlightManager> jobInflightManager);
+
+ TStatus UpsertCsvByBlocks(const TString& filePath,
+ const TString& dbPath);
+ TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder);
+ TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValue&& rows);
+ TStatus UpsertJson(IInputStream &input, const TString &dbPath, std::optional<ui64> inputSizeHint,
+ ProgressCallbackFunc & progressCallback);
+ TStatus UpsertParquet(const TString& filename, const TString& dbPath, ProgressCallbackFunc & progressCallback);
+ TAsyncStatus UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema);
+ TType GetTableType();
+ std::map<TString, TType> GetColumnTypes();
+ void ValidateTValueUpsertTable();
+
+ std::shared_ptr<NTable::TTableClient> TableClient;
+ std::shared_ptr<NScheme::TSchemeClient> SchemeClient;
+ const TImportFileSettings& Settings;
+ NTable::TBulkUpsertSettings UpsertSettings;
+ NTable::TRetryOperationSettings RetrySettings;
+ std::unique_ptr<const NTable::TTableDescription> DbTableInfo;
+ std::atomic<ui64> CurrentFileCount;
+ std::atomic<ui64> TotalBytesRead = 0;
+ // RequestInflight increases on sending a single request to server
+ // Decreases on receiving any response for its request
+ std::unique_ptr<std::counting_semaphore<>> RequestsInflight;
+ // Common pool between all files for building TValues
+ THolder<IThreadPool> ProcessingPool;
+ std::atomic<bool> Failed = false;
+ std::atomic<bool> InformedAboutLimit = false;
+ THolder<TStatus> ErrorStatus;
+
+ static constexpr ui32 VerboseModeStepSize = 1 << 27; // 128 MB
+}; // TImpl
+
+TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig,
+ const TImportFileSettings& settings)
+ : Impl_(new TImpl(driver, rootConfig, settings)) {
+}
+
+TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TString& dbPath) {
+ return Impl_->Import(filePaths, dbPath);
+}
+
+TImportFileClient::TImpl::TImpl(const TDriver& driver, const TClientCommand::TConfig& rootConfig,
+ const TImportFileSettings& settings)
: TableClient(std::make_shared<NTable::TTableClient>(driver))
, SchemeClient(std::make_shared<NScheme::TSchemeClient>(driver))
+ , Settings(settings)
{
RetrySettings
.MaxRetries(TImportFileSettings::MaxRetries)
.Idempotent(true)
.Verbose(rootConfig.IsVerbose());
+ IThreadPool::TParams poolParams;
+ if (!Settings.NewlineDelimited_) {
+ poolParams.SetBlocking(true);
+ }
+ ProcessingPool = CreateThreadPool(Settings.Threads_, 0, poolParams);
+ RequestsInflight = std::make_unique<std::counting_semaphore<>>(Settings.MaxInFlightRequests_);
}
-TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TString& dbPath, const TImportFileSettings& settings) {
+TStatus TImportFileClient::TImpl::Import(const TVector<TString>& filePaths, const TString& dbPath) {
CurrentFileCount = filePaths.size();
- if (settings.Format_ == EDataFormat::Tsv && settings.Delimiter_ != "\t") {
+ if (Settings.Format_ == EDataFormat::Tsv && Settings.Delimiter_ != "\t") {
return MakeStatus(EStatus::BAD_REQUEST,
TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed");
}
@@ -301,8 +426,8 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
}
UpsertSettings
- .OperationTimeout(settings.OperationTimeout_)
- .ClientTimeout(settings.ClientTimeout_);
+ .OperationTimeout(Settings.OperationTimeout_)
+ .ClientTimeout(Settings.ClientTimeout_);
bool isStdoutInteractive = IsStdoutInteractive();
size_t filePathsSize = filePaths.size();
@@ -324,8 +449,20 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
// If the single empty filename passed, read from stdin, else from the file
- for (const auto& filePath : filePaths) {
- auto func = [&, this] {
+ std::vector<std::shared_ptr<TJobInFlightManager>> inflightManagers;
+ std::mutex inflightManagersLock;
+ if ((Settings.Format_ == EDataFormat::Tsv || Settings.Format_ == EDataFormat::Csv) && !Settings.NewlineDelimited_) {
+ inflightManagers.reserve(filePathsSize);
+ // <maxInFlight> requests in flight on server and <maxThreads> threads building TValue
+ size_t maxJobInflight = Settings.Threads_ + Settings.MaxInFlightRequests_;
+ for (size_t i = 0; i < filePathsSize; ++i) {
+ inflightManagers.push_back(std::make_shared<TJobInFlightManager>(i, filePathsSize, maxJobInflight));
+ }
+ }
+
+ for (size_t fileOrderNumber = 0; fileOrderNumber < filePathsSize; ++fileOrderNumber) {
+ const auto& filePath = filePaths[fileOrderNumber];
+ auto func = [&, fileOrderNumber, this] {
std::unique_ptr<TFileInput> fileInput;
std::optional<ui64> fileSizeHint;
@@ -348,13 +485,12 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
fileSizeHint = fileLength;
}
- fileInput = std::make_unique<TFileInput>(file, settings.FileBufferSize_);
+ fileInput = std::make_unique<TFileInput>(file, Settings.FileBufferSize_);
}
ProgressCallbackFunc progressCallback;
- if (isStdoutInteractive)
- {
+ if (isStdoutInteractive) {
ui64 oldProgress = 0;
progressCallback = [&, oldProgress](ui64 current, ui64 total) mutable {
ui64 progress = static_cast<ui64>((static_cast<double>(current) / total) * 100.0);
@@ -367,26 +503,41 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
IInputStream& input = fileInput ? *fileInput : Cin;
- switch (settings.Format_) {
- case EDataFormat::Default:
- case EDataFormat::Csv:
- case EDataFormat::Tsv:
- if (settings.NewlineDelimited_) {
- return UpsertCsvByBlocks(filePath, dbPath, settings);
- } else {
- return UpsertCsv(input, dbPath, settings, filePath, fileSizeHint, progressCallback);
- }
- case EDataFormat::Json:
- case EDataFormat::JsonUnicode:
- case EDataFormat::JsonBase64:
- return UpsertJson(input, dbPath, settings, fileSizeHint, progressCallback);
- case EDataFormat::Parquet:
- return UpsertParquet(filePath, dbPath, settings, progressCallback);
- default: ;
- }
+ try {
+ switch (Settings.Format_) {
+ case EDataFormat::Default:
+ case EDataFormat::Csv:
+ case EDataFormat::Tsv:
+ if (Settings.NewlineDelimited_) {
+ return UpsertCsvByBlocks(filePath, dbPath);
+ } else {
+ auto status = UpsertCsv(input, dbPath, filePath, fileSizeHint, progressCallback,
+ inflightManagers.at(fileOrderNumber));
+ std::lock_guard<std::mutex> lock(inflightManagersLock);
+ inflightManagers[fileOrderNumber]->Finish();
+ size_t informedManagers = 0;
+ for (auto& inflightManager : inflightManagers) {
+ if (inflightManager->OnAnotherFileFinished(informedManagers)) {
+ ++informedManagers;
+ }
+ }
+ return status;
+ }
+ case EDataFormat::Json:
+ case EDataFormat::JsonUnicode:
+ case EDataFormat::JsonBase64:
+ return UpsertJson(input, dbPath, fileSizeHint, progressCallback);
+ case EDataFormat::Parquet:
+ return UpsertParquet(filePath, dbPath, progressCallback);
+ default: ;
+ }
- return MakeStatus(EStatus::BAD_REQUEST,
- TStringBuilder() << "Unsupported format #" << (int) settings.Format_);
+ return MakeStatus(EStatus::BAD_REQUEST,
+ TStringBuilder() << "Unsupported format #" << (int) Settings.Format_);
+ } catch (const std::exception& e) {
+ return MakeStatus(EStatus::INTERNAL_ERROR,
+ TStringBuilder() << "Error: " << e.what());
+ }
};
asyncResults.push_back(NThreading::Async(std::move(func), *pool));
@@ -413,8 +564,8 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
}
inline
-TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder) {
- auto upsert = [this, dbPath, rows = builder.Build()]
+TAsyncStatus TImportFileClient::TImpl::UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder) {
+ auto retryFunc = [this, dbPath, rows = builder.Build()]
(NYdb::NTable::TTableClient& tableClient) mutable -> TAsyncStatus {
NYdb::TValue rowsCopy(rows.GetType(), rows.GetProto());
return tableClient.BulkUpsert(dbPath, std::move(rowsCopy), UpsertSettings)
@@ -423,12 +574,12 @@ TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValue
return NThreading::MakeFuture(status);
});
};
- return TableClient->RetryOperation(upsert, RetrySettings);
+ return TableClient->RetryOperation(retryFunc, RetrySettings);
}
inline
-TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValue&& rows) {
- auto upsert = [this, dbPath, rows = std::move(rows)]
+TAsyncStatus TImportFileClient::TImpl::UpsertTValueBuffer(const TString& dbPath, TValue&& rows) {
+ auto retryFunc = [this, dbPath, rows = std::move(rows)]
(NYdb::NTable::TTableClient& tableClient) mutable -> TAsyncStatus {
NYdb::TValue rowsCopy(rows.GetType(), rows.GetProto());
return tableClient.BulkUpsert(dbPath, std::move(rowsCopy), UpsertSettings)
@@ -437,18 +588,36 @@ TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValue
return NThreading::MakeFuture(status);
});
};
- return TableClient->RetryOperation(upsert, RetrySettings);
+ if (!RequestsInflight->try_acquire()) {
+ if (Settings.Verbose_ && Settings.NewlineDelimited_) {
+ if (!InformedAboutLimit.exchange(true)) {
+ Cerr << (TStringBuilder() << "@ (each '@' means max request inflight is reached and a worker thread is waiting for "
+ "any response from database)" << Endl);
+ } else {
+ Cerr << '@';
+ }
+ }
+ RequestsInflight->acquire();
+ }
+ return TableClient->RetryOperation(retryFunc, RetrySettings)
+ .Apply([this](const TAsyncStatus& asyncStatus) {
+ NYdb::TStatus status = asyncStatus.GetValueSync();
+ if (!status.IsSuccess()) {
+ if (!Failed.exchange(true)) {
+ ErrorStatus = MakeHolder<TStatus>(status);
+ }
+ }
+ RequestsInflight->release();
+ return asyncStatus;
+ });
}
-TStatus TImportFileClient::UpsertCsv(IInputStream& input,
+TStatus TImportFileClient::TImpl::UpsertCsv(IInputStream& input,
const TString& dbPath,
- const TImportFileSettings& settings,
const TString& filePath,
std::optional<ui64> inputSizeHint,
- ProgressCallbackFunc & progressCallback) {
-
- TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, CurrentFileCount);
-
+ ProgressCallbackFunc & progressCallback,
+ std::shared_ptr<TJobInFlightManager> jobInflightManager) {
TCountingInput countInput(&input);
NCsvFormat::TLinesSplitter splitter(countInput);
@@ -458,15 +627,13 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
TCsvParser parser;
bool removeLastDelimiter = false;
- InitCsvParser(parser, removeLastDelimiter, splitter, settings, &columnTypes, DbTableInfo.get());
+ InitCsvParser(parser, removeLastDelimiter, splitter, Settings, &columnTypes, DbTableInfo.get());
- for (ui32 i = 0; i < settings.SkipRows_; ++i) {
+ for (ui32 i = 0; i < Settings.SkipRows_; ++i) {
splitter.ConsumeLine();
}
- THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
-
- ui64 row = settings.SkipRows_ + settings.Header_ + 1;
+ ui64 row = Settings.SkipRows_ + Settings.Header_ + 1;
ui64 batchRows = 0;
ui64 nextBorder = VerboseModeStepSize;
ui64 batchBytes = 0;
@@ -476,8 +643,19 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
std::vector<TAsyncStatus> inFlightRequests;
std::vector<TString> buffer;
- auto upsertCsvFunc = [&](std::vector<TString>&& buffer, ui64 row) {
- return UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath, row)).ExtractValueSync();
+ auto upsertCsvFunc = [&, jobInflightManager](std::vector<TString>&& buffer, ui64 row) {
+ try {
+ UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath, row))
+ .Apply([jobInflightManager](const TAsyncStatus& asyncStatus) {
+ jobInflightManager->ReleaseJob();
+ return asyncStatus;
+ });
+ } catch (const std::exception& e) {
+ if (!Failed.exchange(true)) {
+ ErrorStatus = MakeHolder<TStatus>(MakeStatus(EStatus::INTERNAL_ERROR, e.what()));
+ }
+ jobInflightManager->ReleaseJob();
+ }
};
while (TString line = splitter.ConsumeLine()) {
@@ -489,21 +667,21 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
batchBytes += line.size();
if (removeLastDelimiter) {
- if (!line.EndsWith(settings.Delimiter_)) {
+ if (!line.EndsWith(Settings.Delimiter_)) {
return MakeStatus(EStatus::BAD_REQUEST,
"According to the header, lines should end with a delimiter");
}
- line.erase(line.size() - settings.Delimiter_.size());
+ line.erase(line.size() - Settings.Delimiter_.size());
}
buffer.push_back(line);
- if (readBytes >= nextBorder && settings.Verbose_) {
+ if (readBytes >= nextBorder && Settings.Verbose_) {
nextBorder += VerboseModeStepSize;
- Cerr << "Processed " << PrettifyBytes(readBytes) << " and " << row + batchRows << " records" << Endl;
+ Cerr << (TStringBuilder() << "Processed " << PrettifyBytes(readBytes) << " and " << row + batchRows << " records" << Endl);
}
- if (batchBytes < settings.BytesPerRequest_) {
+ if (batchBytes < Settings.BytesPerRequest_) {
continue;
}
@@ -511,49 +689,60 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
progressCallback(readBytes, *inputSizeHint);
}
- auto asyncUpsertCSV = [&upsertCsvFunc, row, buffer = std::move(buffer)]() mutable {
- return upsertCsvFunc(std::move(buffer), row);
+ auto workerFunc = [&upsertCsvFunc, row, buffer = std::move(buffer)]() mutable {
+ upsertCsvFunc(std::move(buffer), row);
};
row += batchRows;
batchRows = 0;
batchBytes = 0;
buffer.clear();
- inFlightRequests.push_back(NThreading::Async(asyncUpsertCSV, *pool));
+ jobInflightManager->AcquireJob();
- auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
- if (!status.IsSuccess()) {
- return status;
+ if (!ProcessingPool->AddFunc(workerFunc)) {
+ return MakeStatus(EStatus::INTERNAL_ERROR, "Couldn't add worker func");
+ }
+
+ if (Failed) {
+ break;
}
}
- if (!buffer.empty() && countInput.Counter() > 0) {
+ // Send the rest if buffer is not empty
+ if (!buffer.empty() && countInput.Counter() > 0 && !Failed) {
+ jobInflightManager->AcquireJob();
upsertCsvFunc(std::move(buffer), row);
}
+ jobInflightManager->WaitForAllJobs();
+
TotalBytesRead += readBytes;
- auto waitResult = WaitForQueue(0, inFlightRequests);
- if (settings.Verbose_) {
+ if (Settings.Verbose_) {
std::stringstream str;
double fileProcessingTimeSeconds = (TInstant::Now() - fileStartTime).SecondsFloat();
str << std::endl << "File " << filePath << " of " << PrettifyBytes(readBytes)
- << " processed in " << std::setprecision(3) << fileProcessingTimeSeconds << " sec";
+ << (Failed ? " failed in " : " processed in ") << std::setprecision(3) << fileProcessingTimeSeconds << " sec";
if (fileProcessingTimeSeconds > 0) {
str << ", " << PrettifyBytes((double)readBytes / fileProcessingTimeSeconds) << "/s" << std::endl;
}
std::cerr << str.str();
}
- return waitResult;
+ if (Failed) {
+ return *ErrorStatus;
+ } else {
+ return MakeStatus();
+ }
}
-TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath,
- const TString& dbPath,
- const TImportFileSettings& settings) {
-
- TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, CurrentFileCount);
+TStatus TImportFileClient::TImpl::UpsertCsvByBlocks(const TString& filePath,
+ const TString& dbPath) {
TString headerRow;
- TCsvFileReader splitter(filePath, settings, headerRow, inFlightGetter);
+ ui64 maxThreads = Max(1ul, Settings.Threads_ / CurrentFileCount);
+ TCsvFileReader splitter(filePath, Settings, headerRow, maxThreads);
+ ui64 threadCount = splitter.GetSplitCount();
+ // MaxInFlightRequests_ requests in flight on server and threadCount threads building TValue
+ size_t maxJobInflightTotal = threadCount + Settings.MaxInFlightRequests_;
auto columnTypes = GetColumnTypes();
ValidateTValueUpsertTable();
@@ -561,83 +750,119 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath,
TCsvParser parser;
bool removeLastDelimiter = false;
TStringInput headerInput(headerRow);
- NCsvFormat::TLinesSplitter headerSplitter(headerInput, settings.Delimiter_[0]);
- InitCsvParser(parser, removeLastDelimiter, headerSplitter, settings, &columnTypes, DbTableInfo.get());
-
- TVector<TAsyncStatus> threadResults(splitter.GetSplitCount());
- THolder<IThreadPool> pool = CreateThreadPool(splitter.GetSplitCount());
- for (size_t threadId = 0; threadId < splitter.GetSplitCount(); ++threadId) {
- auto loadCsv = [&, threadId] () {
+ NCsvFormat::TLinesSplitter headerSplitter(headerInput, Settings.Delimiter_[0]);
+ InitCsvParser(parser, removeLastDelimiter, headerSplitter, Settings, &columnTypes, DbTableInfo.get());
+
+ TVector<NThreading::TFuture<void>> threadResults;
+ threadResults.reserve(threadCount);
+ for (size_t threadId = 0; threadId < threadCount; ++threadId) {
+ auto promise = NThreading::NewPromise<void>();
+ threadResults.push_back(promise.GetFuture());
+ auto workerFunc = [&, threadId, promise] () mutable {
+ TInstant threadStartTime = TInstant::Now();
+ size_t maxJobInflight = maxJobInflightTotal / threadCount + (threadId < maxJobInflightTotal % threadCount ? 1 : 0);
+ // Jobs starts on starting building TValue and sends request
+ // Job ends on receiving final request (after all retries)
+ std::counting_semaphore<> jobsInflight(maxJobInflight);
auto upsertCsvFunc = [&](std::vector<TString>&& buffer) {
- return UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath));
+ jobsInflight.acquire();
+ try {
+ UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath))
+ .Apply([&jobsInflight](const TAsyncStatus& asyncStatus) {
+ jobsInflight.release();
+ return asyncStatus;
+ });
+ } catch (const std::exception& e) {
+ if (!Failed.exchange(true)) {
+ ErrorStatus = MakeHolder<TStatus>(MakeStatus(EStatus::INTERNAL_ERROR, e.what()));
+ }
+ jobsInflight.release();
+ }
};
std::vector<TAsyncStatus> inFlightRequests;
std::vector<TString> buffer;
- ui32 idx = settings.SkipRows_;
+ ui32 idx = Settings.SkipRows_;
ui64 readBytes = 0;
ui64 batchBytes = 0;
ui64 nextBorder = VerboseModeStepSize;
- TAsyncStatus status;
TString line;
- while (splitter.GetChunk(threadId).ConsumeLine(line)) {
+ TCsvFileReader::TFileChunk& chunk = splitter.GetChunk(threadId);
+ while (chunk.ConsumeLine(line)) {
if (line.empty()) {
continue;
}
readBytes += line.size();
batchBytes += line.size();
if (removeLastDelimiter) {
- if (!line.EndsWith(settings.Delimiter_)) {
- return MakeStatus(EStatus::BAD_REQUEST,
- "According to the header, lines should end with a delimiter");
+ if (!line.EndsWith(Settings.Delimiter_)) {
+ if (!Failed.exchange(true)) {
+ ErrorStatus = MakeHolder<TStatus>(MakeStatus(EStatus::BAD_REQUEST,
+ "According to the header, lines should end with a delimiter"));
+ }
+ break;
}
- line.erase(line.size() - settings.Delimiter_.size());
+ line.erase(line.size() - Settings.Delimiter_.size());
}
buffer.push_back(line);
++idx;
- if (readBytes >= nextBorder && settings.Verbose_) {
+ if (readBytes >= nextBorder && Settings.Verbose_) {
nextBorder += VerboseModeStepSize;
- TStringBuilder builder;
- builder << "Processed " << PrettifyBytes(readBytes) << " and " << idx << " records" << Endl;
- Cerr << builder;
+ Cerr << (TStringBuilder() << "Processed " << PrettifyBytes(readBytes) << " and "
+ << idx << " records" << Endl);
}
- if (batchBytes >= settings.BytesPerRequest_) {
+ if (batchBytes >= Settings.BytesPerRequest_) {
batchBytes = 0;
- auto status = WaitForQueue(splitter.GetThreadLimit(threadId), inFlightRequests);
- if (!status.IsSuccess()) {
- return status;
- }
-
- inFlightRequests.push_back(upsertCsvFunc(std::move(buffer)));
+ upsertCsvFunc(std::move(buffer));
buffer.clear();
+
+ if (Failed) {
+ break;
+ }
}
}
- if (!buffer.empty() && splitter.GetChunk(threadId).GetReadCount() != 0) {
- inFlightRequests.push_back(upsertCsvFunc(std::move(buffer)));
+ if (!buffer.empty() && chunk.GetReadCount() != 0 && !Failed) {
+ upsertCsvFunc(std::move(buffer));
+ }
+
+ // Wait for all jobs for current thread to finish
+ for (size_t i = 0; i < maxJobInflight; ++i) {
+ jobsInflight.acquire();
}
TotalBytesRead += readBytes;
- return WaitForQueue(0, inFlightRequests);
+ if (Settings.Verbose_) {
+ std::stringstream str;
+ double threadProcessingTimeSeconds = (TInstant::Now() - threadStartTime).SecondsFloat();
+ str << std::endl << "File " << filePath << ", thread " << threadId << " processed " << PrettifyBytes(readBytes) << " and "
+ << (Failed ? "failed in " : "successfully finished in ") << std::setprecision(3)
+ << threadProcessingTimeSeconds << " sec";
+ if (threadProcessingTimeSeconds > 0) {
+ str << ", " << PrettifyBytes((double)readBytes / threadProcessingTimeSeconds) << "/s" << std::endl;
+ }
+ std::cerr << str.str();
+ }
+ promise.SetValue();
};
- threadResults[threadId] = NThreading::Async(loadCsv, *pool);
+
+ if (!ProcessingPool->AddFunc(workerFunc)) {
+ return MakeStatus(EStatus::INTERNAL_ERROR, "Couldn't add worker func");
+ }
}
NThreading::WaitAll(threadResults).Wait();
- for (size_t i = 0; i < splitter.GetSplitCount(); ++i) {
- if (!threadResults[i].GetValueSync().IsSuccess()) {
- return threadResults[i].GetValueSync();
- }
+ if (Failed) {
+ return *ErrorStatus;
}
return MakeStatus();
}
-TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings,
- std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback) {
+TStatus TImportFileClient::TImpl::UpsertJson(IInputStream& input, const TString& dbPath, std::optional<ui64> inputSizeHint,
+ ProgressCallbackFunc & progressCallback) {
const TType tableType = GetTableType();
ValidateTValueUpsertTable();
- TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, CurrentFileCount);
- THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
+ TMaxInflightGetter inFlightGetter(Settings.MaxInFlightRequests_, CurrentFileCount);
ui64 readBytes = 0;
ui64 batchBytes = 0;
@@ -651,7 +876,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
batch.BeginList();
for (auto &line : batchLines) {
- batch.AddListItem(JsonToYdbValue(line, tableType, settings.BinaryStringsEncoding_));
+ batch.AddListItem(JsonToYdbValue(line, tableType, Settings.BinaryStringsEncoding_));
}
batch.EndList();
@@ -669,7 +894,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
progressCallback(readBytes, *inputSizeHint);
}
- if (batchBytes < settings.BytesPerRequest_) {
+ if (batchBytes < Settings.BytesPerRequest_) {
continue;
}
@@ -681,7 +906,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
batchLines.clear();
- inFlightRequests.push_back(NThreading::Async(asyncUpsertJson, *pool));
+ inFlightRequests.push_back(NThreading::Async(asyncUpsertJson, *ProcessingPool));
auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
if (!status.IsSuccess()) {
@@ -696,9 +921,8 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
return WaitForQueue(0, inFlightRequests);
}
-TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filename,
+TStatus TImportFileClient::TImpl::UpsertParquet([[maybe_unused]] const TString& filename,
[[maybe_unused]] const TString& dbPath,
- [[maybe_unused]] const TImportFileSettings& settings,
[[maybe_unused]] ProgressCallbackFunc & progressCallback) {
#if defined(_WIN64) || defined(_WIN32) || defined(__WIN32__)
return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows");
@@ -734,8 +958,6 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while getting RecordBatchReader: " << st.ToString());
}
- THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
-
std::atomic<ui64> uploadedRows = 0;
auto uploadedRowsCallback = [&](ui64 rows) {
ui64 uploadedRowsValue = uploadedRows.fetch_add(rows);
@@ -764,7 +986,7 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
const TString strSchema = NYdb_cli::NArrow::SerializeSchema(*batch->schema());
const size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(batch);
const size_t sliceCount =
- (totalSize / (size_t)settings.BytesPerRequest_) + (totalSize % settings.BytesPerRequest_ != 0 ? 1 : 0);
+ (totalSize / (size_t)Settings.BytesPerRequest_) + (totalSize % Settings.BytesPerRequest_ != 0 ? 1 : 0);
const i64 rowsInSlice = batch->num_rows() / sliceCount;
for (i64 currentRow = 0; currentRow < batch->num_rows(); currentRow += rowsInSlice) {
@@ -786,7 +1008,7 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
}
// Logarithmic approach to find number of rows fit into the byte limit.
- if (rowsBatch->num_rows() == 1 || NYdb_cli::NArrow::GetBatchDataSize(rowsBatch) < settings.BytesPerRequest_) {
+ if (rowsBatch->num_rows() == 1 || NYdb_cli::NArrow::GetBatchDataSize(rowsBatch) < Settings.BytesPerRequest_) {
// Single row or fits into the byte limit.
auto value = UpsertParquetBuffer(dbPath, NYdb_cli::NArrow::SerializeBatchNoCompression(rowsBatch), strSchema);
auto status = value.ExtractValueSync();
@@ -806,9 +1028,9 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
return MakeStatus();
};
- inFlightRequests.push_back(NThreading::Async(upsertParquetBatch, *pool));
+ inFlightRequests.push_back(NThreading::Async(upsertParquetBatch, *ProcessingPool));
- auto status = WaitForQueue(settings.MaxInFlightRequests_, inFlightRequests);
+ auto status = WaitForQueue(Settings.MaxInFlightRequests_, inFlightRequests);
if (!status.IsSuccess()) {
return status;
}
@@ -819,7 +1041,7 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filenam
}
inline
-TAsyncStatus TImportFileClient::UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema) {
+TAsyncStatus TImportFileClient::TImpl::UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema) {
auto upsert = [this, dbPath, buffer, strSchema](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
return tableClient.BulkUpsert(dbPath, NTable::EDataFormat::ApacheArrow, buffer, strSchema, UpsertSettings)
.Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
@@ -830,7 +1052,7 @@ TAsyncStatus TImportFileClient::UpsertParquetBuffer(const TString& dbPath, const
return TableClient->RetryOperation(upsert, RetrySettings);
}
-TType TImportFileClient::GetTableType() {
+TType TImportFileClient::TImpl::GetTableType() {
TTypeBuilder typeBuilder;
typeBuilder.BeginStruct();
Y_ENSURE_BT(DbTableInfo);
@@ -842,7 +1064,7 @@ TType TImportFileClient::GetTableType() {
return typeBuilder.Build();
}
-std::map<TString, TType> TImportFileClient::GetColumnTypes() {
+std::map<TString, TType> TImportFileClient::TImpl::GetColumnTypes() {
std::map<TString, TType> columnTypes;
Y_ENSURE_BT(DbTableInfo);
const auto& columns = DbTableInfo->GetTableColumns();
@@ -852,7 +1074,7 @@ std::map<TString, TType> TImportFileClient::GetColumnTypes() {
return columnTypes;
}
-void TImportFileClient::ValidateTValueUpsertTable() {
+void TImportFileClient::TImpl::ValidateTValueUpsertTable() {
auto columnTypes = GetColumnTypes();
bool hasPgType = false;
for (const auto& [_, type] : columnTypes) {
diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h
index 949e95ef38..d46780e39b 100644
--- a/ydb/public/lib/ydb_cli/import/import.h
+++ b/ydb/public/lib/ydb_cli/import/import.h
@@ -1,8 +1,10 @@
#pragma once
+#include <semaphore>
#include <thread>
#include <functional>
+#include <util/thread/pool.h>
#include <ydb/public/lib/json_value/ydb_json_value.h>
#include <ydb/public/lib/ydb_cli/common/command.h>
#include <ydb/public/lib/ydb_cli/common/formats.h>
@@ -46,7 +48,8 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting
FLUENT_SETTING_DEFAULT(ui64, BytesPerRequest, 1_MB);
FLUENT_SETTING_DEFAULT(ui64, FileBufferSize, 2_MB);
FLUENT_SETTING_DEFAULT(ui64, MaxInFlightRequests, 100);
- FLUENT_SETTING_DEFAULT(ui64, Threads, std::thread::hardware_concurrency());
+ // Main thread that reads input file is CPU intensive so make room for it too
+ FLUENT_SETTING_DEFAULT(ui64, Threads, std::thread::hardware_concurrency() > 1 ? std::thread::hardware_concurrency() - 1 : 1);
// Settings below are for CSV format only
FLUENT_SETTING_DEFAULT(ui32, SkipRows, 0);
FLUENT_SETTING_DEFAULT(bool, Header, false);
@@ -59,55 +62,19 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting
class TImportFileClient {
public:
- explicit TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig);
+ explicit TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig,
+ const TImportFileSettings& settings = {});
TImportFileClient(const TImportFileClient&) = delete;
// Ingest data from the input files to the database table.
// fsPaths: vector of paths to input files
// dbPath: full path to the database table, including the database path
// settings: input data format and operational settings
- TStatus Import(const TVector<TString>& fsPaths, const TString& dbPath, const TImportFileSettings& settings = {});
+ TStatus Import(const TVector<TString>& filePaths, const TString& dbPath);
private:
- std::shared_ptr<NTable::TTableClient> TableClient;
- std::shared_ptr<NScheme::TSchemeClient> SchemeClient;
-
- NTable::TBulkUpsertSettings UpsertSettings;
- NTable::TRetryOperationSettings RetrySettings;
-
- std::unique_ptr<const NTable::TTableDescription> DbTableInfo;
-
- std::atomic<ui64> CurrentFileCount;
- std::atomic<ui64> TotalBytesRead = 0;
-
- static constexpr ui32 VerboseModeStepSize = 1 << 27; // 128 MB
-
- using ProgressCallbackFunc = std::function<void (ui64, ui64)>;
-
- TStatus UpsertCsv(IInputStream& input,
- const TString& dbPath,
- const TImportFileSettings& settings,
- const TString& filePath,
- std::optional<ui64> inputSizeHint,
- ProgressCallbackFunc & progressCallback);
-
- TStatus UpsertCsvByBlocks(const TString& filePath,
- const TString& dbPath,
- const TImportFileSettings& settings);
-
- TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder);
- TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValue&& rows);
-
- TStatus UpsertJson(IInputStream &input, const TString &dbPath, const TImportFileSettings &settings,
- std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback);
-
- TStatus UpsertParquet(const TString& filename, const TString& dbPath, const TImportFileSettings& settings,
- ProgressCallbackFunc & progressCallback);
- TAsyncStatus UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema);
-
- TType GetTableType();
- std::map<TString, TType> GetColumnTypes();
- void ValidateTValueUpsertTable();
+ class TImpl;
+ std::shared_ptr<TImpl> Impl_;
};
}