aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-19 22:51:40 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-19 22:51:40 +0300
commit99cebf961f4e56be130135abfa4ab02169cc4dda (patch)
treebf79f67b7e33b7bea347740f192fc131f4924883
parent47e31d4cfeab1251302eaae74eb9bba78ac7386d (diff)
downloadydb-99cebf961f4e56be130135abfa4ab02169cc4dda.tar.gz
fix memory usage for non-compression case
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp3
-rw-r--r--ydb/core/formats/arrow/serializer/batch_only.cpp15
-rw-r--r--ydb/core/formats/arrow/serializer/full.cpp6
-rw-r--r--ydb/core/formats/arrow/serializer/stream.h15
-rw-r--r--ydb/core/io_formats/csv_arrow.cpp3
5 files changed, 22 insertions, 20 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index 09529be7410..769fa736002 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -5,6 +5,7 @@
#include "merging_sorted_input_stream.h"
#include "serializer/batch_only.h"
#include "serializer/abstract.h"
+#include "serializer/stream.h"
#include <ydb/core/util/yverify_stream.h>
#include <util/system/yassert.h>
@@ -133,7 +134,7 @@ TString SerializeSchema(const arrow::Schema& schema) {
}
std::shared_ptr<arrow::Schema> DeserializeSchema(const TString& str) {
- auto buffer = std::make_shared<arrow::Buffer>((const ui8*)str.data(), str.size());
+ std::shared_ptr<arrow::Buffer> buffer(std::make_shared<NSerialization::TBufferOverString>(str));
arrow::io::BufferReader reader(buffer);
arrow::ipc::DictionaryMemo dictMemo;
auto schema = ReadSchema(&reader, &dictMemo);
diff --git a/ydb/core/formats/arrow/serializer/batch_only.cpp b/ydb/core/formats/arrow/serializer/batch_only.cpp
index 85733ee09aa..0ad8768d0d2 100644
--- a/ydb/core/formats/arrow/serializer/batch_only.cpp
+++ b/ydb/core/formats/arrow/serializer/batch_only.cpp
@@ -10,21 +10,6 @@
#include <library/cpp/actors/core/log.h>
namespace NKikimr::NArrow::NSerialization {
-namespace {
-// Arrow internally keeps references to Buffer objects with the data
-// This helper class implements arrow::Buffer over TString that owns
-// the actual memory
-class TBufferOverString: public arrow::Buffer {
- TString Str;
-public:
- explicit TBufferOverString(TString str)
- : arrow::Buffer((const unsigned char*)str.data(), str.size())
- , Str(str) {
- Y_VERIFY(data() == (const unsigned char*)Str.data());
- }
-};
-}
-
arrow::Result<std::shared_ptr<arrow::RecordBatch>> TBatchPayloadDeserializer::DoDeserialize(const TString& data) const {
arrow::ipc::DictionaryMemo dictMemo;
auto options = arrow::ipc::IpcReadOptions::Defaults();
diff --git a/ydb/core/formats/arrow/serializer/full.cpp b/ydb/core/formats/arrow/serializer/full.cpp
index c5424a856b0..84eff9227ab 100644
--- a/ydb/core/formats/arrow/serializer/full.cpp
+++ b/ydb/core/formats/arrow/serializer/full.cpp
@@ -14,9 +14,9 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> TFullDataDeserializer::DoDese
auto options = arrow::ipc::IpcReadOptions::Defaults();
options.use_threads = false;
- arrow::Buffer buffer((const ui8*)data.data(), data.size());
- arrow::io::BufferReader bufferReader(buffer);
- auto reader = TStatusValidator::GetValid(arrow::ipc::RecordBatchStreamReader::Open(&bufferReader));
+ std::shared_ptr<arrow::Buffer> buffer(std::make_shared<TBufferOverString>(data));
+ arrow::io::BufferReader readerStream(buffer);
+ auto reader = TStatusValidator::GetValid(arrow::ipc::RecordBatchStreamReader::Open(&readerStream));
std::shared_ptr<arrow::RecordBatch> batch;
auto readResult = reader->ReadNext(&batch);
diff --git a/ydb/core/formats/arrow/serializer/stream.h b/ydb/core/formats/arrow/serializer/stream.h
index 424e183f19a..3d429bcfd96 100644
--- a/ydb/core/formats/arrow/serializer/stream.h
+++ b/ydb/core/formats/arrow/serializer/stream.h
@@ -3,9 +3,24 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/status.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/result.h>
#include <util/generic/string.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/buffer.h>
namespace NKikimr::NArrow::NSerialization {
+// Arrow internally keeps references to Buffer objects with the data
+// This helper class implements arrow::Buffer over TString that owns
+// the actual memory
+// Its use for no-compression mode, where RecordBatch dont own memory
+class TBufferOverString: public arrow::Buffer {
+ TString Str;
+public:
+ explicit TBufferOverString(TString str)
+ : arrow::Buffer((const unsigned char*)str.data(), str.size())
+ , Str(str) {
+ Y_VERIFY(data() == (const unsigned char*)Str.data());
+ }
+};
+
class TFixedStringOutputStream final: public arrow::io::OutputStream {
public:
TFixedStringOutputStream(TString* out)
diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/csv_arrow.cpp
index a00eef8cde9..8834b026747 100644
--- a/ydb/core/io_formats/csv_arrow.cpp
+++ b/ydb/core/io_formats/csv_arrow.cpp
@@ -1,5 +1,6 @@
#include "csv.h"
#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/serializer/stream.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/value_parsing.h>
@@ -162,7 +163,7 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr
return {};
}
- auto buffer = std::make_shared<arrow::Buffer>((const ui8*)csv.data(), csv.size());
+ auto buffer = std::make_shared<NArrow::NSerialization::TBufferOverString>(csv);
auto input = std::make_shared<arrow::io::BufferReader>(buffer);
auto res = arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), input,
ReadOptions, ParseOptions, ConvertOptions);