aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-02-18 13:35:23 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-02-18 13:35:23 +0000
commitd1f5b91da822b27faad83d50ecfdd2830a1be93e (patch)
tree78df3bf535cb8a5451afa402c51cb3f8d11b4d06 /yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
parent22bc9b81495143d67a93bf58c936c5d5a65c8e8e (diff)
parenta2f16dc9eb108ecf11938c7c4275d701a3635bb7 (diff)
downloadydb-d1f5b91da822b27faad83d50ecfdd2830a1be93e.tar.gz
Merge pull request #14716 from ydb-platform/merge-libs-250218-0050
Diffstat (limited to 'yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp')
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp515
1 files changed, 327 insertions, 188 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
index 37217aec85..17c549202c 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
@@ -239,10 +239,8 @@ private:
TVector<NYql::NUdf::IBlockItemHasher::TPtr> Hashers_;
};
-template <typename TDerived>
-class TBlockStorageBase : public TComputationValue<TDerived> {
- using TSelf = TBlockStorageBase<TDerived>;
- using TBase = TComputationValue<TDerived>;
+class TBlockStorage : public TComputationValue<TBlockStorage> {
+ using TBase = TComputationValue<TBlockStorage>;
public:
struct TBlock {
@@ -268,7 +266,7 @@ public:
};
class TRowIterator {
- friend class TBlockStorageBase;
+ friend class TBlockStorage;
public:
TRowIterator() = default;
@@ -304,7 +302,7 @@ public:
}
private:
- TRowIterator(const TSelf* blockStorage)
+ TRowIterator(const TBlockStorage* blockStorage)
: BlockStorage_(blockStorage)
{}
@@ -312,19 +310,21 @@ public:
size_t CurrentBlockOffset_ = 0;
size_t CurrentItemOffset_ = 0;
- const TSelf* BlockStorage_ = nullptr;
+ const TBlockStorage* BlockStorage_ = nullptr;
};
- TBlockStorageBase(
+ TBlockStorage(
TMemoryUsageInfo* memInfo,
const TVector<TType*>& itemTypes,
NUdf::TUnboxedValue stream,
+ TStringBuf resourceTag,
arrow::MemoryPool* pool
)
: TBase(memInfo)
, InputsDescr_(ToValueDescr(itemTypes))
- , Stream_(stream)
+ , Stream_(std::move(stream))
, Inputs_(itemTypes.size())
+ , ResourceTag_(std::move(resourceTag))
{
TBlockTypeHelper helper;
for (size_t i = 0; i < itemTypes.size(); i++) {
@@ -341,11 +341,14 @@ public:
case NUdf::EFetchStatus::Yield:
return NUdf::EFetchStatus::Yield;
case NUdf::EFetchStatus::Finish:
+ IsFinished_ = true;
return NUdf::EFetchStatus::Finish;
case NUdf::EFetchStatus::Ok:
break;
}
+ Y_ENSURE(!IsFinished_, "Got data on finished stream");
+
std::vector<arrow::Datum> blockColumns;
for (size_t i = 0; i < Inputs_.size() - 1; i++) {
auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
@@ -384,19 +387,15 @@ public:
return TRowIterator(this);
}
+ size_t GetRowCount() const {
+ return RowCount_;
+ }
+
TBlockItem GetItem(TRowEntry entry, ui32 columnIdx) const {
Y_ENSURE(columnIdx < Inputs_.size() - 1);
return GetItemFromBlock(GetBlock(entry.BlockOffset), columnIdx, entry.ItemOffset);
}
- void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const {
- Y_ENSURE(row.size() == ioMap.size());
- for (size_t i = 0; i < row.size(); i++) {
- row[i] = GetItem(entry, ioMap[i]);
- }
- }
-
-protected:
TBlockItem GetItemFromBlock(const TBlock& block, ui32 columnIdx, size_t offset) const {
Y_ENSURE(offset < block.Size);
const auto& datum = block.Columns[columnIdx];
@@ -408,6 +407,34 @@ protected:
}
}
+ void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const {
+ Y_ENSURE(row.size() == ioMap.size());
+ for (size_t i = 0; i < row.size(); i++) {
+ row[i] = GetItem(entry, ioMap[i]);
+ }
+ }
+
+ const TVector<NUdf::IBlockItemComparator::TPtr>& GetItemComparators() const {
+ return Comparators_;
+ }
+
+ const TVector<NUdf::IBlockItemHasher::TPtr>& GetItemHashers() const {
+ return Hashers_;
+ }
+
+ bool IsFinished() const {
+ return IsFinished_;
+ }
+
+private:
+ NUdf::TStringRef GetResourceTag() const override {
+ return NUdf::TStringRef(ResourceTag_);
+ }
+
+ void* GetResource() override {
+ return this;
+ }
+
protected:
const std::vector<arrow::ValueDescr> InputsDescr_;
@@ -418,27 +445,60 @@ protected:
std::vector<TBlock> Data_;
size_t RowCount_ = 0;
+ bool IsFinished_ = false;
NUdf::TUnboxedValue Stream_;
TUnboxedValueVector Inputs_;
+
+ const TStringBuf ResourceTag_;
};
-class TBlockStorage: public TBlockStorageBase<TBlockStorage> {
-private:
- using TBase = TBlockStorageBase<TBlockStorage>;
+class TBlockStorageWrapper : public TMutableComputationNode<TBlockStorageWrapper> {
+ using TBaseComputation = TMutableComputationNode<TBlockStorageWrapper>;
+
public:
- using TBase::TBase;
+ TBlockStorageWrapper(
+ TComputationMutables& mutables,
+ TVector<TType*>&& itemTypes,
+ IComputationNode* stream,
+ const TStringBuf& resourceTag
+ )
+ : TBaseComputation(mutables, EValueRepresentation::Boxed)
+ , ItemTypes_(std::move(itemTypes))
+ , Stream_(stream)
+ , ResourceTag_(resourceTag)
+ {}
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ return ctx.HolderFactory.Create<TBlockStorage>(
+ ItemTypes_,
+ std::move(Stream_->GetValue(ctx)),
+ ResourceTag_,
+ &ctx.ArrowMemoryPool
+ );
+ }
+
+private:
+ void RegisterDependencies() const final {
+ DependsOn(Stream_);
+ }
+
+private:
+ const TVector<TType*> ItemTypes_;
+ IComputationNode* const Stream_;
+
+ const TString ResourceTag_;
};
-class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> {
- using TBase = TBlockStorageBase<TIndexedBlockStorage>;
+class TBlockIndex : public TComputationValue<TBlockIndex> {
+ using TBase = TComputationValue<TBlockIndex>;
struct TIndexNode {
- TRowEntry Entry;
+ TBlockStorage::TRowEntry Entry;
TIndexNode* Next;
TIndexNode() = delete;
- TIndexNode(TRowEntry entry, TIndexNode* next = nullptr)
+ TIndexNode(TBlockStorage::TRowEntry entry, TIndexNode* next = nullptr)
: Entry(entry)
, Next(next)
{}
@@ -450,7 +510,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> {
: Raw(0)
{}
- TIndexMapValue(TRowEntry entry) {
+ TIndexMapValue(TBlockStorage::TRowEntry entry) {
TIndexEntryUnion un;
un.Entry = entry;
@@ -471,7 +531,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> {
return EntryList;
}
- TRowEntry GetEntry() const {
+ TBlockStorage::TRowEntry GetEntry() const {
Y_ENSURE(IsInplace());
TIndexEntryUnion un;
@@ -481,7 +541,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> {
private:
union TIndexEntryUnion {
- TRowEntry Entry;
+ TBlockStorage::TRowEntry Entry;
ui64 Raw;
};
@@ -504,7 +564,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> {
public:
class TIterator {
- friend class TIndexedBlockStorage;
+ friend class TBlockIndex;
enum class EIteratorType {
EMPTY,
@@ -547,7 +607,7 @@ public:
return *this;
}
- TMaybe<TRowEntry> Next() {
+ TMaybe<TBlockStorage::TRowEntry> Next() {
Y_ENSURE(IsValid());
switch (Type_) {
@@ -560,7 +620,7 @@ public:
}
EntryConsumed_ = true;
- return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TRowEntry>(Entry_) : Nothing();
+ return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TBlockStorage::TRowEntry>(Entry_) : Nothing();
case EIteratorType::LIST:
for (; Node_ != nullptr; Node_ = Node_->Next) {
@@ -597,12 +657,12 @@ public:
}
private:
- TIterator(const TIndexedBlockStorage* blockIndex)
+ TIterator(const TBlockIndex* blockIndex)
: Type_(EIteratorType::EMPTY)
, BlockIndex_(blockIndex)
{}
- TIterator(const TIndexedBlockStorage* blockIndex, TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+ TIterator(const TBlockIndex* blockIndex, TBlockStorage::TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
: Type_(EIteratorType::INPLACE)
, BlockIndex_(blockIndex)
, Entry_(entry)
@@ -610,7 +670,7 @@ public:
, ItemsToLookup_(std::move(itemsToLookup))
{}
- TIterator(const TIndexedBlockStorage* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+ TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
: Type_(EIteratorType::LIST)
, BlockIndex_(blockIndex)
, Node_(node)
@@ -619,12 +679,12 @@ public:
private:
EIteratorType Type_;
- const TIndexedBlockStorage* BlockIndex_ = nullptr;
+ const TBlockIndex* BlockIndex_ = nullptr;
union {
TIndexNode* Node_;
struct {
- TRowEntry Entry_;
+ TBlockStorage::TRowEntry Entry_;
bool EntryConsumed_;
};
};
@@ -633,32 +693,35 @@ public:
};
public:
- TIndexedBlockStorage(
+ TBlockIndex(
TMemoryUsageInfo* memInfo,
- const TVector<TType*>& itemTypes,
const TVector<ui32>& keyColumns,
- NUdf::TUnboxedValue stream,
+ NUdf::TUnboxedValue blockStorage,
bool any,
- arrow::MemoryPool* pool
+ TStringBuf resourceTag
)
- : TBase(memInfo, itemTypes, stream, pool)
+ : TBase(memInfo)
, KeyColumns_(keyColumns)
+ , BlockStorage_(std::move(blockStorage))
, Any_(any)
+ , ResourceTag_(std::move(resourceTag))
{}
- NUdf::EFetchStatus FetchStream() {
- Y_ENSURE(!Index_, "Data fetch shouldn't be done after the index has been built");
- return TBase::FetchStream();
- }
-
void BuildIndex() {
- Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(RowCount_));
- for (size_t blockOffset = 0; blockOffset < Data_.size(); blockOffset++) {
- const auto& block = GetBlock(blockOffset);
+ if (Index_) {
+ return;
+ }
+
+ auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource());
+ Y_ENSURE(blockStorage.IsFinished(), "Index build should be done after all data has been read");
+
+ Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(blockStorage.GetRowCount()));
+ for (size_t blockOffset = 0; blockOffset < blockStorage.GetBlockCount(); blockOffset++) {
+ const auto& block = blockStorage.GetBlock(blockOffset);
auto blockSize = block.Size;
std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch;
- std::array<TRowEntry, PrefetchBatchSize> insertBatchEntries;
+ std::array<TBlockStorage::TRowEntry, PrefetchBatchSize> insertBatchEntries;
std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys;
ui32 insertBatchLen = 0;
@@ -692,7 +755,7 @@ public:
continue;
}
- insertBatchEntries[insertBatchLen] = TRowEntry(blockOffset, itemOffset);
+ insertBatchEntries[insertBatchLen] = TBlockStorage::TRowEntry(blockOffset, itemOffset);
insertBatch[insertBatchLen].ConstructKey(keyHash);
insertBatchLen++;
@@ -709,7 +772,7 @@ public:
}
template<typename TGetKey>
- void BatchLookup(size_t batchSize, std::array<TIndexedBlockStorage::TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) {
+ void BatchLookup(size_t batchSize, std::array<TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) {
Y_ENSURE(batchSize <= PrefetchBatchSize);
std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> lookupBatch;
@@ -737,11 +800,13 @@ public:
});
}
- bool IsKeyEquals(TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
+ bool IsKeyEquals(TBlockStorage::TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
+ auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource());
+
Y_ENSURE(keyItems.size() == KeyColumns_.size());
for (size_t i = 0; i < KeyColumns_.size(); i++) {
- auto indexItem = GetItem(entry, KeyColumns_[i]);
- if (Comparators_[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) {
+ auto indexItem = blockStorage.GetItem(entry, KeyColumns_[i]);
+ if (blockStorage.GetItemComparators()[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) {
return true;
}
}
@@ -749,25 +814,31 @@ public:
return false;
}
+ const NUdf::TUnboxedValue& GetBlockStorage() const {
+ return BlockStorage_;
+ }
+
private:
- ui64 GetKey(const TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
+ ui64 GetKey(const TBlockStorage::TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
+ auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource());
+
ui64 keyHash = 0;
keyItems.clear();
for (ui32 keyColumn : KeyColumns_) {
- auto item = GetItemFromBlock(block, keyColumn, offset);
+ auto item = blockStorage.GetItemFromBlock(block, keyColumn, offset);
if (!item) {
keyItems.clear();
return 0;
}
- keyHash = CombineHashes(keyHash, Hashers_[keyColumn]->Hash(item));
+ keyHash = CombineHashes(keyHash, blockStorage.GetItemHashers()[keyColumn]->Hash(item));
keyItems.push_back(std::move(item));
}
return keyHash;
}
- TIndexNode* InsertIndexNode(TRowEntry entry, TIndexNode* currentHead = nullptr) {
+ TIndexNode* InsertIndexNode(TBlockStorage::TRowEntry entry, TIndexNode* currentHead = nullptr) {
return &IndexNodes_.emplace_back(entry, currentHead);
}
@@ -787,21 +858,72 @@ private:
}
}
+ NUdf::TStringRef GetResourceTag() const override {
+ return NUdf::TStringRef(ResourceTag_);
+ }
+
+ void* GetResource() override {
+ return this;
+ }
+
private:
const TVector<ui32>& KeyColumns_;
+ NUdf::TUnboxedValue BlockStorage_;
std::unique_ptr<TIndexMap> Index_;
std::deque<TIndexNode> IndexNodes_;
const bool Any_;
+ const TStringBuf ResourceTag_;
+};
+
+class TBlockMapJoinIndexWrapper : public TMutableComputationNode<TBlockMapJoinIndexWrapper> {
+ using TBaseComputation = TMutableComputationNode<TBlockMapJoinIndexWrapper>;
+
+public:
+ TBlockMapJoinIndexWrapper(
+ TComputationMutables& mutables,
+ TVector<ui32>&& keyColumns,
+ IComputationNode* blockStorage,
+ bool any,
+ const TStringBuf& resourceTag
+ )
+ : TBaseComputation(mutables, EValueRepresentation::Boxed)
+ , KeyColumns_(std::move(keyColumns))
+ , BlockStorage_(blockStorage)
+ , Any_(any)
+ , ResourceTag_(resourceTag)
+ {}
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ return ctx.HolderFactory.Create<TBlockIndex>(
+ KeyColumns_,
+ std::move(BlockStorage_->GetValue(ctx)),
+ Any_,
+ ResourceTag_
+ );
+ }
+
+private:
+ void RegisterDependencies() const final {
+ DependsOn(BlockStorage_);
+ }
+
+private:
+ const TVector<ui32> KeyColumns_;
+ IComputationNode* const BlockStorage_;
+ const bool Any_;
+ const TString ResourceTag_;
};
-template <bool WithoutRight, bool RightRequired, bool RightAny>
-class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>
+template <bool WithoutRight, bool RightRequired>
+class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired>>
{
-using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>;
-using TJoinState = TBlockJoinState<RightRequired>;
-using TIndexState = TIndexedBlockStorage;
+ using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired>>;
+ using TJoinState = TBlockJoinState<RightRequired>;
+ using TStorageState = TBlockStorage;
+ using TIndexState = TBlockIndex;
+
public:
TBlockMapJoinCoreWraper(
TComputationMutables& mutables,
@@ -809,22 +931,18 @@ public:
const TVector<TType*>&& leftItemTypes,
const TVector<ui32>&& leftKeyColumns,
const TVector<ui32>&& leftIOMap,
- const TVector<TType*>&& rightItemTypes,
- const TVector<ui32>&& rightKeyColumns,
const TVector<ui32>&& rightIOMap,
IComputationNode* leftStream,
- IComputationNode* rightStream
+ IComputationNode* rightBlockIndex
)
: TBaseComputation(mutables, EValueRepresentation::Boxed)
, ResultItemTypes_(std::move(resultItemTypes))
, LeftItemTypes_(std::move(leftItemTypes))
, LeftKeyColumns_(std::move(leftKeyColumns))
, LeftIOMap_(std::move(leftIOMap))
- , RightItemTypes_(std::move(rightItemTypes))
- , RightKeyColumns_(std::move(rightKeyColumns))
, RightIOMap_(std::move(rightIOMap))
- , LeftStream_(std::move(leftStream))
- , RightStream_(std::move(rightStream))
+ , LeftStream_(leftStream)
+ , RightBlockIndex_(rightBlockIndex)
, KeyTupleCache_(mutables)
{}
@@ -835,60 +953,49 @@ public:
LeftIOMap_,
ResultItemTypes_
);
- const auto indexState = ctx.HolderFactory.Create<TIndexState>(
- RightItemTypes_,
- RightKeyColumns_,
- std::move(RightStream_->GetValue(ctx)),
- RightAny,
- &ctx.ArrowMemoryPool
- );
return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
std::move(joinState),
- std::move(indexState),
- std::move(LeftStream_->GetValue(ctx)),
LeftKeyColumns_,
- std::move(RightStream_->GetValue(ctx)),
- RightKeyColumns_,
- RightIOMap_
+ RightIOMap_,
+ std::move(LeftStream_->GetValue(ctx)),
+ std::move(RightBlockIndex_->GetValue(ctx))
);
}
private:
class TStreamValue : public TComputationValue<TStreamValue> {
- using TBase = TComputationValue<TStreamValue>;
+ using TBase = TComputationValue<TStreamValue>;
+
public:
TStreamValue(
TMemoryUsageInfo* memInfo,
const THolderFactory& holderFactory,
NUdf::TUnboxedValue&& joinState,
- NUdf::TUnboxedValue&& indexState,
- NUdf::TUnboxedValue&& leftStream,
const TVector<ui32>& leftKeyColumns,
- NUdf::TUnboxedValue&& rightStream,
- const TVector<ui32>& rightKeyColumns,
- const TVector<ui32>& rightIOMap
+ const TVector<ui32>& rightIOMap,
+ NUdf::TUnboxedValue&& leftStream,
+ NUdf::TUnboxedValue&& rightBlockIndex
)
: TBase(memInfo)
, JoinState_(joinState)
- , IndexState_(indexState)
- , LeftStream_(leftStream)
, LeftKeyColumns_(leftKeyColumns)
- , RightStream_(rightStream)
- , RightKeyColumns_(rightKeyColumns)
, RightIOMap_(rightIOMap)
+ , LeftStream_(leftStream)
+ , RightBlockIndex_(rightBlockIndex)
, HolderFactory_(holderFactory)
{}
private:
NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
- auto& indexState = *static_cast<TIndexState*>(IndexState_.AsBoxed().Get());
+ auto& indexState = *static_cast<TIndexState*>(RightBlockIndex_.GetResource());
+ auto& storageState = *static_cast<TStorageState*>(indexState.GetBlockStorage().GetResource());
if (!RightStreamConsumed_) {
auto fetchStatus = NUdf::EFetchStatus::Ok;
while (fetchStatus != NUdf::EFetchStatus::Finish) {
- fetchStatus = indexState.FetchStream();
+ fetchStatus = storageState.FetchStream();
if (fetchStatus == NUdf::EFetchStatus::Yield) {
return NUdf::EFetchStatus::Yield;
}
@@ -931,7 +1038,7 @@ private:
while (joinState.IsNotFull() && !iter.IsEmpty()) {
auto key = iter.Next();
- indexState.GetRow(*key, RightIOMap_, rightRow);
+ storageState.GetRow(*key, RightIOMap_, rightRow);
joinState.MakeRow(rightRow);
}
@@ -993,13 +1100,9 @@ private:
}
NUdf::TUnboxedValue JoinState_;
- NUdf::TUnboxedValue IndexState_;
- NUdf::TUnboxedValue LeftStream_;
const TVector<ui32>& LeftKeyColumns_;
- NUdf::TUnboxedValue RightStream_;
- const TVector<ui32>& RightKeyColumns_;
const TVector<ui32>& RightIOMap_;
bool RightStreamConsumed_ = false;
@@ -1007,12 +1110,15 @@ private:
ui32 LookupBatchCurrent_ = 0;
ui32 LookupBatchSize_ = 0;
+ NUdf::TUnboxedValue LeftStream_;
+ NUdf::TUnboxedValue RightBlockIndex_;
+
const THolderFactory& HolderFactory_;
};
void RegisterDependencies() const final {
this->DependsOn(LeftStream_);
- this->DependsOn(RightStream_);
+ this->DependsOn(RightBlockIndex_);
}
private:
@@ -1022,44 +1128,37 @@ private:
const TVector<ui32> LeftKeyColumns_;
const TVector<ui32> LeftIOMap_;
- const TVector<TType*> RightItemTypes_;
- const TVector<ui32> RightKeyColumns_;
const TVector<ui32> RightIOMap_;
IComputationNode* const LeftStream_;
- IComputationNode* const RightStream_;
+ IComputationNode* const RightBlockIndex_;
const TContainerCacheOnContext KeyTupleCache_;
};
class TBlockCrossJoinCoreWraper : public TMutableComputationNode<TBlockCrossJoinCoreWraper>
{
-using TBaseComputation = TMutableComputationNode<TBlockCrossJoinCoreWraper>;
-using TJoinState = TBlockJoinState<true>;
-using TStorageState = TBlockStorage;
+ using TBaseComputation = TMutableComputationNode<TBlockCrossJoinCoreWraper>;
+ using TJoinState = TBlockJoinState<true>;
+ using TStorageState = TBlockStorage;
+
public:
TBlockCrossJoinCoreWraper(
TComputationMutables& mutables,
const TVector<TType*>&& resultItemTypes,
const TVector<TType*>&& leftItemTypes,
- const TVector<ui32>&& leftKeyColumns,
const TVector<ui32>&& leftIOMap,
- const TVector<TType*>&& rightItemTypes,
- const TVector<ui32>&& rightKeyColumns,
const TVector<ui32>&& rightIOMap,
IComputationNode* leftStream,
- IComputationNode* rightStream
+ IComputationNode* rightBlockStorage
)
: TBaseComputation(mutables, EValueRepresentation::Boxed)
, ResultItemTypes_(std::move(resultItemTypes))
, LeftItemTypes_(std::move(leftItemTypes))
- , LeftKeyColumns_(std::move(leftKeyColumns))
, LeftIOMap_(std::move(leftIOMap))
- , RightItemTypes_(std::move(rightItemTypes))
- , RightKeyColumns_(std::move(rightKeyColumns))
, RightIOMap_(std::move(rightIOMap))
, LeftStream_(std::move(leftStream))
- , RightStream_(std::move(rightStream))
+ , RightBlockStorage_(std::move(rightBlockStorage))
, KeyTupleCache_(mutables)
{}
@@ -1070,47 +1169,40 @@ public:
LeftIOMap_,
ResultItemTypes_
);
- const auto indexState = ctx.HolderFactory.Create<TStorageState>(
- RightItemTypes_,
- std::move(RightStream_->GetValue(ctx)),
- &ctx.ArrowMemoryPool
- );
return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
std::move(joinState),
- std::move(indexState),
+ RightIOMap_,
std::move(LeftStream_->GetValue(ctx)),
- std::move(RightStream_->GetValue(ctx)),
- RightIOMap_
+ std::move(RightBlockStorage_->GetValue(ctx))
);
}
private:
class TStreamValue : public TComputationValue<TStreamValue> {
- using TBase = TComputationValue<TStreamValue>;
+ using TBase = TComputationValue<TStreamValue>;
+
public:
TStreamValue(
TMemoryUsageInfo* memInfo,
const THolderFactory& holderFactory,
NUdf::TUnboxedValue&& joinState,
- NUdf::TUnboxedValue&& storageState,
+ const TVector<ui32>& rightIOMap,
NUdf::TUnboxedValue&& leftStream,
- NUdf::TUnboxedValue&& rightStream,
- const TVector<ui32>& rightIOMap
+ NUdf::TUnboxedValue&& rightBlockStorage
)
: TBase(memInfo)
, JoinState_(joinState)
- , StorageState_(storageState)
- , LeftStream_(leftStream)
- , RightStream_(rightStream)
, RightIOMap_(rightIOMap)
+ , LeftStream_(leftStream)
+ , RightBlockStorage_(rightBlockStorage)
, HolderFactory_(holderFactory)
{}
private:
NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
- auto& storageState = *static_cast<TStorageState*>(StorageState_.AsBoxed().Get());
+ auto& storageState = *static_cast<TStorageState*>(RightBlockStorage_.GetResource());
if (!RightStreamConsumed_) {
auto fetchStatus = NUdf::EFetchStatus::Ok;
@@ -1176,43 +1268,109 @@ private:
}
NUdf::TUnboxedValue JoinState_;
- NUdf::TUnboxedValue StorageState_;
-
- NUdf::TUnboxedValue LeftStream_;
- NUdf::TUnboxedValue RightStream_;
const TVector<ui32>& RightIOMap_;
bool RightStreamConsumed_ = false;
TStorageState::TRowIterator RightRowIterator_;
+ NUdf::TUnboxedValue LeftStream_;
+ NUdf::TUnboxedValue RightBlockStorage_;
+
const THolderFactory& HolderFactory_;
};
void RegisterDependencies() const final {
this->DependsOn(LeftStream_);
- this->DependsOn(RightStream_);
+ this->DependsOn(RightBlockStorage_);
}
private:
const TVector<TType*> ResultItemTypes_;
const TVector<TType*> LeftItemTypes_;
- const TVector<ui32> LeftKeyColumns_;
const TVector<ui32> LeftIOMap_;
- const TVector<TType*> RightItemTypes_;
- const TVector<ui32> RightKeyColumns_;
const TVector<ui32> RightIOMap_;
IComputationNode* const LeftStream_;
- IComputationNode* const RightStream_;
+ IComputationNode* const RightBlockStorage_;
const TContainerCacheOnContext KeyTupleCache_;
};
} // namespace
+IComputationNode* WrapBlockStorage(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg");
+
+ const auto resultType = callable.GetType()->GetReturnType();
+ MKQL_ENSURE(resultType->IsResource(), "Expected Resource as a result type");
+ auto resultResourceType = AS_TYPE(TResourceType, resultType);
+ MKQL_ENSURE(resultResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
+
+ const auto inputType = callable.GetInput(0).GetStaticType();
+ MKQL_ENSURE(inputType->IsStream(), "Expected WideStream as an input stream");
+ const auto inputStreamType = AS_TYPE(TStreamType, inputType);
+ MKQL_ENSURE(inputStreamType->GetItemType()->IsMulti(),
+ "Expected Multi as a left stream item type");
+ const auto inputStreamComponents = GetWideComponents(inputStreamType);
+ MKQL_ENSURE(inputStreamComponents.size() > 0, "Expected at least one column");
+ TVector<TType*> inputStreamItems(inputStreamComponents.cbegin(), inputStreamComponents.cend());
+
+ const auto inputStream = LocateNode(ctx.NodeLocator, callable, 0);
+ return new TBlockStorageWrapper(
+ ctx.Mutables,
+ std::move(inputStreamItems),
+ inputStream,
+ resultResourceType->GetTag()
+ );
+}
+
+IComputationNode* WrapBlockMapJoinIndex(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
+
+ const auto resultType = callable.GetType()->GetReturnType();
+ MKQL_ENSURE(resultType->IsResource(), "Expected Resource as a result type");
+ auto resultResourceType = AS_TYPE(TResourceType, resultType);
+ MKQL_ENSURE(resultResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource");
+
+ const auto inputType = callable.GetInput(0).GetStaticType();
+ MKQL_ENSURE(inputType->IsResource(), "Expected Resource as an input type");
+ auto inputResourceType = AS_TYPE(TResourceType, inputType);
+ MKQL_ENSURE(inputResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
+
+ auto origInputItemType = AS_VALUE(TTypeType, callable.GetInput(1));
+ MKQL_ENSURE(origInputItemType->IsMulti(), "Expected Multi as an input item type");
+ const auto streamComponents = AS_TYPE(TMultiType, origInputItemType)->GetElements();
+ MKQL_ENSURE(streamComponents.size() > 0, "Expected at least one column");
+
+ const auto keyColumnsLiteral = callable.GetInput(2);
+ const auto keyColumnsTuple = AS_VALUE(TTupleLiteral, keyColumnsLiteral);
+ TVector<ui32> keyColumns;
+ keyColumns.reserve(keyColumnsTuple->GetValuesCount());
+ for (ui32 i = 0; i < keyColumnsTuple->GetValuesCount(); i++) {
+ const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i));
+ keyColumns.emplace_back(item->AsValue().Get<ui32>());
+ }
+
+ for (ui32 keyColumn : keyColumns) {
+ MKQL_ENSURE(keyColumn < streamComponents.size() - 1, "Key column out of range");
+ }
+
+ const auto anyNode = callable.GetInput(3);
+ const auto any = AS_VALUE(TDataLiteral, anyNode)->AsValue().Get<bool>();
+
+ const auto blockStorage = LocateNode(ctx.NodeLocator, callable, 0);
+ return new TBlockMapJoinIndexWrapper(
+ ctx.Mutables,
+ std::move(keyColumns),
+ blockStorage,
+ any,
+ resultResourceType->GetTag()
+ );
+}
+
IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
@@ -1234,22 +1392,28 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
MKQL_ENSURE(leftStreamComponents.size() > 0, "Expected at least one column");
const TVector<TType*> leftStreamItems(leftStreamComponents.cbegin(), leftStreamComponents.cend());
- const auto rightType = callable.GetInput(1).GetStaticType();
- MKQL_ENSURE(rightType->IsStream(), "Expected WideStream as a right stream");
- const auto rightStreamType = AS_TYPE(TStreamType, rightType);
- MKQL_ENSURE(rightStreamType->GetItemType()->IsMulti(),
- "Expected Multi as a right stream item type");
- const auto rightStreamComponents = GetWideComponents(rightStreamType);
- MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column");
- const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend());
-
- const auto joinKindNode = callable.GetInput(2);
+ const auto joinKindNode = callable.GetInput(3);
const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();
const auto joinKind = GetJoinKind(rawKind);
Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross);
- const auto leftKeyColumnsLiteral = callable.GetInput(3);
+ const auto rightBlockStorageType = callable.GetInput(1).GetStaticType();
+ MKQL_ENSURE(rightBlockStorageType->IsResource(), "Expected Resource as a right type");
+ auto rightBlockStorageResourceType = AS_TYPE(TResourceType, rightBlockStorageType);
+ if (joinKind != EJoinKind::Cross) {
+ MKQL_ENSURE(rightBlockStorageResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource");
+ } else {
+ MKQL_ENSURE(rightBlockStorageResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
+ }
+
+ auto origRightItemType = AS_VALUE(TTypeType, callable.GetInput(2));
+ MKQL_ENSURE(origRightItemType->IsMulti(), "Expected Multi as a right stream item type");
+ const auto rightStreamComponents = AS_TYPE(TMultiType, origRightItemType)->GetElements();
+ MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column");
+ const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend());
+
+ const auto leftKeyColumnsLiteral = callable.GetInput(4);
const auto leftKeyColumnsTuple = AS_VALUE(TTupleLiteral, leftKeyColumnsLiteral);
TVector<ui32> leftKeyColumns;
leftKeyColumns.reserve(leftKeyColumnsTuple->GetValuesCount());
@@ -1259,7 +1423,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
}
const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
- const auto leftKeyDropsLiteral = callable.GetInput(4);
+ const auto leftKeyDropsLiteral = callable.GetInput(5);
const auto leftKeyDropsTuple = AS_VALUE(TTupleLiteral, leftKeyDropsLiteral);
THashSet<ui32> leftKeyDrops;
leftKeyDrops.reserve(leftKeyDropsTuple->GetValuesCount());
@@ -1273,7 +1437,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
"Only key columns has to be specified in drop column set");
}
- const auto rightKeyColumnsLiteral = callable.GetInput(5);
+ const auto rightKeyColumnsLiteral = callable.GetInput(6);
const auto rightKeyColumnsTuple = AS_VALUE(TTupleLiteral, rightKeyColumnsLiteral);
TVector<ui32> rightKeyColumns;
rightKeyColumns.reserve(rightKeyColumnsTuple->GetValuesCount());
@@ -1283,7 +1447,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
}
const THashSet<ui32> rightKeySet(rightKeyColumns.cbegin(), rightKeyColumns.cend());
- const auto rightKeyDropsLiteral = callable.GetInput(6);
+ const auto rightKeyDropsLiteral = callable.GetInput(7);
const auto rightKeyDropsTuple = AS_VALUE(TTupleLiteral, rightKeyDropsLiteral);
THashSet<ui32> rightKeyDrops;
rightKeyDrops.reserve(rightKeyDropsTuple->GetValuesCount());
@@ -1303,9 +1467,6 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
}
MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key columns mismatch");
- const auto rightAnyNode = callable.GetInput(7);
- const auto rightAny = AS_VALUE(TDataLiteral, rightAnyNode)->AsValue().Get<bool>();
-
// XXX: Mind the last wide item, containing block length.
TVector<ui32> leftIOMap;
for (size_t i = 0; i < leftStreamItems.size() - 1; i++) {
@@ -1329,62 +1490,40 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
}
const auto leftStream = LocateNode(ctx.NodeLocator, callable, 0);
- const auto rightStream = LocateNode(ctx.NodeLocator, callable, 1);
+ const auto rightBlockStorage = LocateNode(ctx.NodeLocator, callable, 1);
-#define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY) \
- return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY>( \
+#define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED) \
+ return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED>( \
ctx.Mutables, \
std::move(joinItems), \
std::move(leftStreamItems), \
std::move(leftKeyColumns), \
std::move(leftIOMap), \
- std::move(rightStreamItems), \
- std::move(rightKeyColumns), \
std::move(rightIOMap), \
leftStream, \
- rightStream \
+ rightBlockStorage \
)
switch (joinKind) {
case EJoinKind::Inner:
- if (rightAny) {
- JOIN_WRAPPER(false, true, true);
- } else {
- JOIN_WRAPPER(false, true, false);
- }
+ JOIN_WRAPPER(false, true);
case EJoinKind::Left:
- if (rightAny) {
- JOIN_WRAPPER(false, false, true);
- } else {
- JOIN_WRAPPER(false, false, false);
- }
+ JOIN_WRAPPER(false, false);
case EJoinKind::LeftSemi:
MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left semi join");
- if (rightAny) {
- JOIN_WRAPPER(true, true, true);
- } else {
- JOIN_WRAPPER(true, true, false);
- }
+ JOIN_WRAPPER(true, true);
case EJoinKind::LeftOnly:
MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left only join");
- if (rightAny) {
- JOIN_WRAPPER(true, false, true);
- } else {
- JOIN_WRAPPER(true, false, false);
- }
+ JOIN_WRAPPER(true, false);
case EJoinKind::Cross:
- MKQL_ENSURE(!rightAny, "rightAny can't be used with cross join");
return new TBlockCrossJoinCoreWraper(
ctx.Mutables,
std::move(joinItems),
std::move(leftStreamItems),
- std::move(leftKeyColumns),
std::move(leftIOMap),
- std::move(rightStreamItems),
- std::move(rightKeyColumns),
std::move(rightIOMap),
leftStream,
- rightStream
+ rightBlockStorage
);
default:
/* TODO: Display the human-readable join kind name. */