diff options
| author | ivanmorozov <[email protected]> | 2023-02-06 17:30:22 +0300 |
|---|---|---|
| committer | ivanmorozov <[email protected]> | 2023-02-06 17:30:22 +0300 |
| commit | af5335d3dabf1bcf2f7a9abf3dfa5106c7b7fb80 (patch) | |
| tree | d4c25eadb3cc2ffcc11a8fc3f157161f80824b03 | |
| parent | 67ee27d7887ae4f092513ca88030996298003d8d (diff) | |
speed up csv import
| -rw-r--r-- | ydb/public/lib/ydb_cli/import/import.cpp | 31 |
1 files changed, 24 insertions, 7 deletions
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp index 5ac623ca2ab..3591b099384 100644 --- a/ydb/public/lib/ydb_cli/import/import.cpp +++ b/ydb/public/lib/ydb_cli/import/import.cpp @@ -111,15 +111,32 @@ TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath namespace { TStatus WaitForQueue(std::deque<TAsyncStatus>& inFlightRequests, size_t maxQueueSize) { - while (!inFlightRequests.empty() && inFlightRequests.size() > maxQueueSize) { - auto status = inFlightRequests.front().ExtractValueSync(); - inFlightRequests.pop_front(); - if (!status.IsSuccess()) { - return status; + std::vector<TStatus> problemResults; + while (!inFlightRequests.empty() && inFlightRequests.size() > maxQueueSize && problemResults.empty()) { + Y_UNUSED(NThreading::WaitAny(inFlightRequests)); + ui32 delta = 0; + for (ui32 i = 0; i + delta < inFlightRequests.size();) { + inFlightRequests[i] = inFlightRequests[i + delta]; + if (inFlightRequests[i].HasValue() || inFlightRequests[i].HasException()) { + auto status = inFlightRequests[i].ExtractValueSync(); + if (!status.IsSuccess()) { + problemResults.emplace_back(status); + } + ++delta; + } else { + ++i; + } + } + while (delta) { + inFlightRequests.pop_back(); + --delta; } } - - return MakeStatus(); + if (problemResults.size()) { + return problemResults.front(); + } else { + return MakeStatus(); + } } } |
