author     stanly <stanly@yandex-team.com>  2023-03-27 17:06:50 +0300
committer  stanly <stanly@yandex-team.com>  2023-03-27 17:06:50 +0300
commit     79588836df9759f558cf12946c9907adc092ff24 (patch)
tree       85468f553a2f46132ae8c5f3dc02a094ace5f9b8
parent     773aa8b4c48879485bc7cbaa6995d68a574530a1 (diff)
optimize slicing on send of parquet data
-rw-r--r--  ydb/public/lib/ydb_cli/import/import.cpp  203
1 file changed, 93 insertions(+), 110 deletions(-)
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index 69f85c9407a..c9d75669fee 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -2,39 +2,39 @@
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
-#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
-#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/api/protos/ydb_formats.pb.h>
+#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/lib/json_value/ydb_json_value.h>
#include <ydb/public/lib/ydb_cli/common/recursive_list.h>
#include <ydb/public/lib/ydb_cli/dump/util/util.h>
+#include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h>
+#include <library/cpp/string_utils/csv/csv.h>
+
+#include <util/folder/path.h>
#include <util/generic/vector.h>
#include <util/stream/file.h>
#include <util/string/builder.h>
-#include <util/folder/path.h>
-
-#include <deque>
-#include <library/cpp/string_utils/csv/csv.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/api.h>
-#include <contrib/libs/apache/arrow/cpp/src/parquet/arrow/reader.h>
-#include <contrib/libs/apache/arrow/cpp/src/parquet/file_reader.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/result.h>
-#include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h>
+#include <contrib/libs/apache/arrow/cpp/src/parquet/arrow/reader.h>
+#include <contrib/libs/apache/arrow/cpp/src/parquet/file_reader.h>
+
+#include <stack>
namespace NYdb {
namespace NConsoleClient {
-
namespace {
-static inline
+inline
TStatus MakeStatus(EStatus code = EStatus::SUCCESS, const TString& error = {}) {
NYql::TIssues issues;
if (error) {
@@ -43,8 +43,30 @@ TStatus MakeStatus(EStatus code = EStatus::SUCCESS, const TString& error = {}) {
return TStatus(code, std::move(issues));
}
+TStatus WaitForQueue(const size_t maxQueueSize, std::vector<TAsyncStatus>& inFlightRequests) {
+ while (!inFlightRequests.empty() && inFlightRequests.size() >= maxQueueSize) {
+ NThreading::WaitAny(inFlightRequests).Wait();
+ ui32 delta = 0;
+ for (ui32 i = 0; i + delta < inFlightRequests.size();) {
+ if (inFlightRequests[i].HasValue() || inFlightRequests[i].HasException()) {
+ auto status = inFlightRequests[i].ExtractValueSync();
+ if (!status.IsSuccess()) {
+ return status;
+ }
+ ++delta;
+ inFlightRequests[i] = inFlightRequests[inFlightRequests.size() - delta];
+ } else {
+ ++i;
+ }
+ }
+ inFlightRequests.resize(inFlightRequests.size() - delta);
+ }
+
+ return MakeStatus();
}
+} // namespace
+
TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig)
: OperationClient(std::make_shared<NOperation::TOperationClient>(driver))
, SchemeClient(std::make_shared<NScheme::TSchemeClient>(driver))
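
[Editorial note] The relocated WaitForQueue keeps the in-flight futures in a flat vector and compacts it with a swap-and-shrink pass: each finished future is extracted, its slot is overwritten by an element taken from the tail, and the vector is resized once at the end. Below is a minimal standalone sketch of that compaction, with ints standing in for TAsyncStatus and a plain predicate standing in for the HasValue()/HasException() check; it is an illustration of the pattern, not the YDB code itself.

    #include <cstdio>
    #include <vector>

    // Swap-and-shrink removal of all "finished" elements, mirroring the
    // compaction loop in WaitForQueue. Order is not preserved, which is
    // fine for a set of in-flight requests.
    template <typename T, typename Pred>
    void CompactFinished(std::vector<T>& items, Pred isDone) {
        unsigned delta = 0;                       // how far the logical tail has shrunk
        for (unsigned i = 0; i + delta < items.size();) {
            if (isDone(items[i])) {
                ++delta;
                items[i] = items[items.size() - delta];  // refill the slot from the tail
                // do not advance i: the element pulled in must be re-checked
            } else {
                ++i;
            }
        }
        items.resize(items.size() - delta);       // one resize instead of N erases
    }

    int main() {
        std::vector<int> inFlight{1, 2, 3, 4, 5, 6};
        CompactFinished(inFlight, [](int x) { return x % 2 == 0; }); // evens are "done"
        for (int x : inFlight) printf("%d ", x);  // prints the odd survivors
    }

Compared with the removed std::deque version, removal costs one move per finished element plus a single resize, and a failing status now short-circuits out of WaitForQueue instead of being collected into a problemResults vector first.
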
@@ -109,36 +131,6 @@ TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath
TStringBuilder() << "Unsupported format #" << (int) settings.Format_);
}
-namespace {
-
-TStatus WaitForQueue(std::deque<TAsyncStatus>& inFlightRequests, size_t maxQueueSize) {
- std::vector<TStatus> problemResults;
- while (!inFlightRequests.empty() && inFlightRequests.size() > maxQueueSize && problemResults.empty()) {
- NThreading::WaitAny(inFlightRequests).Wait();
- ui32 delta = 0;
- for (ui32 i = 0; i + delta < inFlightRequests.size();) {
- if (inFlightRequests[i].HasValue() || inFlightRequests[i].HasException()) {
- auto status = inFlightRequests[i].ExtractValueSync();
- if (!status.IsSuccess()) {
- problemResults.emplace_back(status);
- }
- ++delta;
- inFlightRequests[i] = inFlightRequests[inFlightRequests.size() - delta];
- } else {
- ++i;
- }
- }
- inFlightRequests.resize(inFlightRequests.size() - delta);
- }
- if (problemResults.size()) {
- return problemResults.front();
- } else {
- return MakeStatus();
- }
-}
-
-}
-
inline
TAsyncStatus TImportFileClient::UpsertCsvBuffer(const TString& dbPath, const TString& buffer) {
auto upsert = [this, dbPath, buffer](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
@@ -189,7 +181,7 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
UpsertSettings.FormatSettings(formatSettings);
}
- std::deque<TAsyncStatus> inFlightRequests;
+ std::vector<TAsyncStatus> inFlightRequests;
ui32 idx = settings.SkipRows_;
ui64 readSize = 0;
@@ -205,7 +197,7 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
Cerr << "Processed " << 1.0 * readSize / (1 << 20) << "Mb and " << idx << " records" << Endl;
}
if (buffer.Size() >= settings.BytesPerRequest_) {
- auto status = WaitForQueue(inFlightRequests, settings.MaxInFlightRequests_);
+ auto status = WaitForQueue(settings.MaxInFlightRequests_, inFlightRequests);
if (!status.IsSuccess()) {
return status;
}
@@ -219,7 +211,7 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer));
}
- return WaitForQueue(inFlightRequests, 0);
+ return WaitForQueue(0, inFlightRequests);
}
inline
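
[Editorial note] Both the CSV and JSON paths drive the same bounded pipeline: accumulate a buffer until it crosses BytesPerRequest_, wait until the in-flight queue has room (MaxInFlightRequests_), push the next asynchronous upsert, and drain everything with WaitForQueue(0, ...) at the end. A toy sketch of that flow using std::future follows; SendBuffer is a hypothetical stand-in for UpsertCsvBuffer, and for brevity the drain waits on one future at a time instead of using WaitAny.

    #include <chrono>
    #include <cstdio>
    #include <future>
    #include <string>
    #include <thread>
    #include <vector>

    // Hypothetical stand-in for UpsertCsvBuffer: sends one buffer asynchronously.
    std::future<bool> SendBuffer(std::string buffer) {
        return std::async(std::launch::async, [b = std::move(buffer)] {
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            return !b.empty();  // pretend a non-empty upsert succeeds
        });
    }

    // Block until fewer than maxQueueSize requests are in flight;
    // maxQueueSize == 0 drains the queue completely, as in WaitForQueue.
    bool WaitForRoom(size_t maxQueueSize, std::vector<std::future<bool>>& inFlight) {
        while (!inFlight.empty() && inFlight.size() >= maxQueueSize) {
            bool ok = inFlight.back().get();
            inFlight.pop_back();
            if (!ok) return false;  // propagate the first failure
        }
        return true;
    }

    int main() {
        const size_t bytesPerRequest = 8, maxInFlight = 4;
        std::vector<std::future<bool>> inFlight;
        std::string buffer;
        for (char c = 'a'; c <= 'z'; ++c) {
            buffer.push_back(c);
            if (buffer.size() >= bytesPerRequest) {  // flush threshold reached
                if (!WaitForRoom(maxInFlight, inFlight)) return 1;
                inFlight.push_back(SendBuffer(buffer));
                buffer.clear();
            }
        }
        if (!buffer.empty()) inFlight.push_back(SendBuffer(buffer));  // trailing partial buffer
        return WaitForRoom(0, inFlight) ? 0 : 1;     // final drain
    }
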
@@ -250,7 +242,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
(settings.Format_==EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 :
NYdb::EBinaryStringEncoding::Unicode;
- std::deque<TAsyncStatus> inFlightRequests;
+ std::vector<TAsyncStatus> inFlightRequests;
size_t currentSize = 0;
size_t currentRows = 0;
@@ -266,7 +258,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
if (currentSize >= settings.BytesPerRequest_) {
currentBatch->EndList();
- auto status = WaitForQueue(inFlightRequests, settings.MaxInFlightRequests_);
+ auto status = WaitForQueue(settings.MaxInFlightRequests_, inFlightRequests);
if (!status.IsSuccess()) {
return status;
}
@@ -285,13 +277,15 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
inFlightRequests.push_back(UpsertJsonBuffer(dbPath, *currentBatch));
}
- return WaitForQueue(inFlightRequests, 0);
+ return WaitForQueue(0, inFlightRequests);
}
-TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename, [[maybe_unused]]const TString& dbPath, [[maybe_unused]]const TImportFileSettings& settings) {
- #if defined (_WIN64) || defined (_WIN32) || defined (__WIN32__)
- return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows");
- #else
+TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filename,
+ [[maybe_unused]] const TString& dbPath,
+ [[maybe_unused]] const TImportFileSettings& settings) {
+#if defined(_WIN64) || defined(_WIN32) || defined(__WIN32__)
+ return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows");
+#else
std::shared_ptr<arrow::io::ReadableFile> infile;
arrow::Result<std::shared_ptr<arrow::io::ReadableFile>> fileResult = arrow::io::ReadableFile::Open(filename);
if (!fileResult.ok()) {
@@ -300,95 +294,84 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename
std::shared_ptr<arrow::io::ReadableFile> readableFile = fileResult.ValueOrDie();
std::unique_ptr<parquet::arrow::FileReader> fileReader;
- arrow::MemoryPool *pool = arrow::default_memory_pool();
arrow::Status st;
- st = parquet::arrow::OpenFile(readableFile, pool, &fileReader);
+ st = parquet::arrow::OpenFile(readableFile, arrow::default_memory_pool(), &fileReader);
if (!st.ok()) {
return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while initializing arrow FileReader: " << st.ToString());
}
- std::shared_ptr<parquet::FileMetaData> metaData = parquet::ReadMetaData(readableFile);
-
- i64 numRowGroups = metaData->num_row_groups();
+ const i64 numRowGroups = parquet::ReadMetaData(readableFile)->num_row_groups();
std::vector<int> row_group_indices(numRowGroups);
for (i64 i = 0; i < numRowGroups; i++) {
row_group_indices[i] = i;
}
- std::shared_ptr<arrow::RecordBatchReader> reader;
+ std::unique_ptr<arrow::RecordBatchReader> reader;
st = fileReader->GetRecordBatchReader(row_group_indices, &reader);
if (!st.ok()) {
return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while getting RecordBatchReader: " << st.ToString());
}
- std::deque<TAsyncStatus> inFlightRequests;
+ std::vector<TAsyncStatus> inFlightRequests;
- auto splitUpsertBatch = [this, &inFlightRequests, dbPath, settings](const std::shared_ptr<arrow::RecordBatch> &recordBatch){
- std::vector<std::shared_ptr<arrow::RecordBatch>> slicedRecordBatches;
- std::deque<std::shared_ptr<arrow::RecordBatch>> batchesDeque;
- size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(recordBatch);
+ while (true) {
+ std::shared_ptr<arrow::RecordBatch> batch;
- size_t sliceCnt = totalSize / (size_t)settings.BytesPerRequest_;
- if (totalSize % settings.BytesPerRequest_ != 0) {
- sliceCnt++;
+ st = reader->ReadNext(&batch);
+ if (!st.ok()) {
+ return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
}
- int64_t rowsInSlice = recordBatch->num_rows() / sliceCnt;
-
- for (int64_t currentRow = 0; currentRow < recordBatch->num_rows(); currentRow += rowsInSlice) {
- auto nextSlice = (currentRow + rowsInSlice < recordBatch->num_rows()) ? recordBatch->Slice(currentRow, rowsInSlice) : recordBatch->Slice(currentRow);
- batchesDeque.push_back(nextSlice);
+ // The read function returns null at the end of the data stream.
+ if (!batch) {
+ break;
}
- while (!batchesDeque.empty()) {
- std::shared_ptr<arrow::RecordBatch> nextBatch = batchesDeque.front();
- batchesDeque.pop_front();
- if (NYdb_cli::NArrow::GetBatchDataSize(nextBatch) < settings.BytesPerRequest_) {
- slicedRecordBatches.push_back(nextBatch);
- }
- else {
- std::shared_ptr<arrow::RecordBatch> left = nextBatch->Slice(0, nextBatch->num_rows() / 2);
- std::shared_ptr<arrow::RecordBatch> right = nextBatch->Slice(nextBatch->num_rows() / 2);
- batchesDeque.push_front(right);
- batchesDeque.push_front(left);
- }
- }
- auto schema = recordBatch->schema();
- TString strSchema = NYdb_cli::NArrow::SerializeSchema(*schema);
- for (size_t i = 0; i < slicedRecordBatches.size(); i++) {
- TString buffer = NYdb_cli::NArrow::SerializeBatchNoCompression(slicedRecordBatches[i]);
- auto status = WaitForQueue(inFlightRequests, settings.MaxInFlightRequests_);
- if (!status.IsSuccess()) {
- return status;
- }
+ const TString strSchema = NYdb_cli::NArrow::SerializeSchema(*batch->schema());
+ const size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(batch);
+ const size_t sliceCount =
+ (totalSize / (size_t)settings.BytesPerRequest_) + (totalSize % settings.BytesPerRequest_ != 0 ? 1 : 0);
+ const i64 rowsInSlice = batch->num_rows() / sliceCount;
- inFlightRequests.push_back(UpsertParquetBuffer(dbPath, buffer, strSchema));
- }
+ for (i64 currentRow = 0; currentRow < batch->num_rows(); currentRow += rowsInSlice) {
+ std::stack<std::shared_ptr<arrow::RecordBatch>> rowsToSend;
- return MakeStatus(EStatus::SUCCESS);
- };
+ if (currentRow + rowsInSlice < batch->num_rows()) {
+ rowsToSend.push(batch->Slice(currentRow, rowsInSlice));
+ } else {
+ rowsToSend.push(batch->Slice(currentRow));
+ }
- std::shared_ptr<arrow::RecordBatch> currentBatch;
- st = reader->ReadNext(&currentBatch);
- if (!st.ok()) {
- return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
- }
+ do {
+ const auto rows = rowsToSend.top();
- while(currentBatch) {
- TStatus upsertStatus = splitUpsertBatch(currentBatch);
- if (!upsertStatus.IsSuccess()) {
- return upsertStatus;
- }
- st = reader->ReadNext(&currentBatch);
- if (!st.ok()) {
- return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
+ rowsToSend.pop();
+ // Nothing to send. Continue.
+ if (rows->num_rows() == 0) {
+ continue;
+ }
+ // Logarithmic approach to find the number of rows that fit into the byte limit.
+ if (rows->num_rows() == 1 || NYdb_cli::NArrow::GetBatchDataSize(rows) < settings.BytesPerRequest_) {
+ // Single row or fits into the byte limit.
+ auto status = WaitForQueue(settings.MaxInFlightRequests_, inFlightRequests);
+ if (!status.IsSuccess()) {
+ return status;
+ }
+
+ inFlightRequests.push_back(UpsertParquetBuffer(dbPath, NYdb_cli::NArrow::SerializeBatchNoCompression(rows), strSchema));
+ } else {
+ // Split the current slice in half.
+ rowsToSend.push(rows->Slice(rows->num_rows() / 2));
+ rowsToSend.push(rows->Slice(0, rows->num_rows() / 2));
+ }
+ } while (!rowsToSend.empty());
}
}
- return WaitForQueue(inFlightRequests, 0);
- #endif
+ return WaitForQueue(0, inFlightRequests);
+#endif
}
inline
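
[Editorial note] The rewritten UpsertParquet streams record batches instead of collecting slices into a deque: each batch is first cut into roughly BytesPerRequest_-sized pieces by row count, and any piece whose measured size still exceeds the limit is halved via a stack until it fits or shrinks to a single row, with each fitting piece sent immediately. A self-contained sketch of that binary split over offset/length views follows; Slice and the per-row size table are hypothetical stand-ins for arrow::RecordBatch::Slice and GetBatchDataSize.

    #include <cstddef>
    #include <cstdio>
    #include <stack>
    #include <vector>

    // An offset/length view into a row buffer, playing the role of
    // arrow::RecordBatch::Slice. rowSizes is a hypothetical stand-in
    // for the per-row byte cost that GetBatchDataSize would report.
    struct Slice { size_t offset, length; };

    size_t SliceBytes(const std::vector<size_t>& rowSizes, Slice s) {
        size_t bytes = 0;
        for (size_t i = s.offset; i < s.offset + s.length; ++i) bytes += rowSizes[i];
        return bytes;
    }

    // Halve a slice until each piece fits the byte budget or is a single
    // row, mirroring the stack-based do/while loop in UpsertParquet.
    std::vector<Slice> SplitToFit(const std::vector<size_t>& rowSizes, Slice start, size_t budget) {
        std::vector<Slice> ready;
        std::stack<Slice> pending;
        pending.push(start);
        while (!pending.empty()) {
            Slice s = pending.top();
            pending.pop();
            if (s.length == 0) continue;  // nothing to send
            if (s.length == 1 || SliceBytes(rowSizes, s) < budget) {
                ready.push_back(s);       // fits (or cannot be split further): ship it
            } else {
                size_t half = s.length / 2;
                pending.push({s.offset + half, s.length - half});  // right half, popped second
                pending.push({s.offset, half});                    // left half, popped first
            }
        }
        return ready;
    }

    int main() {
        std::vector<size_t> rowSizes{4, 4, 64, 4, 4, 4, 4, 4};  // one oversized row
        for (Slice s : SplitToFit(rowSizes, {0, rowSizes.size()}, /*budget=*/32))
            printf("offset=%zu length=%zu\n", s.offset, s.length);
    }

A slice is halved at most log2(rows) times before it fits or degenerates to a single row, which is the "logarithmic approach" the added comment refers to, and slices are dispatched as soon as they qualify rather than being accumulated in slicedRecordBatches first.
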