diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-19 22:51:40 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-19 22:51:40 +0300 |
commit | 99cebf961f4e56be130135abfa4ab02169cc4dda (patch) | |
tree | bf79f67b7e33b7bea347740f192fc131f4924883 | |
parent | 47e31d4cfeab1251302eaae74eb9bba78ac7386d (diff) | |
download | ydb-99cebf961f4e56be130135abfa4ab02169cc4dda.tar.gz |
fix memory usage for non-compression case
-rw-r--r-- | ydb/core/formats/arrow/arrow_helpers.cpp | 3 | ||||
-rw-r--r-- | ydb/core/formats/arrow/serializer/batch_only.cpp | 15 | ||||
-rw-r--r-- | ydb/core/formats/arrow/serializer/full.cpp | 6 | ||||
-rw-r--r-- | ydb/core/formats/arrow/serializer/stream.h | 15 | ||||
-rw-r--r-- | ydb/core/io_formats/csv_arrow.cpp | 3 |
5 files changed, 22 insertions, 20 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 09529be7410..769fa736002 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -5,6 +5,7 @@ #include "merging_sorted_input_stream.h" #include "serializer/batch_only.h" #include "serializer/abstract.h" +#include "serializer/stream.h" #include <ydb/core/util/yverify_stream.h> #include <util/system/yassert.h> @@ -133,7 +134,7 @@ TString SerializeSchema(const arrow::Schema& schema) { } std::shared_ptr<arrow::Schema> DeserializeSchema(const TString& str) { - auto buffer = std::make_shared<arrow::Buffer>((const ui8*)str.data(), str.size()); + std::shared_ptr<arrow::Buffer> buffer(std::make_shared<NSerialization::TBufferOverString>(str)); arrow::io::BufferReader reader(buffer); arrow::ipc::DictionaryMemo dictMemo; auto schema = ReadSchema(&reader, &dictMemo); diff --git a/ydb/core/formats/arrow/serializer/batch_only.cpp b/ydb/core/formats/arrow/serializer/batch_only.cpp index 85733ee09aa..0ad8768d0d2 100644 --- a/ydb/core/formats/arrow/serializer/batch_only.cpp +++ b/ydb/core/formats/arrow/serializer/batch_only.cpp @@ -10,21 +10,6 @@ #include <library/cpp/actors/core/log.h> namespace NKikimr::NArrow::NSerialization { -namespace { -// Arrow internally keeps references to Buffer objects with the data -// This helper class implements arrow::Buffer over TString that owns -// the actual memory -class TBufferOverString: public arrow::Buffer { - TString Str; -public: - explicit TBufferOverString(TString str) - : arrow::Buffer((const unsigned char*)str.data(), str.size()) - , Str(str) { - Y_VERIFY(data() == (const unsigned char*)Str.data()); - } -}; -} - arrow::Result<std::shared_ptr<arrow::RecordBatch>> TBatchPayloadDeserializer::DoDeserialize(const TString& data) const { arrow::ipc::DictionaryMemo dictMemo; auto options = arrow::ipc::IpcReadOptions::Defaults(); diff --git a/ydb/core/formats/arrow/serializer/full.cpp b/ydb/core/formats/arrow/serializer/full.cpp index c5424a856b0..84eff9227ab 100644 --- a/ydb/core/formats/arrow/serializer/full.cpp +++ b/ydb/core/formats/arrow/serializer/full.cpp @@ -14,9 +14,9 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> TFullDataDeserializer::DoDese auto options = arrow::ipc::IpcReadOptions::Defaults(); options.use_threads = false; - arrow::Buffer buffer((const ui8*)data.data(), data.size()); - arrow::io::BufferReader bufferReader(buffer); - auto reader = TStatusValidator::GetValid(arrow::ipc::RecordBatchStreamReader::Open(&bufferReader)); + std::shared_ptr<arrow::Buffer> buffer(std::make_shared<TBufferOverString>(data)); + arrow::io::BufferReader readerStream(buffer); + auto reader = TStatusValidator::GetValid(arrow::ipc::RecordBatchStreamReader::Open(&readerStream)); std::shared_ptr<arrow::RecordBatch> batch; auto readResult = reader->ReadNext(&batch); diff --git a/ydb/core/formats/arrow/serializer/stream.h b/ydb/core/formats/arrow/serializer/stream.h index 424e183f19a..3d429bcfd96 100644 --- a/ydb/core/formats/arrow/serializer/stream.h +++ b/ydb/core/formats/arrow/serializer/stream.h @@ -3,9 +3,24 @@ #include <contrib/libs/apache/arrow/cpp/src/arrow/status.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/result.h> #include <util/generic/string.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/buffer.h> namespace NKikimr::NArrow::NSerialization { +// Arrow internally keeps references to Buffer objects with the data +// This helper class implements arrow::Buffer over TString that owns +// the actual memory +// Its use for no-compression mode, where RecordBatch dont own memory +class TBufferOverString: public arrow::Buffer { + TString Str; +public: + explicit TBufferOverString(TString str) + : arrow::Buffer((const unsigned char*)str.data(), str.size()) + , Str(str) { + Y_VERIFY(data() == (const unsigned char*)Str.data()); + } +}; + class TFixedStringOutputStream final: public arrow::io::OutputStream { public: TFixedStringOutputStream(TString* out) diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/csv_arrow.cpp index a00eef8cde9..8834b026747 100644 --- a/ydb/core/io_formats/csv_arrow.cpp +++ b/ydb/core/io_formats/csv_arrow.cpp @@ -1,5 +1,6 @@ #include "csv.h" #include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/serializer/stream.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/util/value_parsing.h> @@ -162,7 +163,7 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr return {}; } - auto buffer = std::make_shared<arrow::Buffer>((const ui8*)csv.data(), csv.size()); + auto buffer = std::make_shared<NArrow::NSerialization::TBufferOverString>(csv); auto input = std::make_shared<arrow::io::BufferReader>(buffer); auto res = arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), input, ReadOptions, ParseOptions, ConvertOptions); |