diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-22 11:45:38 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-22 13:30:48 +0300 |
commit | 8608de22d1643bade64c5371f40eeb6209e409cf (patch) | |
tree | 34fa9a9a45ed95af0ad4240fb44a172b4ddfb605 | |
parent | 841d152d111ac193cf0e7766c0e34168c39972e0 (diff) | |
download | ydb-8608de22d1643bade64c5371f40eeb6209e409cf.tar.gz |
KIKIMR-20205: cache simple arrays and reuse ones
6 files changed, 91 insertions, 8 deletions
diff --git a/ydb/core/formats/arrow/size_calcer.cpp b/ydb/core/formats/arrow/size_calcer.cpp index 6b35f8eead..8385112025 100644 --- a/ydb/core/formats/arrow/size_calcer.cpp +++ b/ydb/core/formats/arrow/size_calcer.cpp @@ -97,7 +97,7 @@ ui32 TRowSizeCalculator::GetRowBytesSize(const ui32 row) const { return result; } -ui64 GetArrayDataRawSize(const std::shared_ptr<arrow::ArrayData>& data) { +ui64 GetArrayMemorySize(const std::shared_ptr<arrow::ArrayData>& data) { if (!data) { return 0; } @@ -142,7 +142,7 @@ ui64 GetBatchMemorySize(const std::shared_ptr<arrow::RecordBatch>& batch) { } ui64 bytes = 0; for (auto& column : batch->column_data()) { - bytes += GetArrayDataRawSize(column); + bytes += GetArrayMemorySize(column); } return bytes; } diff --git a/ydb/core/formats/arrow/size_calcer.h b/ydb/core/formats/arrow/size_calcer.h index 259a93836a..2bf5120ed2 100644 --- a/ydb/core/formats/arrow/size_calcer.h +++ b/ydb/core/formats/arrow/size_calcer.h @@ -131,7 +131,8 @@ TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batc // Return size in bytes including size of bitmap mask ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch); // Return size in bytes including size of bitmap mask -ui64 GetBatchMemorySize(const std::shared_ptr<arrow::RecordBatch>& batch); +ui64 GetArrayMemorySize(const std::shared_ptr<arrow::ArrayData>& data); +ui64 GetBatchMemorySize(const std::shared_ptr<arrow::RecordBatch>&batch); // Return size in bytes *not* including size of bitmap mask ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 8e44244413..8c435aae22 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -1,6 +1,8 @@ #include "portion_info.h" #include <ydb/core/tx/columnshard/engines/scheme/index_info.h> #include <ydb/core/formats/arrow/arrow_filter.h> +#include <util/system/tls.h> +#include <ydb/core/formats/arrow/size_calcer.h> namespace NKikimr::NOlap { @@ -218,22 +220,74 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() c return (*res)->column(0); } +namespace { + +class TThreadSimpleArraysCache { +private: + THashMap<TString, std::shared_ptr<arrow::Array>> Arrays; + const ui64 MaxOneArrayMemorySize = 10 * 1024 * 1024; + + template <class TInitializeActor> + std::shared_ptr<arrow::Array> InitializePosition(const TString& key, const ui32 recordsCount, const TInitializeActor actor) { + auto it = Arrays.find(key); + if (it == Arrays.end() || it->second->length() < recordsCount) { + auto arrNew = actor(recordsCount); + if (NArrow::GetArrayMemorySize(arrNew->data()) < MaxOneArrayMemorySize) { + if (it == Arrays.end()) { + Arrays.emplace(key, arrNew); + } else { + it->second = arrNew; + } + } + return arrNew; + } else { + return it->second->Slice(0, recordsCount); + } + } + +public: + std::shared_ptr<arrow::Array> GetNull(const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount) { + AFL_VERIFY(type); + AFL_VERIFY(recordsCount); + const TString key = type->ToString(); + const auto initializer = [type](const ui32 recordsCount) { + return NArrow::TStatusValidator::GetValid(arrow::MakeArrayOfNull(type, recordsCount)); + }; + return InitializePosition(type->ToString(), recordsCount, initializer); + } + std::shared_ptr<arrow::Array> GetConst(const std::shared_ptr<arrow::DataType>& type, const std::shared_ptr<arrow::Scalar>& scalar, const ui32 recordsCount) { + AFL_VERIFY(type); + AFL_VERIFY(scalar); + AFL_VERIFY(recordsCount); + AFL_VERIFY(scalar->type->id() == type->id())("scalar", scalar->type->ToString())("field", type->ToString()); + + const auto initializer = [scalar](const ui32 recordsCount) { + return NArrow::TStatusValidator::GetValid(arrow::MakeArrayFromScalar(*scalar, recordsCount)); + }; + return InitializePosition(type->ToString() + "::" + scalar->ToString(), recordsCount, initializer); + } +}; + +} + std::shared_ptr<arrow::Table> TPortionInfo::TPreparedBatchData::AssembleTable(const TAssembleOptions& options) const { std::vector<std::shared_ptr<arrow::ChunkedArray>> columns; std::vector<std::shared_ptr<arrow::Field>> fields; + static thread_local TThreadSimpleArraysCache simpleArraysCache; for (auto&& i : Columns) { if (!options.IsAcceptedColumn(i.GetColumnId())) { continue; } std::shared_ptr<arrow::Scalar> scalar; if (options.IsConstantColumn(i.GetColumnId(), scalar)) { + auto type = i.GetField()->type(); + std::shared_ptr<arrow::Array> arr; if (scalar) { - auto arr = NArrow::TStatusValidator::GetValid(arrow::MakeArrayFromScalar(*scalar, RowsCount)); - columns.emplace_back(std::make_shared<arrow::ChunkedArray>(arr)); + arr = simpleArraysCache.GetConst(type, scalar, RowsCount); } else { - auto arr = NArrow::TStatusValidator::GetValid(arrow::MakeArrayOfNull(i.GetField()->type(), RowsCount)); - columns.emplace_back(std::make_shared<arrow::ChunkedArray>(arr)); + arr = simpleArraysCache.GetNull(type, RowsCount); } + columns.emplace_back(std::make_shared<arrow::ChunkedArray>(arr)); } else { columns.emplace_back(i.Assemble()); } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index fdb8391793..2fe6bd85d9 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -461,7 +461,8 @@ public: std::vector<TPreparedColumn> Columns; std::shared_ptr<arrow::Schema> Schema; size_t RowsCount = 0; - + mutable THashMap<TString, std::shared_ptr<arrow::Array>> NullColumns; + mutable THashMap<TString, std::shared_ptr<arrow::Array>> ConstColumns; public: struct TAssembleOptions { std::optional<std::set<ui32>> IncludedColumnIds; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h index 58e44c3d09..8396d59ef7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h @@ -54,6 +54,13 @@ public: return Contains(*columnsSet); } + bool IsEqual(const std::shared_ptr<TColumnsSet>& columnsSet) const { + if (!columnsSet) { + return false; + } + return IsEqual(*columnsSet); + } + bool Contains(const TColumnsSet& columnsSet) const { for (auto&& i : columnsSet.ColumnIds) { if (!ColumnIds.contains(i)) { @@ -63,6 +70,22 @@ public: return true; } + bool IsEqual(const TColumnsSet& columnsSet) const { + if (columnsSet.GetColumnIds().size() != ColumnIds.size()) { + return false; + } + auto itA = ColumnIds.begin(); + auto itB = columnsSet.ColumnIds.begin(); + while (itA != ColumnIds.end()) { + if (*itA != *itB) { + return false; + } + ++itA; + ++itB; + } + return true; + } + TString DebugString() const; TColumnsSet operator+(const TColumnsSet& external) const; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h index cd7c86c71a..a0769dddc2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h @@ -26,6 +26,10 @@ private: std::shared_ptr<TColumnsSet> FFMinusEFPKColumns; bool TrivialEFFlag = false; public: + bool IsSpecColumnsOnly() const { + return FFColumns->IsEqual(SpecColumns); + } + ui64 GetMemoryForSources(const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const bool isExclusive); const TReadMetadata::TConstPtr& GetReadMetadata() const { |