diff options
author | Maksim Kita <kitaetoya@gmail.com> | 2023-07-11 14:07:13 +0000 |
---|---|---|
committer | maksim-kita <maksim-kita@yandex-team.com> | 2023-07-11 17:07:13 +0300 |
commit | 341dec40451fa14efd03d41ae055e96d8490761d (patch) | |
tree | 85973cf3772ef943fe419e3b80b9a2a3d467e51e | |
parent | b82748c710edb310a5171647a19cbdc9055f70a6 (diff) | |
download | ydb-341dec40451fa14efd03d41ae055e96d8490761d.tar.gz |
Datashard added OwnedCellVecBatch
Datashard added OwnedCellVecBatch
Pull Request resolved: #304
-rw-r--r-- | ydb/core/scheme/scheme_tablecell.cpp | 48 | ||||
-rw-r--r-- | ydb/core/scheme/scheme_tablecell.h | 65 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.h | 13 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 15 |
5 files changed, 148 insertions, 7 deletions
diff --git a/ydb/core/scheme/scheme_tablecell.cpp b/ydb/core/scheme/scheme_tablecell.cpp index 04c2aaf4a6..3d30ca1d80 100644 --- a/ydb/core/scheme/scheme_tablecell.cpp +++ b/ydb/core/scheme/scheme_tablecell.cpp @@ -197,6 +197,54 @@ bool TSerializedCellVec::DoTryParse(const TString& data) { return TryDeserializeCellVec(data, Buf, Cells); } +TOwnedCellVecBatch::TOwnedCellVecBatch() + : Pool(std::make_unique<TMemoryPool>(InitialPoolSize)) { +} + +size_t TOwnedCellVecBatch::Append(TConstArrayRef<TCell> cells) { + size_t cellsSize = cells.size(); + if (cellsSize == 0) { + CellVectors.emplace_back(); + return 0; + } + + size_t size = sizeof(TCell) * cellsSize; + for (auto& cell : cells) { + if (!cell.IsNull() && !cell.IsInline()) { + const size_t cellSize = cell.Size(); + size += AlignUp(cellSize); + } + } + + char * allocatedBuffer = reinterpret_cast<char *>(Pool->Allocate(size)); + + TCell* ptrCell = reinterpret_cast<TCell*>(allocatedBuffer); + char* ptrData = reinterpret_cast<char*>(ptrCell + cellsSize); + + TConstArrayRef<TCell> cellvec(ptrCell, ptrCell + cellsSize); + + for (auto& cell : cells) { + if (cell.IsNull()) { + new (ptrCell) TCell(); + } else if (cell.IsInline()) { + new (ptrCell) TCell(cell); + } else { + const size_t cellSize = cell.Size(); + if (Y_LIKELY(cellSize > 0)) { + ::memcpy(ptrData, cell.Data(), cellSize); + } + new (ptrCell) TCell(ptrData, cellSize); + ptrData += AlignUp(cellSize); + } + + ++ptrCell; + } + + CellVectors.push_back(cellvec); + return size; +} + + TString DbgPrintCell(const TCell& r, NScheme::TTypeInfo typeInfo, const NScheme::TTypeRegistry ®) { auto typeId = typeInfo.GetTypeId(); TString res; diff --git a/ydb/core/scheme/scheme_tablecell.h b/ydb/core/scheme/scheme_tablecell.h index e6d1a62479..de70c17afa 100644 --- a/ydb/core/scheme/scheme_tablecell.h +++ b/ydb/core/scheme/scheme_tablecell.h @@ -8,6 +8,7 @@ #include <util/generic/bitops.h> #include <util/generic/hash.h> #include <util/system/unaligned_mem.h> +#include <util/memory/pool.h> #include <type_traits> @@ -499,6 +500,70 @@ private: TVector<TCell> Cells; }; +class TOwnedCellVecBatch { +public: + TOwnedCellVecBatch(); + + TOwnedCellVecBatch(const TOwnedCellVecBatch& rhs) = delete; + + TOwnedCellVecBatch & operator=(const TOwnedCellVecBatch& rhs) = delete; + + TOwnedCellVecBatch(const TOwnedCellVecBatch&& rhs) = default; + + TOwnedCellVecBatch & operator=(TOwnedCellVecBatch&& rhs) = default; + + size_t Append(TConstArrayRef<TCell> cells); + + size_t Size() const { + return CellVectors.size(); + } + + bool Empty() const { + return CellVectors.empty(); + } + + bool empty() const { + return CellVectors.empty(); + } + + using iterator = TVector<TConstArrayRef<TCell>>::iterator; + using const_iterator = TVector<TConstArrayRef<TCell>>::const_iterator; + + iterator begin() { + return CellVectors.begin(); + } + + iterator end() { + return CellVectors.end(); + } + + const_iterator cbegin() { + return CellVectors.cbegin(); + } + + const_iterator cend() { + return CellVectors.cend(); + } + + TConstArrayRef<TCell> operator[](size_t index) const { + return CellVectors[index]; + } + + TConstArrayRef<TCell> front() const { + return CellVectors.front(); + } + + TConstArrayRef<TCell> back() const { + return CellVectors.back(); + } + +private: + static constexpr size_t InitialPoolSize = 1ULL << 16; + + std::unique_ptr<TMemoryPool> Pool; + TVector<TConstArrayRef<TCell>> CellVectors; +}; + void DbgPrintValue(TString&, const TCell&, NScheme::TTypeInfo typeInfo); TString DbgPrintCell(const TCell& r, NScheme::TTypeInfo typeInfo, const NScheme::TTypeRegistry& typeRegistry); TString DbgPrintTuple(const TDbTupleRef& row, const NScheme::TTypeRegistry& typeRegistry); diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 960457cb2a..94be571196 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -4192,7 +4192,19 @@ void TEvDataShard::TEvReadResult::FillRecord() { protoBatch->SetBatch(NArrow::SerializeBatchNoCompression(ArrowBatch)); ArrowBatch.reset(); return; - } else if (!Rows.empty()) { + } + + if (!Batch.empty()) { + auto* protoBatch = Record.MutableCellVec(); + protoBatch->MutableRows()->Reserve(Batch.Size()); + for (const auto& row: Batch) { + protoBatch->AddRows(TSerializedCellVec::Serialize(row)); + } + Batch = {}; + return; + } + + if (!Rows.empty()) { auto* protoBatch = Record.MutableCellVec(); protoBatch->MutableRows()->Reserve(Rows.size()); for (const auto& row: Rows) { diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index ddbcc163bc..ae17c3af96 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -961,13 +961,17 @@ struct TEvDataShard { // CellVec (TODO: add schema?) TConstArrayRef<TCell> GetCells(size_t row) const { - if (Rows.empty() && RowsSerialized.empty() && Record.GetRowCount()) + if (Rows.empty() && Batch.Empty() && RowsSerialized.empty()) return {}; if (!Rows.empty()) { return Rows[row]; } + if (!Batch.Empty()) { + return Batch[row]; + } + return RowsSerialized[row].GetCells(); } @@ -975,6 +979,10 @@ struct TEvDataShard { Rows = std::move(rows); } + void SetBatch(TOwnedCellVecBatch&& batch) { + Batch = std::move(batch); + } + // Arrow void SetArrowBatch(std::shared_ptr<arrow::RecordBatch>&& batch) { @@ -988,6 +996,9 @@ struct TEvDataShard { // for local events TVector<TOwnedCellVec> Rows; + // batch for local events + TOwnedCellVecBatch Batch; + // for remote events to avoid extra copying TVector<TSerializedCellVec> RowsSerialized; diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 018100b19a..29b502d665 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -106,8 +106,8 @@ public: void AddRow(const TDbTupleRef& key, const TDbTupleRef& value) override { Y_UNUSED(key); - Rows.emplace_back(value.Cells()); - BytesCount += Rows.back().DataSize(); + size_t DataSize = Batch.Append(value.Cells()); + BytesCount += DataSize; } TString Finish() override { @@ -117,12 +117,15 @@ public: size_t Bytes() const override { return BytesCount; } public: - TVector<TOwnedCellVec> FlushBatch() { return std::move(Rows); } + void FlushBatch(TOwnedCellVecBatch & result) { + result = std::move(Batch); + Batch = {}; + } private: std::vector<std::pair<TString, NScheme::TTypeInfo>> Columns; - TVector<TOwnedCellVec> Rows; + TOwnedCellVecBatch Batch; ui64 BytesCount = 0; std::unique_ptr<IBlockBuilder> Clone() const override { @@ -665,7 +668,9 @@ public: } case NKikimrTxDataShard::CELLVEC: { auto& cellBuilder = static_cast<TCellBlockBuilder&>(BlockBuilder); - result.SetRows(cellBuilder.FlushBatch()); + TOwnedCellVecBatch batch; + cellBuilder.FlushBatch(batch); + result.SetBatch(std::move(batch)); break; } default: { |