aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaksim Kita <kitaetoya@gmail.com>2023-07-11 14:07:13 +0000
committermaksim-kita <maksim-kita@yandex-team.com>2023-07-11 17:07:13 +0300
commit341dec40451fa14efd03d41ae055e96d8490761d (patch)
tree85973cf3772ef943fe419e3b80b9a2a3d467e51e
parentb82748c710edb310a5171647a19cbdc9055f70a6 (diff)
downloadydb-341dec40451fa14efd03d41ae055e96d8490761d.tar.gz
Datashard added OwnedCellVecBatch
Datashard added OwnedCellVecBatch Pull Request resolved: #304
-rw-r--r--ydb/core/scheme/scheme_tablecell.cpp48
-rw-r--r--ydb/core/scheme/scheme_tablecell.h65
-rw-r--r--ydb/core/tx/datashard/datashard.cpp14
-rw-r--r--ydb/core/tx/datashard/datashard.h13
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp15
5 files changed, 148 insertions, 7 deletions
diff --git a/ydb/core/scheme/scheme_tablecell.cpp b/ydb/core/scheme/scheme_tablecell.cpp
index 04c2aaf4a6..3d30ca1d80 100644
--- a/ydb/core/scheme/scheme_tablecell.cpp
+++ b/ydb/core/scheme/scheme_tablecell.cpp
@@ -197,6 +197,54 @@ bool TSerializedCellVec::DoTryParse(const TString& data) {
return TryDeserializeCellVec(data, Buf, Cells);
}
+TOwnedCellVecBatch::TOwnedCellVecBatch()
+ : Pool(std::make_unique<TMemoryPool>(InitialPoolSize)) {
+}
+
+size_t TOwnedCellVecBatch::Append(TConstArrayRef<TCell> cells) {
+ size_t cellsSize = cells.size();
+ if (cellsSize == 0) {
+ CellVectors.emplace_back();
+ return 0;
+ }
+
+ size_t size = sizeof(TCell) * cellsSize;
+ for (auto& cell : cells) {
+ if (!cell.IsNull() && !cell.IsInline()) {
+ const size_t cellSize = cell.Size();
+ size += AlignUp(cellSize);
+ }
+ }
+
+ char * allocatedBuffer = reinterpret_cast<char *>(Pool->Allocate(size));
+
+ TCell* ptrCell = reinterpret_cast<TCell*>(allocatedBuffer);
+ char* ptrData = reinterpret_cast<char*>(ptrCell + cellsSize);
+
+ TConstArrayRef<TCell> cellvec(ptrCell, ptrCell + cellsSize);
+
+ for (auto& cell : cells) {
+ if (cell.IsNull()) {
+ new (ptrCell) TCell();
+ } else if (cell.IsInline()) {
+ new (ptrCell) TCell(cell);
+ } else {
+ const size_t cellSize = cell.Size();
+ if (Y_LIKELY(cellSize > 0)) {
+ ::memcpy(ptrData, cell.Data(), cellSize);
+ }
+ new (ptrCell) TCell(ptrData, cellSize);
+ ptrData += AlignUp(cellSize);
+ }
+
+ ++ptrCell;
+ }
+
+ CellVectors.push_back(cellvec);
+ return size;
+}
+
+
TString DbgPrintCell(const TCell& r, NScheme::TTypeInfo typeInfo, const NScheme::TTypeRegistry &reg) {
auto typeId = typeInfo.GetTypeId();
TString res;
diff --git a/ydb/core/scheme/scheme_tablecell.h b/ydb/core/scheme/scheme_tablecell.h
index e6d1a62479..de70c17afa 100644
--- a/ydb/core/scheme/scheme_tablecell.h
+++ b/ydb/core/scheme/scheme_tablecell.h
@@ -8,6 +8,7 @@
#include <util/generic/bitops.h>
#include <util/generic/hash.h>
#include <util/system/unaligned_mem.h>
+#include <util/memory/pool.h>
#include <type_traits>
@@ -499,6 +500,70 @@ private:
TVector<TCell> Cells;
};
+class TOwnedCellVecBatch {
+public:
+ TOwnedCellVecBatch();
+
+ TOwnedCellVecBatch(const TOwnedCellVecBatch& rhs) = delete;
+
+ TOwnedCellVecBatch & operator=(const TOwnedCellVecBatch& rhs) = delete;
+
+ TOwnedCellVecBatch(const TOwnedCellVecBatch&& rhs) = default;
+
+ TOwnedCellVecBatch & operator=(TOwnedCellVecBatch&& rhs) = default;
+
+ size_t Append(TConstArrayRef<TCell> cells);
+
+ size_t Size() const {
+ return CellVectors.size();
+ }
+
+ bool Empty() const {
+ return CellVectors.empty();
+ }
+
+ bool empty() const {
+ return CellVectors.empty();
+ }
+
+ using iterator = TVector<TConstArrayRef<TCell>>::iterator;
+ using const_iterator = TVector<TConstArrayRef<TCell>>::const_iterator;
+
+ iterator begin() {
+ return CellVectors.begin();
+ }
+
+ iterator end() {
+ return CellVectors.end();
+ }
+
+ const_iterator cbegin() {
+ return CellVectors.cbegin();
+ }
+
+ const_iterator cend() {
+ return CellVectors.cend();
+ }
+
+ TConstArrayRef<TCell> operator[](size_t index) const {
+ return CellVectors[index];
+ }
+
+ TConstArrayRef<TCell> front() const {
+ return CellVectors.front();
+ }
+
+ TConstArrayRef<TCell> back() const {
+ return CellVectors.back();
+ }
+
+private:
+ static constexpr size_t InitialPoolSize = 1ULL << 16;
+
+ std::unique_ptr<TMemoryPool> Pool;
+ TVector<TConstArrayRef<TCell>> CellVectors;
+};
+
void DbgPrintValue(TString&, const TCell&, NScheme::TTypeInfo typeInfo);
TString DbgPrintCell(const TCell& r, NScheme::TTypeInfo typeInfo, const NScheme::TTypeRegistry& typeRegistry);
TString DbgPrintTuple(const TDbTupleRef& row, const NScheme::TTypeRegistry& typeRegistry);
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 960457cb2a..94be571196 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -4192,7 +4192,19 @@ void TEvDataShard::TEvReadResult::FillRecord() {
protoBatch->SetBatch(NArrow::SerializeBatchNoCompression(ArrowBatch));
ArrowBatch.reset();
return;
- } else if (!Rows.empty()) {
+ }
+
+ if (!Batch.empty()) {
+ auto* protoBatch = Record.MutableCellVec();
+ protoBatch->MutableRows()->Reserve(Batch.Size());
+ for (const auto& row: Batch) {
+ protoBatch->AddRows(TSerializedCellVec::Serialize(row));
+ }
+ Batch = {};
+ return;
+ }
+
+ if (!Rows.empty()) {
auto* protoBatch = Record.MutableCellVec();
protoBatch->MutableRows()->Reserve(Rows.size());
for (const auto& row: Rows) {
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index ddbcc163bc..ae17c3af96 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -961,13 +961,17 @@ struct TEvDataShard {
// CellVec (TODO: add schema?)
TConstArrayRef<TCell> GetCells(size_t row) const {
- if (Rows.empty() && RowsSerialized.empty() && Record.GetRowCount())
+ if (Rows.empty() && Batch.Empty() && RowsSerialized.empty())
return {};
if (!Rows.empty()) {
return Rows[row];
}
+ if (!Batch.Empty()) {
+ return Batch[row];
+ }
+
return RowsSerialized[row].GetCells();
}
@@ -975,6 +979,10 @@ struct TEvDataShard {
Rows = std::move(rows);
}
+ void SetBatch(TOwnedCellVecBatch&& batch) {
+ Batch = std::move(batch);
+ }
+
// Arrow
void SetArrowBatch(std::shared_ptr<arrow::RecordBatch>&& batch) {
@@ -988,6 +996,9 @@ struct TEvDataShard {
// for local events
TVector<TOwnedCellVec> Rows;
+ // batch for local events
+ TOwnedCellVecBatch Batch;
+
// for remote events to avoid extra copying
TVector<TSerializedCellVec> RowsSerialized;
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 018100b19a..29b502d665 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -106,8 +106,8 @@ public:
void AddRow(const TDbTupleRef& key, const TDbTupleRef& value) override {
Y_UNUSED(key);
- Rows.emplace_back(value.Cells());
- BytesCount += Rows.back().DataSize();
+ size_t DataSize = Batch.Append(value.Cells());
+ BytesCount += DataSize;
}
TString Finish() override {
@@ -117,12 +117,15 @@ public:
size_t Bytes() const override { return BytesCount; }
public:
- TVector<TOwnedCellVec> FlushBatch() { return std::move(Rows); }
+ void FlushBatch(TOwnedCellVecBatch & result) {
+ result = std::move(Batch);
+ Batch = {};
+ }
private:
std::vector<std::pair<TString, NScheme::TTypeInfo>> Columns;
- TVector<TOwnedCellVec> Rows;
+ TOwnedCellVecBatch Batch;
ui64 BytesCount = 0;
std::unique_ptr<IBlockBuilder> Clone() const override {
@@ -665,7 +668,9 @@ public:
}
case NKikimrTxDataShard::CELLVEC: {
auto& cellBuilder = static_cast<TCellBlockBuilder&>(BlockBuilder);
- result.SetRows(cellBuilder.FlushBatch());
+ TOwnedCellVecBatch batch;
+ cellBuilder.FlushBatch(batch);
+ result.SetBatch(std::move(batch));
break;
}
default: {