aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Gololobov <agololobov@gmail.com>2022-02-08 03:35:08 +0300
committerAlexander Gololobov <agololobov@gmail.com>2022-02-08 03:35:08 +0300
commitedc4877dd750ef38a3671975d841120748759e3f (patch)
treeb3bb38b72a813f0a6f3d8a1823751baa53c556b0
parent732338a75259cf91db5877d3567d1286a15fe623 (diff)
downloadydb-edc4877dd750ef38a3671975d841120748759e3f.tar.gz
Extracted common code into AppendCell method (KIKIMR-13971)
ref:40cdd141f53a5748d6031d9d23dfb2faf659a35f
-rw-r--r--ydb/core/formats/arrow_batch_builder.cpp23
-rw-r--r--ydb/core/formats/arrow_batch_builder.h10
2 files changed, 23 insertions, 10 deletions
diff --git a/ydb/core/formats/arrow_batch_builder.cpp b/ydb/core/formats/arrow_batch_builder.cpp
index efa1b09435..8ecf97b9c7 100644
--- a/ydb/core/formats/arrow_batch_builder.cpp
+++ b/ydb/core/formats/arrow_batch_builder.cpp
@@ -87,16 +87,23 @@ bool TArrowBatchBuilder::Start(const TVector<std::pair<TString, NScheme::TTypeId
return status.ok();
}
+void TArrowBatchBuilder::AppendCell(const TCell& cell, ui32 colNum) {
+ NumBytes += cell.Size();
+ const ui32 ydbType = YdbSchema[colNum].second;
+ auto status = NKikimr::NArrow::AppendCell(*BatchBuilder, cell, colNum, ydbType);
+ Y_VERIFY(status.ok());
+}
+
void TArrowBatchBuilder::AddRow(const TDbTupleRef& key, const TDbTupleRef& value) {
++NumRows;
auto fnAppendTuple = [&] (const TDbTupleRef& tuple, size_t offsetInRow) {
for (size_t i = 0; i < tuple.ColumnCount; ++i) {
- ui32 ydbType = tuple.Types[i];
+ const ui32 ydbType = tuple.Types[i];
+ const ui32 colNum = offsetInRow + i;
+ Y_VERIFY(ydbType == YdbSchema[colNum].second);
auto& cell = tuple.Columns[i];
- auto status = AppendCell(*BatchBuilder, cell, offsetInRow + i, ydbType);
- Y_VERIFY(status.ok());
- NumBytes += cell.Size();
+ AppendCell(cell, colNum);
}
};
@@ -110,15 +117,11 @@ void TArrowBatchBuilder::AddRow(const TConstArrayRef<TCell>& key, const TConstAr
size_t offset = 0;
for (size_t i = 0; i < key.size(); ++i, ++offset) {
auto& cell = key[i];
- auto status = AppendCell(*BatchBuilder, cell, offset, YdbSchema[offset].second);
- Y_VERIFY(status.ok());
- NumBytes += cell.Size();
+ AppendCell(cell, offset);
}
for (size_t i = 0; i < value.size(); ++i, ++offset) {
auto& cell = value[i];
- auto status = AppendCell(*BatchBuilder, cell, offset, YdbSchema[offset].second);
- Y_VERIFY(status.ok());
- NumBytes += cell.Size();
+ AppendCell(cell, offset);
}
}
diff --git a/ydb/core/formats/arrow_batch_builder.h b/ydb/core/formats/arrow_batch_builder.h
index 2071e95e8b..d52a94ed20 100644
--- a/ydb/core/formats/arrow_batch_builder.h
+++ b/ydb/core/formats/arrow_batch_builder.h
@@ -42,15 +42,25 @@ public:
std::shared_ptr<arrow::RecordBatch> FlushBatch(bool reinitialize);
std::shared_ptr<arrow::RecordBatch> GetBatch() const { return Batch; }
+protected:
+ void AppendCell(const TCell& cell, ui32 colNum);
+
+ const TVector<std::pair<TString, NScheme::TTypeId>>& GetYdbSchema() const {
+ return YdbSchema;
+ }
+
private:
arrow::ipc::IpcWriteOptions WriteOptions;
TVector<std::pair<TString, NScheme::TTypeId>> YdbSchema;
std::unique_ptr<arrow::RecordBatchBuilder> BatchBuilder;
std::shared_ptr<arrow::RecordBatch> Batch;
size_t RowsToReserve{DEFAULT_ROWS_TO_RESERVE};
+
+protected:
size_t NumRows{0};
size_t NumBytes{0};
+private:
std::unique_ptr<IBlockBuilder> Clone() const override {
return std::make_unique<TArrowBatchBuilder>();
}