aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorziganshinmr <ziganshinmr@yandex-team.com>2025-01-21 18:38:12 +0300
committerziganshinmr <ziganshinmr@yandex-team.com>2025-01-21 19:01:20 +0300
commit3cfd089f583b5c95b28aa87ed10dc684c7e15c85 (patch)
treef586a6edd5f913bf1acad6f8720f891182e6383a
parent30172fb4fc34125a18c2a66d05971c77e8d6e453 (diff)
downloadydb-3cfd089f583b5c95b28aa87ed10dc684c7e15c85.tar.gz
Block map join memory optimizations
* Split block index into separate storage and index parts * BlockMapJoinCore index memory reservation * Use custom mkql allocator for the index table in BlockMapJoinCore commit_hash:aaf0c5c68925a61b602f0c39e56cce67d8be17c2
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp385
-rw-r--r--yql/essentials/minikql/mkql_alloc.h34
2 files changed, 252 insertions, 167 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
index 189f8a0d2df..fad21299c81 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
@@ -239,24 +239,147 @@ private:
TVector<NYql::NUdf::IBlockItemHasher::TPtr> Hashers_;
};
-class TBlockIndex : public TComputationValue<TBlockIndex> {
- struct TIndexEntry {
+template <typename TDerived>
+class TBlockStorageBase : public TComputationValue<TDerived> {
+ using TBase = TComputationValue<TDerived>;
+
+public:
+ struct TBlock {
+ size_t Size;
+ std::vector<arrow::Datum> Columns;
+
+ TBlock() = default;
+ TBlock(size_t size, std::vector<arrow::Datum> columns)
+ : Size(size)
+ , Columns(std::move(columns))
+ {}
+ };
+
+ struct TRowEntry {
ui32 BlockOffset;
ui32 ItemOffset;
- TIndexEntry() = default;
- TIndexEntry(ui32 blockOffset, ui32 itemOffset)
+ TRowEntry() = default;
+ TRowEntry(ui32 blockOffset, ui32 itemOffset)
: BlockOffset(blockOffset)
, ItemOffset(itemOffset)
{}
};
+ TBlockStorageBase(
+ TMemoryUsageInfo* memInfo,
+ const TVector<TType*>& itemTypes,
+ NUdf::TUnboxedValue stream,
+ arrow::MemoryPool* pool
+ )
+ : TBase(memInfo)
+ , InputsDescr_(ToValueDescr(itemTypes))
+ , Stream_(stream)
+ , Inputs_(itemTypes.size())
+ {
+ TBlockTypeHelper helper;
+ for (size_t i = 0; i < itemTypes.size(); i++) {
+ TType* blockItemType = AS_TYPE(TBlockType, itemTypes[i])->GetItemType();
+ Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
+ Hashers_.push_back(helper.MakeHasher(blockItemType));
+ Comparators_.push_back(helper.MakeComparator(blockItemType));
+ Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool));
+ }
+ }
+
+ NUdf::EFetchStatus FetchStream() {
+ switch (Stream_.WideFetch(Inputs_.data(), Inputs_.size())) {
+ case NUdf::EFetchStatus::Yield:
+ return NUdf::EFetchStatus::Yield;
+ case NUdf::EFetchStatus::Finish:
+ return NUdf::EFetchStatus::Finish;
+ case NUdf::EFetchStatus::Ok:
+ break;
+ }
+
+ std::vector<arrow::Datum> blockColumns;
+ for (size_t i = 0; i < Inputs_.size() - 1; i++) {
+ auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
+ ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
+ if (datum.is_scalar()) {
+ blockColumns.push_back(datum);
+ } else {
+ MKQL_ENSURE(datum.is_array(), "Expecting array");
+ blockColumns.push_back(Trimmers_[i]->Trim(datum.array()));
+ }
+ }
+
+ auto blockSize = ::GetBlockCount(Inputs_[Inputs_.size() - 1]);
+ Data_.emplace_back(blockSize, std::move(blockColumns));
+ RowCount_ += blockSize;
+
+ return NUdf::EFetchStatus::Ok;
+ }
+
+ const TBlock& GetBlock(size_t blockOffset) const {
+ Y_ENSURE(blockOffset < GetBlockCount());
+ return Data_[blockOffset];
+ }
+
+ size_t GetBlockCount() const {
+ return Data_.size();
+ }
+
+ TBlockItem GetItem(TRowEntry entry, ui32 columnIdx) const {
+ Y_ENSURE(columnIdx < Inputs_.size() - 1);
+ return GetItemFromBlock(GetBlock(entry.BlockOffset), columnIdx, entry.ItemOffset);
+ }
+
+ void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const {
+ Y_ENSURE(row.size() == ioMap.size());
+ for (size_t i = 0; i < row.size(); i++) {
+ row[i] = GetItem(entry, ioMap[i]);
+ }
+ }
+
+protected:
+ TBlockItem GetItemFromBlock(const TBlock& block, ui32 columnIdx, size_t offset) const {
+ Y_ENSURE(offset < block.Size);
+ const auto& datum = block.Columns[columnIdx];
+ if (datum.is_scalar()) {
+ return Readers_[columnIdx]->GetScalarItem(*datum.scalar());
+ } else {
+ MKQL_ENSURE(datum.is_array(), "Expecting array");
+ return Readers_[columnIdx]->GetItem(*datum.array(), offset);
+ }
+ }
+
+protected:
+ const std::vector<arrow::ValueDescr> InputsDescr_;
+
+ TVector<std::unique_ptr<IBlockReader>> Readers_;
+ TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
+ TVector<NUdf::IBlockItemComparator::TPtr> Comparators_;
+ TVector<IBlockTrimmer::TPtr> Trimmers_;
+
+ std::vector<TBlock> Data_;
+ size_t RowCount_ = 0;
+
+ NUdf::TUnboxedValue Stream_;
+ TUnboxedValueVector Inputs_;
+};
+
+class TBlockStorage: public TBlockStorageBase<TBlockStorage> {
+private:
+ using TBase = TBlockStorageBase<TBlockStorage>;
+public:
+ using TBase::TBase;
+};
+
+class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> {
+ using TBase = TBlockStorageBase<TIndexedBlockStorage>;
+
struct TIndexNode {
- TIndexEntry Entry;
+ TRowEntry Entry;
TIndexNode* Next;
TIndexNode() = delete;
- TIndexNode(TIndexEntry entry, TIndexNode* next = nullptr)
+ TIndexNode(TRowEntry entry, TIndexNode* next = nullptr)
: Entry(entry)
, Next(next)
{}
@@ -268,7 +391,7 @@ class TBlockIndex : public TComputationValue<TBlockIndex> {
: Raw(0)
{}
- TIndexMapValue(TIndexEntry entry) {
+ TIndexMapValue(TRowEntry entry) {
TIndexEntryUnion un;
un.Entry = entry;
@@ -289,7 +412,7 @@ class TBlockIndex : public TComputationValue<TBlockIndex> {
return EntryList;
}
- TIndexEntry GetEntry() const {
+ TRowEntry GetEntry() const {
Y_ENSURE(IsInplace());
TIndexEntryUnion un;
@@ -299,7 +422,7 @@ class TBlockIndex : public TComputationValue<TBlockIndex> {
private:
union TIndexEntryUnion {
- TIndexEntry Entry;
+ TRowEntry Entry;
ui64 Raw;
};
@@ -309,13 +432,12 @@ class TBlockIndex : public TComputationValue<TBlockIndex> {
};
};
- using TBase = TComputationValue<TBlockIndex>;
using TIndexMap = TRobinHoodHashFixedMap<
ui64,
TIndexMapValue,
std::equal_to<ui64>,
std::hash<ui64>,
- TMKQLAllocator<char>
+ TMKQLHugeAllocator<char>
>;
static_assert(sizeof(TIndexMapValue) == 8);
@@ -323,6 +445,8 @@ class TBlockIndex : public TComputationValue<TBlockIndex> {
public:
class TIterator {
+ friend class TIndexedBlockStorage;
+
enum class EIteratorType {
EMPTY,
INPLACE,
@@ -332,26 +456,6 @@ public:
public:
TIterator() = default;
- TIterator(const TBlockIndex* blockIndex)
- : Type_(EIteratorType::EMPTY)
- , BlockIndex_(blockIndex)
- {}
-
- TIterator(const TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
- : Type_(EIteratorType::INPLACE)
- , BlockIndex_(blockIndex)
- , Entry_(entry)
- , EntryConsumed_(false)
- , ItemsToLookup_(std::move(itemsToLookup))
- {}
-
- TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
- : Type_(EIteratorType::LIST)
- , BlockIndex_(blockIndex)
- , Node_(node)
- , ItemsToLookup_(std::move(itemsToLookup))
- {}
-
TIterator(const TIterator&) = delete;
TIterator& operator=(const TIterator&) = delete;
@@ -384,7 +488,7 @@ public:
return *this;
}
- TMaybe<TIndexEntry> Next() {
+ TMaybe<TRowEntry> Next() {
Y_ENSURE(IsValid());
switch (Type_) {
@@ -397,7 +501,7 @@ public:
}
EntryConsumed_ = true;
- return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TIndexEntry>(Entry_) : Nothing();
+ return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TRowEntry>(Entry_) : Nothing();
case EIteratorType::LIST:
for (; Node_ != nullptr; Node_ = Node_->Next) {
@@ -434,13 +538,34 @@ public:
}
private:
+ TIterator(const TIndexedBlockStorage* blockIndex)
+ : Type_(EIteratorType::EMPTY)
+ , BlockIndex_(blockIndex)
+ {}
+
+ TIterator(const TIndexedBlockStorage* blockIndex, TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+ : Type_(EIteratorType::INPLACE)
+ , BlockIndex_(blockIndex)
+ , Entry_(entry)
+ , EntryConsumed_(false)
+ , ItemsToLookup_(std::move(itemsToLookup))
+ {}
+
+ TIterator(const TIndexedBlockStorage* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+ : Type_(EIteratorType::LIST)
+ , BlockIndex_(blockIndex)
+ , Node_(node)
+ , ItemsToLookup_(std::move(itemsToLookup))
+ {}
+
+ private:
EIteratorType Type_;
- const TBlockIndex* BlockIndex_ = nullptr;
+ const TIndexedBlockStorage* BlockIndex_ = nullptr;
union {
TIndexNode* Node_;
struct {
- TIndexEntry Entry_;
+ TRowEntry Entry_;
bool EntryConsumed_;
};
};
@@ -449,7 +574,7 @@ public:
};
public:
- TBlockIndex(
+ TIndexedBlockStorage(
TMemoryUsageInfo* memInfo,
const TVector<TType*>& itemTypes,
const TVector<ui32>& keyColumns,
@@ -457,106 +582,75 @@ public:
bool any,
arrow::MemoryPool* pool
)
- : TBase(memInfo)
- , InputsDescr_(ToValueDescr(itemTypes))
+ : TBase(memInfo, itemTypes, stream, pool)
, KeyColumns_(keyColumns)
- , Stream_(stream)
- , Inputs_(itemTypes.size())
, Any_(any)
- {
- TBlockTypeHelper helper;
- for (size_t i = 0; i < itemTypes.size(); i++) {
- TType* blockItemType = AS_TYPE(TBlockType, itemTypes[i])->GetItemType();
- Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
- Hashers_.push_back(helper.MakeHasher(blockItemType));
- Comparators_.push_back(helper.MakeComparator(blockItemType));
- Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool));
- }
- }
+ {}
NUdf::EFetchStatus FetchStream() {
- switch (Stream_.WideFetch(Inputs_.data(), Inputs_.size())) {
- case NUdf::EFetchStatus::Yield:
- return NUdf::EFetchStatus::Yield;
- case NUdf::EFetchStatus::Finish:
- return NUdf::EFetchStatus::Finish;
- case NUdf::EFetchStatus::Ok:
- break;
- }
+ Y_ENSURE(!Index_, "Data fetch shouldn't be done after the index has been built");
+ return TBase::FetchStream();
+ }
- {
- std::vector<arrow::Datum> block;
- for (size_t i = 0; i < Inputs_.size() - 1; i++) {
- auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
- ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
- if (datum.is_scalar()) {
- block.push_back(datum);
- } else {
- MKQL_ENSURE(datum.is_array(), "Expecting array");
- block.push_back(Trimmers_[i]->Trim(datum.array()));
- }
- }
- Data_.push_back(std::move(block));
- }
+ void BuildIndex() {
+ Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(RowCount_));
+ for (size_t blockOffset = 0; blockOffset < Data_.size(); blockOffset++) {
+ const auto& block = GetBlock(blockOffset);
+ auto blockSize = block.Size;
+
+ std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch;
+ std::array<TRowEntry, PrefetchBatchSize> insertBatchEntries;
+ std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys;
+ ui32 insertBatchLen = 0;
+
+ auto processInsertBatch = [&]() {
+ Index_->BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t i, TIndexMap::iterator iter, bool isNew) {
+ auto value = static_cast<TIndexMapValue*>(Index_->GetMutablePayload(iter));
+ if (isNew) {
+ // Store single entry inplace
+ *value = TIndexMapValue(insertBatchEntries[i]);
+ Index_->CheckGrow();
+ } else {
+ if (Any_ && ContainsKey(value, insertBatchKeys[i])) {
+ return;
+ }
- const auto& block = Data_.back();
- auto blockOffset = Data_.size() - 1;
- auto blockSize = GetBlockCount(Inputs_[Inputs_.size() - 1]);
-
- std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch;
- std::array<TIndexEntry, PrefetchBatchSize> insertBatchEntries;
- std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys;
- ui32 insertBatchLen = 0;
-
- auto processInsertBatch = [&]() {
- Index_.BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t i, TIndexMap::iterator iter, bool isNew) {
- auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter));
- if (isNew) {
- // Store single entry inplace
- *value = TIndexMapValue(insertBatchEntries[i]);
- Index_.CheckGrow();
- } else {
- if (Any_ && ContainsKey(value, insertBatchKeys[i])) {
- return;
- }
+ // Store as list
+ if (value->IsInplace()) {
+ *value = TIndexMapValue(InsertIndexNode(value->GetEntry()));
+ }
- // Store as list
- if (value->IsInplace()) {
- *value = TIndexMapValue(InsertIndexNode(value->GetEntry()));
+ *value = TIndexMapValue(InsertIndexNode(insertBatchEntries[i], value->GetList()));
}
+ });
+ };
- *value = TIndexMapValue(InsertIndexNode(insertBatchEntries[i], value->GetList()));
+ Y_ENSURE(blockOffset <= std::numeric_limits<ui32>::max());
+ Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max());
+ for (size_t itemOffset = 0; itemOffset < blockSize; itemOffset++) {
+ ui64 keyHash = GetKey(block, itemOffset, insertBatchKeys[insertBatchLen]);
+ if (!keyHash) {
+ continue;
}
- });
- };
- Y_ENSURE(blockOffset <= std::numeric_limits<ui32>::max());
- Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max());
- for (size_t itemOffset = 0; itemOffset < blockSize; itemOffset++) {
- ui64 keyHash = GetKey(block, itemOffset, insertBatchKeys[insertBatchLen]);
- if (!keyHash) {
- continue;
- }
+ insertBatchEntries[insertBatchLen] = TRowEntry(blockOffset, itemOffset);
+ insertBatch[insertBatchLen].ConstructKey(keyHash);
+ insertBatchLen++;
- insertBatchEntries[insertBatchLen] = TIndexEntry(blockOffset, itemOffset);
- insertBatch[insertBatchLen].ConstructKey(keyHash);
- insertBatchLen++;
+ if (insertBatchLen == PrefetchBatchSize) {
+ processInsertBatch();
+ insertBatchLen = 0;
+ }
+ }
- if (insertBatchLen == PrefetchBatchSize) {
+ if (insertBatchLen > 0) {
processInsertBatch();
- insertBatchLen = 0;
}
}
-
- if (insertBatchLen > 0) {
- processInsertBatch();
- }
-
- return NUdf::EFetchStatus::Ok;
}
template<typename TGetKey>
- void BatchLookup(size_t batchSize, std::array<TBlockIndex::TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) {
+ void BatchLookup(size_t batchSize, std::array<TIndexedBlockStorage::TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) {
Y_ENSURE(batchSize <= PrefetchBatchSize);
std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> lookupBatch;
@@ -568,14 +662,14 @@ public:
itemsBatch[i] = items;
}
- Index_.BatchLookup({lookupBatch.data(), batchSize}, [&](size_t i, TIndexMap::iterator iter) {
+ Index_->BatchLookup({lookupBatch.data(), batchSize}, [&](size_t i, TIndexMap::iterator iter) {
if (!iter) {
// Empty iterator
iterators[i] = TIterator(this);
return;
}
- auto value = static_cast<const TIndexMapValue*>(Index_.GetPayload(iter));
+ auto value = static_cast<const TIndexMapValue*>(Index_->GetPayload(iter));
if (value->IsInplace()) {
iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i]));
} else {
@@ -584,20 +678,7 @@ public:
});
}
- TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) const {
- Y_ENSURE(entry.BlockOffset < Data_.size());
- Y_ENSURE(columnIdx < Inputs_.size() - 1);
- return GetItemFromBlock(Data_[entry.BlockOffset], columnIdx, entry.ItemOffset);
- }
-
- void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const {
- Y_ENSURE(row.size() == ioMap.size());
- for (size_t i = 0; i < row.size(); i++) {
- row[i] = GetItem(entry, ioMap[i]);
- }
- }
-
- bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
+ bool IsKeyEquals(TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
Y_ENSURE(keyItems.size() == KeyColumns_.size());
for (size_t i = 0; i < KeyColumns_.size(); i++) {
auto indexItem = GetItem(entry, KeyColumns_[i]);
@@ -610,7 +691,7 @@ public:
}
private:
- ui64 GetKey(const std::vector<arrow::Datum>& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
+ ui64 GetKey(const TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
ui64 keyHash = 0;
keyItems.clear();
for (ui32 keyColumn : KeyColumns_) {
@@ -627,17 +708,7 @@ private:
return keyHash;
}
- TBlockItem GetItemFromBlock(const std::vector<arrow::Datum>& block, ui32 columnIdx, size_t offset) const {
- const auto& datum = block[columnIdx];
- if (datum.is_scalar()) {
- return Readers_[columnIdx]->GetScalarItem(*datum.scalar());
- } else {
- MKQL_ENSURE(datum.is_array(), "Expecting array");
- return Readers_[columnIdx]->GetItem(*datum.array(), offset);
- }
- }
-
- TIndexNode* InsertIndexNode(TIndexEntry entry, TIndexNode* currentHead = nullptr) {
+ TIndexNode* InsertIndexNode(TRowEntry entry, TIndexNode* currentHead = nullptr) {
return &IndexNodes_.emplace_back(entry, currentHead);
}
@@ -658,22 +729,11 @@ private:
}
private:
- const std::vector<arrow::ValueDescr> InputsDescr_;
const TVector<ui32>& KeyColumns_;
- TVector<std::unique_ptr<IBlockReader>> Readers_;
- TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
- TVector<NUdf::IBlockItemComparator::TPtr> Comparators_;
- TVector<IBlockTrimmer::TPtr> Trimmers_;
-
- std::vector<std::vector<arrow::Datum>> Data_;
-
- TIndexMap Index_;
+ std::unique_ptr<TIndexMap> Index_;
std::deque<TIndexNode> IndexNodes_;
- NUdf::TUnboxedValue Stream_;
- TUnboxedValueVector Inputs_;
-
const bool Any_;
};
@@ -682,7 +742,7 @@ class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCore
{
using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>;
using TJoinState = TBlockJoinState<RightRequired>;
-using TIndexState = TBlockIndex;
+using TIndexState = TIndexedBlockStorage;
public:
TBlockMapJoinCoreWraper(
TComputationMutables& mutables,
@@ -728,10 +788,8 @@ public:
std::move(joinState),
std::move(indexState),
std::move(LeftStream_->GetValue(ctx)),
- LeftItemTypes_,
LeftKeyColumns_,
std::move(RightStream_->GetValue(ctx)),
- RightItemTypes_,
RightKeyColumns_,
RightIOMap_
);
@@ -747,10 +805,8 @@ private:
NUdf::TUnboxedValue&& joinState,
NUdf::TUnboxedValue&& indexState,
NUdf::TUnboxedValue&& leftStream,
- const TVector<TType*>& leftTypes,
const TVector<ui32>& leftKeyColumns,
NUdf::TUnboxedValue&& rightStream,
- const TVector<TType*>& rightTypes,
const TVector<ui32>& rightKeyColumns,
const TVector<ui32>& rightIOMap
)
@@ -758,10 +814,8 @@ private:
, JoinState_(joinState)
, IndexState_(indexState)
, LeftStream_(leftStream)
- , LeftItemTypes_(leftTypes)
, LeftKeyColumns_(leftKeyColumns)
, RightStream_(rightStream)
- , RightItemTypes_(rightTypes)
, RightKeyColumns_(rightKeyColumns)
, RightIOMap_(rightIOMap)
, HolderFactory_(holderFactory)
@@ -781,6 +835,7 @@ private:
}
}
+ indexState.BuildIndex();
RightStreamConsumed_ = true;
}
@@ -882,11 +937,9 @@ private:
NUdf::TUnboxedValue IndexState_;
NUdf::TUnboxedValue LeftStream_;
- const TVector<TType*>& LeftItemTypes_;
const TVector<ui32>& LeftKeyColumns_;
NUdf::TUnboxedValue RightStream_;
- const TVector<TType*>& RightItemTypes_;
const TVector<ui32>& RightKeyColumns_;
const TVector<ui32>& RightIOMap_;
bool RightStreamConsumed_ = false;
diff --git a/yql/essentials/minikql/mkql_alloc.h b/yql/essentials/minikql/mkql_alloc.h
index cf20e066e98..24c18f41a8f 100644
--- a/yql/essentials/minikql/mkql_alloc.h
+++ b/yql/essentials/minikql/mkql_alloc.h
@@ -164,7 +164,7 @@ struct TMkqlPAllocHeader {
ui64 Self; // should be placed right before pointer to allocated area, see GetMemoryChunkContext
};
-static_assert(sizeof(TMkqlPAllocHeader) ==
+static_assert(sizeof(TMkqlPAllocHeader) ==
sizeof(size_t) +
sizeof(TAllocState::TListEntry) +
sizeof(void*), "Padding is not allowed");
@@ -497,6 +497,38 @@ struct TMKQLAllocator
using TWithDefaultMiniKQLAlloc = TWithMiniKQLAlloc<EMemorySubPool::Default>;
using TWithTemporaryMiniKQLAlloc = TWithMiniKQLAlloc<EMemorySubPool::Temporary>;
+template <typename Type>
+struct TMKQLHugeAllocator
+{
+ typedef Type value_type;
+ typedef Type* pointer;
+ typedef const Type* const_pointer;
+ typedef Type& reference;
+ typedef const Type& const_reference;
+ typedef size_t size_type;
+ typedef ptrdiff_t difference_type;
+
+ TMKQLHugeAllocator() noexcept = default;
+ ~TMKQLHugeAllocator() noexcept = default;
+
+ template<typename U> TMKQLHugeAllocator(const TMKQLHugeAllocator<U>&) noexcept {}
+ template<typename U> struct rebind { typedef TMKQLHugeAllocator<U> other; };
+ template<typename U> bool operator==(const TMKQLHugeAllocator<U>&) const { return true; }
+ template<typename U> bool operator!=(const TMKQLHugeAllocator<U>&) const { return false; }
+
+ static pointer allocate(size_type n, const void* = nullptr)
+ {
+ size_t size = Max(n * sizeof(value_type), TAllocState::POOL_PAGE_SIZE);
+ return static_cast<pointer>(TlsAllocState->GetBlock(size));
+ }
+
+ static void deallocate(const_pointer p, size_type n) noexcept
+ {
+ size_t size = Max(n * sizeof(value_type), TAllocState::POOL_PAGE_SIZE);
+ TlsAllocState->ReturnBlock(const_cast<pointer>(p), size);
+ }
+};
+
template <typename T>
class TPagedList
{