| author    | stanly <stanly@yandex-team.com>                      | 2023-03-27 17:06:50 +0300 |
|-----------|------------------------------------------------------|---------------------------|
| committer | stanly <stanly@yandex-team.com>                      | 2023-03-27 17:06:50 +0300 |
| commit    | 79588836df9759f558cf12946c9907adc092ff24 (patch)     |                           |
| tree      | 85468f553a2f46132ae8c5f3dc02a094ace5f9b8             |                           |
| parent    | 773aa8b4c48879485bc7cbaa6995d68a574530a1 (diff)      |                           |
| download  | ydb-79588836df9759f558cf12946c9907adc092ff24.tar.gz  |                           |
optimize slicing on send of parquet data
| -rw-r--r-- | ydb/public/lib/ydb_cli/import/import.cpp | 203 |

1 file changed, 93 insertions(+), 110 deletions(-)
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index 69f85c9407a..c9d75669fee 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -2,39 +2,39 @@
 #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
 #include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
 #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
 #include <ydb/public/sdk/cpp/client/ydb_table/table.h>
-#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
 
-#include <ydb/public/api/protos/ydb_table.pb.h>
 #include <ydb/public/api/protos/ydb_formats.pb.h>
+#include <ydb/public/api/protos/ydb_table.pb.h>
 
 #include <ydb/public/lib/json_value/ydb_json_value.h>
 #include <ydb/public/lib/ydb_cli/common/recursive_list.h>
 #include <ydb/public/lib/ydb_cli/dump/util/util.h>
+#include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h>
+
+#include <library/cpp/string_utils/csv/csv.h>
+
+#include <util/folder/path.h>
 #include <util/generic/vector.h>
 #include <util/stream/file.h>
 #include <util/string/builder.h>
-#include <util/folder/path.h>
-
-#include <deque>
-#include <library/cpp/string_utils/csv/csv.h>
 
 #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
 #include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>
 #include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/api.h>
-#include <contrib/libs/apache/arrow/cpp/src/parquet/arrow/reader.h>
-#include <contrib/libs/apache/arrow/cpp/src/parquet/file_reader.h>
 #include <contrib/libs/apache/arrow/cpp/src/arrow/result.h>
-#include <contrib/libs/apache/arrow/cpp/src/parquet/arrow/reader.h>
-#include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h>
+#include <contrib/libs/apache/arrow/cpp/src/parquet/arrow/reader.h>
+#include <contrib/libs/apache/arrow/cpp/src/parquet/file_reader.h>
+
+#include <stack>
 
 namespace NYdb {
 namespace NConsoleClient {
-
 namespace {
 
-static inline
+inline
 TStatus MakeStatus(EStatus code = EStatus::SUCCESS, const TString& error = {}) {
     NYql::TIssues issues;
     if (error) {
@@ -43,8 +43,30 @@ TStatus MakeStatus(EStatus code = EStatus::SUCCESS, const TString& error = {}) {
     return TStatus(code, std::move(issues));
 }
 
+TStatus WaitForQueue(const size_t maxQueueSize, std::vector<TAsyncStatus>& inFlightRequests) {
+    while (!inFlightRequests.empty() && inFlightRequests.size() >= maxQueueSize) {
+        NThreading::WaitAny(inFlightRequests).Wait();
+        ui32 delta = 0;
+        for (ui32 i = 0; i + delta < inFlightRequests.size();) {
+            if (inFlightRequests[i].HasValue() || inFlightRequests[i].HasException()) {
+                auto status = inFlightRequests[i].ExtractValueSync();
+                if (!status.IsSuccess()) {
+                    return status;
+                }
+                ++delta;
+                inFlightRequests[i] = inFlightRequests[inFlightRequests.size() - delta];
+            } else {
+                ++i;
+            }
+        }
+        inFlightRequests.resize(inFlightRequests.size() - delta);
+    }
+
+    return MakeStatus();
 }
+} // namespace
+
 TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig)
     : OperationClient(std::make_shared<NOperation::TOperationClient>(driver))
     , SchemeClient(std::make_shared<NScheme::TSchemeClient>(driver))
@@ -109,36 +131,6 @@ TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath
                           TStringBuilder() << "Unsupported format #" << (int) settings.Format_);
 }
 
-namespace {
-
-TStatus WaitForQueue(std::deque<TAsyncStatus>& inFlightRequests, size_t maxQueueSize) {
-    std::vector<TStatus> problemResults;
-    while (!inFlightRequests.empty() && inFlightRequests.size() > maxQueueSize && problemResults.empty()) {
-        NThreading::WaitAny(inFlightRequests).Wait();
-        ui32 delta = 0;
-        for (ui32 i = 0; i + delta < inFlightRequests.size();) {
-            if (inFlightRequests[i].HasValue() || inFlightRequests[i].HasException()) {
-                auto status = inFlightRequests[i].ExtractValueSync();
-                if (!status.IsSuccess()) {
-                    problemResults.emplace_back(status);
-                }
-                ++delta;
-                inFlightRequests[i] = inFlightRequests[inFlightRequests.size() - delta];
-            } else {
-                ++i;
-            }
-        }
-        inFlightRequests.resize(inFlightRequests.size() - delta);
-    }
-    if (problemResults.size()) {
-        return problemResults.front();
-    } else {
-        return MakeStatus();
-    }
-}
-
-}
-
 inline
 TAsyncStatus TImportFileClient::UpsertCsvBuffer(const TString& dbPath, const TString& buffer) {
     auto upsert = [this, dbPath, buffer](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
@@ -189,7 +181,7 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
         UpsertSettings.FormatSettings(formatSettings);
     }
 
-    std::deque<TAsyncStatus> inFlightRequests;
+    std::vector<TAsyncStatus> inFlightRequests;
     ui32 idx = settings.SkipRows_;
     ui64 readSize = 0;
@@ -205,7 +197,7 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
             Cerr << "Processed " << 1.0 * readSize / (1 << 20) << "Mb and " << idx << " records" << Endl;
         }
         if (buffer.Size() >= settings.BytesPerRequest_) {
-            auto status = WaitForQueue(inFlightRequests, settings.MaxInFlightRequests_);
+            auto status = WaitForQueue(settings.MaxInFlightRequests_, inFlightRequests);
             if (!status.IsSuccess()) {
                 return status;
             }
@@ -219,7 +211,7 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
         inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer));
     }
 
-    return WaitForQueue(inFlightRequests, 0);
+    return WaitForQueue(0, inFlightRequests);
 }
 
 inline
@@ -250,7 +242,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
         (settings.Format_==EOutputFormat::JsonBase64) ?
             NYdb::EBinaryStringEncoding::Base64 : NYdb::EBinaryStringEncoding::Unicode;
 
-    std::deque<TAsyncStatus> inFlightRequests;
+    std::vector<TAsyncStatus> inFlightRequests;
     size_t currentSize = 0;
     size_t currentRows = 0;
@@ -266,7 +258,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
         if (currentSize >= settings.BytesPerRequest_) {
             currentBatch->EndList();
 
-            auto status = WaitForQueue(inFlightRequests, settings.MaxInFlightRequests_);
+            auto status = WaitForQueue(settings.MaxInFlightRequests_, inFlightRequests);
             if (!status.IsSuccess()) {
                 return status;
             }
@@ -285,13 +277,15 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
         inFlightRequests.push_back(UpsertJsonBuffer(dbPath, *currentBatch));
     }
 
-    return WaitForQueue(inFlightRequests, 0);
+    return WaitForQueue(0, inFlightRequests);
 }
 
-TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename, [[maybe_unused]]const TString& dbPath, [[maybe_unused]]const TImportFileSettings& settings) {
-    #if defined (_WIN64) || defined (_WIN32) || defined (__WIN32__)
-        return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows");
-    #else
+TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filename,
+                                         [[maybe_unused]] const TString& dbPath,
+                                         [[maybe_unused]] const TImportFileSettings& settings) {
+#if defined(_WIN64) || defined(_WIN32) || defined(__WIN32__)
+    return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows");
+#else
     std::shared_ptr<arrow::io::ReadableFile> infile;
    arrow::Result<std::shared_ptr<arrow::io::ReadableFile>> fileResult = arrow::io::ReadableFile::Open(filename);
     if (!fileResult.ok()) {
@@ -300,95 +294,84 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename
     std::shared_ptr<arrow::io::ReadableFile> readableFile = fileResult.ValueOrDie();
 
     std::unique_ptr<parquet::arrow::FileReader> fileReader;
-    arrow::MemoryPool *pool = arrow::default_memory_pool();
 
     arrow::Status st;
-    st = parquet::arrow::OpenFile(readableFile, pool, &fileReader);
+    st = parquet::arrow::OpenFile(readableFile, arrow::default_memory_pool(), &fileReader);
     if (!st.ok()) {
         return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while initializing arrow FileReader: " << st.ToString());
     }
 
-    std::shared_ptr<parquet::FileMetaData> metaData = parquet::ReadMetaData(readableFile);
-
-    i64 numRowGroups = metaData->num_row_groups();
+    const i64 numRowGroups = parquet::ReadMetaData(readableFile)->num_row_groups();
 
     std::vector<int> row_group_indices(numRowGroups);
     for (i64 i = 0; i < numRowGroups; i++) {
         row_group_indices[i] = i;
    }
 
-    std::shared_ptr<arrow::RecordBatchReader> reader;
+    std::unique_ptr<arrow::RecordBatchReader> reader;
 
     st = fileReader->GetRecordBatchReader(row_group_indices, &reader);
     if (!st.ok()) {
         return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while getting RecordBatchReader: " << st.ToString());
     }
 
-    std::deque<TAsyncStatus> inFlightRequests;
+    std::vector<TAsyncStatus> inFlightRequests;
 
-    auto splitUpsertBatch = [this, &inFlightRequests, dbPath, settings](const std::shared_ptr<arrow::RecordBatch> &recordBatch){
-        std::vector<std::shared_ptr<arrow::RecordBatch>> slicedRecordBatches;
-        std::deque<std::shared_ptr<arrow::RecordBatch>> batchesDeque;
-        size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(recordBatch);
+    while (true) {
+        std::shared_ptr<arrow::RecordBatch> batch;
 
-        size_t sliceCnt = totalSize / (size_t)settings.BytesPerRequest_;
-        if (totalSize % settings.BytesPerRequest_ != 0) {
-            sliceCnt++;
+        st = reader->ReadNext(&batch);
+        if (!st.ok()) {
+            return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
        }
-        int64_t rowsInSlice = recordBatch->num_rows() / sliceCnt;
-
-        for (int64_t currentRow = 0; currentRow < recordBatch->num_rows(); currentRow += rowsInSlice) {
-            auto nextSlice = (currentRow + rowsInSlice < recordBatch->num_rows()) ? recordBatch->Slice(currentRow, rowsInSlice) : recordBatch->Slice(currentRow);
-            batchesDeque.push_back(nextSlice);
+        // The read function will return null at the end of data stream.
+        if (!batch) {
+            break;
         }
 
-        while (!batchesDeque.empty()) {
-            std::shared_ptr<arrow::RecordBatch> nextBatch = batchesDeque.front();
-            batchesDeque.pop_front();
-            if (NYdb_cli::NArrow::GetBatchDataSize(nextBatch) < settings.BytesPerRequest_) {
-                slicedRecordBatches.push_back(nextBatch);
-            }
-            else {
-                std::shared_ptr<arrow::RecordBatch> left = nextBatch->Slice(0, nextBatch->num_rows() / 2);
-                std::shared_ptr<arrow::RecordBatch> right = nextBatch->Slice(nextBatch->num_rows() / 2);
-                batchesDeque.push_front(right);
-                batchesDeque.push_front(left);
-            }
-        }
-        auto schema = recordBatch->schema();
-        TString strSchema = NYdb_cli::NArrow::SerializeSchema(*schema);
-        for (size_t i = 0; i < slicedRecordBatches.size(); i++) {
-            TString buffer = NYdb_cli::NArrow::SerializeBatchNoCompression(slicedRecordBatches[i]);
-            auto status = WaitForQueue(inFlightRequests, settings.MaxInFlightRequests_);
-            if (!status.IsSuccess()) {
-                return status;
-            }
+        const TString strSchema = NYdb_cli::NArrow::SerializeSchema(*batch->schema());
+        const size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(batch);
+        const size_t sliceCount =
+            (totalSize / (size_t)settings.BytesPerRequest_) + (totalSize % settings.BytesPerRequest_ != 0 ? 1 : 0);
+        const i64 rowsInSlice = batch->num_rows() / sliceCount;
 
-            inFlightRequests.push_back(UpsertParquetBuffer(dbPath, buffer, strSchema));
-        }
+        for (i64 currentRow = 0; currentRow < batch->num_rows(); currentRow += rowsInSlice) {
+            std::stack<std::shared_ptr<arrow::RecordBatch>> rowsToSend;
 
-        return MakeStatus(EStatus::SUCCESS);
-    };
+            if (currentRow + rowsInSlice < batch->num_rows()) {
+                rowsToSend.push(batch->Slice(currentRow, rowsInSlice));
+            } else {
+                rowsToSend.push(batch->Slice(currentRow));
+            }
 
-    std::shared_ptr<arrow::RecordBatch> currentBatch;
-    st = reader->ReadNext(&currentBatch);
-    if (!st.ok()) {
-        return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
-    }
+            do {
+                const auto rows = rowsToSend.top();
 
-    while(currentBatch) {
-        TStatus upsertStatus = splitUpsertBatch(currentBatch);
-        if (!upsertStatus.IsSuccess()) {
-            return upsertStatus;
-        }
-        st = reader->ReadNext(&currentBatch);
-        if (!st.ok()) {
-            return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
+                rowsToSend.pop();
+                // Nothing to send. Continue.
+                if (rows->num_rows() == 0) {
+                    continue;
+                }
+                // Logarithmic approach to find number of rows fit into the byte limit.
+                if (rows->num_rows() == 1 || NYdb_cli::NArrow::GetBatchDataSize(rows) < settings.BytesPerRequest_) {
+                    // Single row or fits into the byte limit.
+                    auto status = WaitForQueue(settings.MaxInFlightRequests_, inFlightRequests);
+                    if (!status.IsSuccess()) {
+                        return status;
+                    }
+
+                    inFlightRequests.push_back(UpsertParquetBuffer(dbPath, NYdb_cli::NArrow::SerializeBatchNoCompression(rows), strSchema));
+                } else {
+                    // Split current slice.
+                    rowsToSend.push(rows->Slice(rows->num_rows() / 2));
+                    rowsToSend.push(rows->Slice(0, rows->num_rows() / 2));
+                }
            } while (!rowsToSend.empty());
         }
     }
 
-    return WaitForQueue(inFlightRequests, 0);
-    #endif
+    return WaitForQueue(0, inFlightRequests);
+#endif
 }
 
 inline
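The heart of this commit is the new splitting strategy in UpsertParquet: instead of pre-computing every slice into a deque, each coarse slice goes onto a stack, and any slice whose serialized size still exceeds BytesPerRequest_ is halved by row count until it fits or is down to a single row, so an oversized slice costs roughly log2(rows) size probes. Below is a compact, self-contained sketch of the same idea; Batch, DataSize, SplitAndSend, and the byte sizes are illustrative stand-ins, not the arrow or YDB CLI API.

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <memory>
#include <numeric>
#include <stack>
#include <vector>

// Toy stand-in for arrow::RecordBatch: a batch of rows with known byte sizes.
struct Batch {
    std::vector<size_t> rowBytes;

    int64_t num_rows() const { return static_cast<int64_t>(rowBytes.size()); }

    // Slice(offset, length) / Slice(offset) mirror the arrow::RecordBatch API shape.
    std::shared_ptr<Batch> Slice(int64_t offset, int64_t length) const {
        Batch b;
        b.rowBytes.assign(rowBytes.begin() + offset, rowBytes.begin() + offset + length);
        return std::make_shared<Batch>(std::move(b));
    }
    std::shared_ptr<Batch> Slice(int64_t offset) const { return Slice(offset, num_rows() - offset); }

    size_t DataSize() const { return std::accumulate(rowBytes.begin(), rowBytes.end(), size_t{0}); }
};

// Cut `batch` into chunks of at most byteLimit bytes (a lone row may still
// exceed the limit) and "send" each chunk, as the commit's stack-based halving does.
void SplitAndSend(const std::shared_ptr<Batch>& batch, size_t byteLimit) {
    const size_t totalSize = batch->DataSize();
    if (batch->num_rows() == 0 || totalSize == 0) {
        return;
    }
    // Coarse estimate of the slice count, rounded up.
    const int64_t sliceCount = static_cast<int64_t>(totalSize / byteLimit + (totalSize % byteLimit != 0 ? 1 : 0));
    // Guard (added in this sketch) against a zero row step when slices outnumber rows.
    const int64_t rowsInSlice = std::max<int64_t>(batch->num_rows() / sliceCount, 1);

    for (int64_t row = 0; row < batch->num_rows(); row += rowsInSlice) {
        std::stack<std::shared_ptr<Batch>> toSend;
        toSend.push(row + rowsInSlice < batch->num_rows() ? batch->Slice(row, rowsInSlice)
                                                          : batch->Slice(row));
        do {
            const auto rows = toSend.top();
            toSend.pop();
            if (rows->num_rows() == 0) {
                continue;
            }
            if (rows->num_rows() == 1 || rows->DataSize() < byteLimit) {
                // Fits, or cannot be split further: the real code serializes
                // the slice and queues an upsert request here.
                std::cout << "send " << rows->num_rows() << " rows, " << rows->DataSize() << " bytes\n";
            } else {
                // Too big: halve by row count; the left half is processed first.
                toSend.push(rows->Slice(rows->num_rows() / 2));
                toSend.push(rows->Slice(0, rows->num_rows() / 2));
            }
        } while (!toSend.empty());
    }
}

int main() {
    // One oversized row (900 bytes) forces the halving path under a 512-byte cap.
    auto batch = std::make_shared<Batch>(Batch{{100, 100, 900, 100, 100, 100, 100, 100}});
    SplitAndSend(batch, 512);
}
```

Running the sketch, the 900-byte row ends up isolated by repeated halving and is sent on its own even though it exceeds the cap, mirroring the rows->num_rows() == 1 escape hatch in the committed code.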
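The reworked WaitForQueue is also worth a note: the queue moved from std::deque to std::vector, finished futures are compacted by overwriting them with entries taken from the back followed by a single resize, and the first failing status is returned immediately instead of being collected into problemResults. A minimal sketch of that compaction pattern, with a FakeRequest stand-in for TAsyncStatus and the NThreading::WaitAny blocking call elided:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for TAsyncStatus: a request that is finished or still pending.
struct FakeRequest {
    bool done;
    bool ok;
};

// Block until fewer than maxQueueSize requests remain pending. Finished
// entries are removed by swap-with-back and one resize (no middle-of-container
// erases). Returns false on the first failure. The real code first waits on
// NThreading::WaitAny(inFlight).Wait(); since this sketch elides that call, it
// must only be driven with already-completed requests.
bool WaitForQueue(size_t maxQueueSize, std::vector<FakeRequest>& inFlight) {
    while (!inFlight.empty() && inFlight.size() >= maxQueueSize) {
        size_t removed = 0;
        for (size_t i = 0; i + removed < inFlight.size();) {
            if (inFlight[i].done) {
                if (!inFlight[i].ok) {
                    return false; // propagate the first error immediately
                }
                ++removed;
                inFlight[i] = inFlight[inFlight.size() - removed];
            } else {
                ++i; // still pending, keep scanning
            }
        }
        inFlight.resize(inFlight.size() - removed);
    }
    return true;
}

int main() {
    std::vector<FakeRequest> queue{{true, true}, {true, true}, {true, true}};
    assert(WaitForQueue(0, queue)); // maxQueueSize = 0 drains everything
    assert(queue.empty());
}
```

The final WaitForQueue(0, inFlightRequests) calls in the diff rely on exactly this drain mode: with a zero cap, the loop only exits once the queue is empty.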