summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <[email protected]>2023-02-06 17:30:22 +0300
committerivanmorozov <[email protected]>2023-02-06 17:30:22 +0300
commitaf5335d3dabf1bcf2f7a9abf3dfa5106c7b7fb80 (patch)
treed4c25eadb3cc2ffcc11a8fc3f157161f80824b03
parent67ee27d7887ae4f092513ca88030996298003d8d (diff)
speed up csv import
-rw-r--r--ydb/public/lib/ydb_cli/import/import.cpp31
1 files changed, 24 insertions, 7 deletions
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index 5ac623ca2ab..3591b099384 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -111,15 +111,32 @@ TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath
namespace {
TStatus WaitForQueue(std::deque<TAsyncStatus>& inFlightRequests, size_t maxQueueSize) {
- while (!inFlightRequests.empty() && inFlightRequests.size() > maxQueueSize) {
- auto status = inFlightRequests.front().ExtractValueSync();
- inFlightRequests.pop_front();
- if (!status.IsSuccess()) {
- return status;
+ std::vector<TStatus> problemResults;
+ while (!inFlightRequests.empty() && inFlightRequests.size() > maxQueueSize && problemResults.empty()) {
+ Y_UNUSED(NThreading::WaitAny(inFlightRequests));
+ ui32 delta = 0;
+ for (ui32 i = 0; i + delta < inFlightRequests.size();) {
+ inFlightRequests[i] = inFlightRequests[i + delta];
+ if (inFlightRequests[i].HasValue() || inFlightRequests[i].HasException()) {
+ auto status = inFlightRequests[i].ExtractValueSync();
+ if (!status.IsSuccess()) {
+ problemResults.emplace_back(status);
+ }
+ ++delta;
+ } else {
+ ++i;
+ }
+ }
+ while (delta) {
+ inFlightRequests.pop_back();
+ --delta;
}
}
-
- return MakeStatus();
+ if (problemResults.size()) {
+ return problemResults.front();
+ } else {
+ return MakeStatus();
+ }
}
}