aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-11-22 11:45:38 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-11-22 13:30:48 +0300
commit 8608de22d1643bade64c5371f40eeb6209e409cf (patch)
tree 34fa9a9a45ed95af0ad4240fb44a172b4ddfb605
parent 841d152d111ac193cf0e7766c0e34168c39972e0 (diff)
download ydb-8608de22d1643bade64c5371f40eeb6209e409cf.tar.gz
KIKIMR-20205: cache simple arrays and reuse ones
-rw-r--r-- ydb/core/formats/arrow/size_calcer.cpp 4
-rw-r--r-- ydb/core/formats/arrow/size_calcer.h 3
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/portion_info.cpp 62
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/portion_info.h 3
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h 23
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/context.h 4
6 files changed, 91 insertions, 8 deletions
diff --git a/ydb/core/formats/arrow/size_calcer.cpp b/ydb/core/formats/arrow/size_calcer.cpp
index 6b35f8eead..8385112025 100644
--- a/ydb/core/formats/arrow/size_calcer.cpp
+++ b/ydb/core/formats/arrow/size_calcer.cpp
@@ -97,7 +97,7 @@ ui32 TRowSizeCalculator::GetRowBytesSize(const ui32 row) const {
return result;
}
-ui64 GetArrayDataRawSize(const std::shared_ptr<arrow::ArrayData>& data) {
+ui64 GetArrayMemorySize(const std::shared_ptr<arrow::ArrayData>& data) {
if (!data) {
return 0;
}
@@ -142,7 +142,7 @@ ui64 GetBatchMemorySize(const std::shared_ptr<arrow::RecordBatch>& batch) {
}
ui64 bytes = 0;
for (auto& column : batch->column_data()) {
- bytes += GetArrayDataRawSize(column);
+ bytes += GetArrayMemorySize(column);
}
return bytes;
}
diff --git a/ydb/core/formats/arrow/size_calcer.h b/ydb/core/formats/arrow/size_calcer.h
index 259a93836a..2bf5120ed2 100644
--- a/ydb/core/formats/arrow/size_calcer.h
+++ b/ydb/core/formats/arrow/size_calcer.h
@@ -131,7 +131,8 @@ TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batc
// Return size in bytes including size of bitmap mask
ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch);
// Return size in bytes including size of bitmap mask
-ui64 GetBatchMemorySize(const std::shared_ptr<arrow::RecordBatch>& batch);
+ui64 GetArrayMemorySize(const std::shared_ptr<arrow::ArrayData>& data);
+ui64 GetBatchMemorySize(const std::shared_ptr<arrow::RecordBatch>&batch);
// Return size in bytes *not* including size of bitmap mask
ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column);
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index 8e44244413..8c435aae22 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -1,6 +1,8 @@
#include "portion_info.h"
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/formats/arrow/arrow_filter.h>
+#include <util/system/tls.h>
+#include <ydb/core/formats/arrow/size_calcer.h>
namespace NKikimr::NOlap {
@@ -218,22 +220,74 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() c
return (*res)->column(0);
}
+namespace {
+// Cache of all-null and single-scalar arrays, keyed by type->ToString() (plus scalar->ToString() for constants); no locking inside.
+class TThreadSimpleArraysCache {
+private:
+    THashMap<TString, std::shared_ptr<arrow::Array>> Arrays;
+    const ui64 MaxOneArrayMemorySize = 10 * 1024 * 1024; // arrays not below this size are returned but never stored
+    // Returns a Slice(0, recordsCount) of the cached array for key when it is long enough; otherwise builds one via actor and caches it if small enough.
+    template <class TInitializeActor>
+    std::shared_ptr<arrow::Array> InitializePosition(const TString& key, const ui32 recordsCount, const TInitializeActor& actor) {
+        auto it = Arrays.find(key);
+        if (it != Arrays.end() && it->second->length() >= recordsCount) {
+            return it->second->Slice(0, recordsCount);
+        }
+        auto arrNew = actor(recordsCount);
+        if (NArrow::GetArrayMemorySize(arrNew->data()) < MaxOneArrayMemorySize) {
+            if (it == Arrays.end()) {
+                Arrays.emplace(key, arrNew);
+            } else {
+                it->second = arrNew;
+            }
+        }
+        return arrNew;
+    }
+
+public:
+    // Array of recordsCount nulls of the given type; type must be set and recordsCount must be non-zero (AFL_VERIFY).
+    std::shared_ptr<arrow::Array> GetNull(const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount) {
+        AFL_VERIFY(type);
+        AFL_VERIFY(recordsCount);
+        const TString key = type->ToString();
+        const auto initializer = [type](const ui32 recordsCount) {
+            return NArrow::TStatusValidator::GetValid(arrow::MakeArrayOfNull(type, recordsCount));
+        };
+        return InitializePosition(key, recordsCount, initializer);
+    }
+    // Array of recordsCount copies of scalar; verifies that the scalar's type id matches the id of type.
+    std::shared_ptr<arrow::Array> GetConst(const std::shared_ptr<arrow::DataType>& type, const std::shared_ptr<arrow::Scalar>& scalar, const ui32 recordsCount) {
+        AFL_VERIFY(type);
+        AFL_VERIFY(scalar);
+        AFL_VERIFY(recordsCount);
+        AFL_VERIFY(scalar->type->id() == type->id())("scalar", scalar->type->ToString())("field", type->ToString());
+        const auto initializer = [scalar](const ui32 recordsCount) {
+            return NArrow::TStatusValidator::GetValid(arrow::MakeArrayFromScalar(*scalar, recordsCount));
+        };
+        return InitializePosition(type->ToString() + "::" + scalar->ToString(), recordsCount, initializer);
+    }
+};
+
+}
+
std::shared_ptr<arrow::Table> TPortionInfo::TPreparedBatchData::AssembleTable(const TAssembleOptions& options) const {
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
std::vector<std::shared_ptr<arrow::Field>> fields;
+ static thread_local TThreadSimpleArraysCache simpleArraysCache;
for (auto&& i : Columns) {
if (!options.IsAcceptedColumn(i.GetColumnId())) {
continue;
}
std::shared_ptr<arrow::Scalar> scalar;
if (options.IsConstantColumn(i.GetColumnId(), scalar)) {
+ auto type = i.GetField()->type();
+ std::shared_ptr<arrow::Array> arr;
if (scalar) {
- auto arr = NArrow::TStatusValidator::GetValid(arrow::MakeArrayFromScalar(*scalar, RowsCount));
- columns.emplace_back(std::make_shared<arrow::ChunkedArray>(arr));
+ arr = simpleArraysCache.GetConst(type, scalar, RowsCount);
} else {
- auto arr = NArrow::TStatusValidator::GetValid(arrow::MakeArrayOfNull(i.GetField()->type(), RowsCount));
- columns.emplace_back(std::make_shared<arrow::ChunkedArray>(arr));
+ arr = simpleArraysCache.GetNull(type, RowsCount);
}
+ columns.emplace_back(std::make_shared<arrow::ChunkedArray>(arr));
} else {
columns.emplace_back(i.Assemble());
}
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index fdb8391793..2fe6bd85d9 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -461,7 +461,8 @@ public:
std::vector<TPreparedColumn> Columns;
std::shared_ptr<arrow::Schema> Schema;
size_t RowsCount = 0;
-
+ mutable THashMap<TString, std::shared_ptr<arrow::Array>> NullColumns;
+ mutable THashMap<TString, std::shared_ptr<arrow::Array>> ConstColumns;
public:
struct TAssembleOptions {
std::optional<std::set<ui32>> IncludedColumnIds;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
index 58e44c3d09..8396d59ef7 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h
@@ -54,6 +54,13 @@ public:
return Contains(*columnsSet);
}
+    // Null-tolerant overload: an empty pointer never compares equal,
+    // otherwise the by-reference IsEqual performs the comparison.
+    bool IsEqual(const std::shared_ptr<TColumnsSet>& columnsSet) const {
+        const TColumnsSet* other = columnsSet.get();
+        return other != nullptr && IsEqual(*other);
+    }
+
bool Contains(const TColumnsSet& columnsSet) const {
for (auto&& i : columnsSet.ColumnIds) {
if (!ColumnIds.contains(i)) {
@@ -63,6 +70,22 @@ public:
return true;
}
+    // Exact equality of two column sets: the same number of ids and an equal id
+    // at every position when both ColumnIds containers are iterated in step.
+    bool IsEqual(const TColumnsSet& columnsSet) const {
+        if (columnsSet.GetColumnIds().size() != ColumnIds.size()) {
+            return false;
+        }
+        // Sizes match, so walking all of ColumnIds stays within the other container too.
+        auto itOther = columnsSet.ColumnIds.begin();
+        for (auto itMine = ColumnIds.begin(); itMine != ColumnIds.end(); ++itMine, ++itOther) {
+            if (*itMine != *itOther) {
+                return false;
+            }
+        }
+        return true;
+    }
+
TString DebugString() const;
TColumnsSet operator+(const TColumnsSet& external) const;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
index cd7c86c71a..a0769dddc2 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
@@ -26,6 +26,10 @@ private:
std::shared_ptr<TColumnsSet> FFMinusEFPKColumns;
bool TrivialEFFlag = false;
public:
+    // Reports whether FFColumns compares IsEqual to SpecColumns.
+    // FFColumns is dereferenced unconditionally, so it must be set before this is called.
+    bool IsSpecColumnsOnly() const { return FFColumns->IsEqual(SpecColumns); }
+
ui64 GetMemoryForSources(const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const bool isExclusive);
const TReadMetadata::TConstPtr& GetReadMetadata() const {