diff options
author | Alexander Gololobov <agololobov@gmail.com> | 2022-02-08 03:35:08 +0300 |
---|---|---|
committer | Alexander Gololobov <agololobov@gmail.com> | 2022-02-08 03:35:08 +0300 |
commit | edc4877dd750ef38a3671975d841120748759e3f (patch) | |
tree | b3bb38b72a813f0a6f3d8a1823751baa53c556b0 | |
parent | 732338a75259cf91db5877d3567d1286a15fe623 (diff) | |
download | ydb-edc4877dd750ef38a3671975d841120748759e3f.tar.gz |
Extracted common code into AppendCell method (KIKIMR-13971)
ref:40cdd141f53a5748d6031d9d23dfb2faf659a35f
-rw-r--r-- | ydb/core/formats/arrow_batch_builder.cpp | 23 | ||||
-rw-r--r-- | ydb/core/formats/arrow_batch_builder.h | 10 |
2 files changed, 23 insertions, 10 deletions
diff --git a/ydb/core/formats/arrow_batch_builder.cpp b/ydb/core/formats/arrow_batch_builder.cpp index efa1b09435..8ecf97b9c7 100644 --- a/ydb/core/formats/arrow_batch_builder.cpp +++ b/ydb/core/formats/arrow_batch_builder.cpp @@ -87,16 +87,23 @@ bool TArrowBatchBuilder::Start(const TVector<std::pair<TString, NScheme::TTypeId return status.ok(); } +void TArrowBatchBuilder::AppendCell(const TCell& cell, ui32 colNum) { + NumBytes += cell.Size(); + const ui32 ydbType = YdbSchema[colNum].second; + auto status = NKikimr::NArrow::AppendCell(*BatchBuilder, cell, colNum, ydbType); + Y_VERIFY(status.ok()); +} + void TArrowBatchBuilder::AddRow(const TDbTupleRef& key, const TDbTupleRef& value) { ++NumRows; auto fnAppendTuple = [&] (const TDbTupleRef& tuple, size_t offsetInRow) { for (size_t i = 0; i < tuple.ColumnCount; ++i) { - ui32 ydbType = tuple.Types[i]; + const ui32 ydbType = tuple.Types[i]; + const ui32 colNum = offsetInRow + i; + Y_VERIFY(ydbType == YdbSchema[colNum].second); auto& cell = tuple.Columns[i]; - auto status = AppendCell(*BatchBuilder, cell, offsetInRow + i, ydbType); - Y_VERIFY(status.ok()); - NumBytes += cell.Size(); + AppendCell(cell, colNum); } }; @@ -110,15 +117,11 @@ void TArrowBatchBuilder::AddRow(const TConstArrayRef<TCell>& key, const TConstAr size_t offset = 0; for (size_t i = 0; i < key.size(); ++i, ++offset) { auto& cell = key[i]; - auto status = AppendCell(*BatchBuilder, cell, offset, YdbSchema[offset].second); - Y_VERIFY(status.ok()); - NumBytes += cell.Size(); + AppendCell(cell, offset); } for (size_t i = 0; i < value.size(); ++i, ++offset) { auto& cell = value[i]; - auto status = AppendCell(*BatchBuilder, cell, offset, YdbSchema[offset].second); - Y_VERIFY(status.ok()); - NumBytes += cell.Size(); + AppendCell(cell, offset); } } diff --git a/ydb/core/formats/arrow_batch_builder.h b/ydb/core/formats/arrow_batch_builder.h index 2071e95e8b..d52a94ed20 100644 --- a/ydb/core/formats/arrow_batch_builder.h +++ b/ydb/core/formats/arrow_batch_builder.h @@ -42,15 +42,25 @@ public: std::shared_ptr<arrow::RecordBatch> FlushBatch(bool reinitialize); std::shared_ptr<arrow::RecordBatch> GetBatch() const { return Batch; } +protected: + void AppendCell(const TCell& cell, ui32 colNum); + + const TVector<std::pair<TString, NScheme::TTypeId>>& GetYdbSchema() const { + return YdbSchema; + } + private: arrow::ipc::IpcWriteOptions WriteOptions; TVector<std::pair<TString, NScheme::TTypeId>> YdbSchema; std::unique_ptr<arrow::RecordBatchBuilder> BatchBuilder; std::shared_ptr<arrow::RecordBatch> Batch; size_t RowsToReserve{DEFAULT_ROWS_TO_RESERVE}; + +protected: size_t NumRows{0}; size_t NumBytes{0}; +private: std::unique_ptr<IBlockBuilder> Clone() const override { return std::make_unique<TArrowBatchBuilder>(); } |