diff options
author | Whompe <vladluk@ydb.tech> | 2024-10-16 15:32:25 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-16 14:32:25 +0200 |
commit | 4fc77b199a24e975406ad15a37d4af5e85924e99 (patch) | |
tree | 4a9bb7663dc9993dde4267fd29177cc50610eba5 | |
parent | b2bc73df8676a727babfe3c63bf9e357a48ad2d3 (diff) | |
download | ydb-4fc77b199a24e975406ad15a37d4af5e85924e99.tar.gz |
Make block combine hashed use stream instead of flow (#9979)
25 files changed, 1294 insertions, 846 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp index f37baa8343..234c2c6dc9 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp @@ -791,7 +791,7 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, } TTypeAnnotationNode::TListType blockItemTypes; - if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) { + if (!EnsureWideStreamBlockType(input->Head(), blockItemTypes, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -817,7 +817,7 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, } auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType); - input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); + input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType)); return IGraphTransformer::TStatus::Ok; } @@ -828,7 +828,7 @@ IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& inpu } TTypeAnnotationNode::TListType blockItemTypes; - if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) { + if (!EnsureWideStreamBlockType(input->Head(), blockItemTypes, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -867,7 +867,7 @@ IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& inpu retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64))); auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType); - input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); + input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType)); return IGraphTransformer::TStatus::Ok; } @@ -879,7 +879,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr } TTypeAnnotationNode::TListType blockItemTypes; - if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) { + if (!EnsureWideStreamBlockType(input->Head(), blockItemTypes, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } YQL_ENSURE(blockItemTypes.size() > 0); @@ -917,7 +917,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr } // disallow any scalar columns except for streamIndex column - auto itemTypes = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems(); + auto itemTypes = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems(); for (ui32 i = 0; i + 1 < itemTypes.size(); ++i) { bool isScalar = itemTypes[i]->GetKind() == ETypeAnnotationKind::Scalar; if (isScalar && i != streamIndex) { @@ -929,7 +929,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64))); auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType); - input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); + input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType)); return IGraphTransformer::TStatus::Ok; } diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp index 12714671b6..a4c7df599e 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.cpp +++ b/ydb/library/yql/core/yql_aggregate_expander.cpp @@ -699,7 +699,8 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { } else { stream = AggList; } - auto blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false); + + TExprNode::TPtr blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false); if (!blocks) { return nullptr; } @@ -708,22 +709,30 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { if (hashed) { aggWideFlow = Ctx.Builder(Node->Pos()) .Callable("WideFromBlocks") - .Callable(0, "BlockCombineHashed") - .Add(0, blocks) - .Callable(1, "Void") + .Callable(0, "ToFlow") + .Callable(0, "BlockCombineHashed") + .Callable(0, "FromFlow") + .Add(0, blocks) + .Seal() + .Callable(1, "Void") + .Seal() + .Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) + .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs))) .Seal() - .Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) - .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs))) .Seal() .Seal() .Build(); } else { aggWideFlow = Ctx.Builder(Node->Pos()) - .Callable("BlockCombineAll") - .Add(0, blocks) - .Callable(1, "Void") + .Callable("ToFlow") + .Callable(0, "BlockCombineAll") + .Callable(0, "FromFlow") + .Add(0, blocks) + .Seal() + .Callable(1, "Void") + .Seal() + .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) .Seal() - .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) .Seal() .Build(); } @@ -2891,10 +2900,14 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() { TExprNode::TPtr aggBlocks; if (!isMany) { aggBlocks = Ctx.Builder(Node->Pos()) - .Callable("BlockMergeFinalizeHashed") - .Add(0, blocks) - .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) - .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) + .Callable("ToFlow") + .Callable(0, "BlockMergeFinalizeHashed") + .Callable(0, "FromFlow") + .Add(0, blocks) + .Seal() + .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) + .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) + .Seal() .Seal() .Build(); } else { @@ -2902,12 +2915,16 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() { YQL_ENSURE(manyStreamsSetting, "Missing many_streams setting"); aggBlocks = Ctx.Builder(Node->Pos()) - .Callable("BlockMergeManyFinalizeHashed") - .Add(0, blocks) - .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) - .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) - .Atom(3, ToString(streamIdxColumn)) - .Add(4, manyStreamsSetting->TailPtr()) + .Callable("ToFlow") + .Callable(0, "BlockMergeManyFinalizeHashed") + .Callable(0, "FromFlow") + .Add(0, blocks) + .Seal() + .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) + .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) + .Atom(3, ToString(streamIdxColumn)) + .Add(4, manyStreamsSetting->TailPtr()) + .Seal() .Seal() .Build(); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp index eb1ffcba56..a1bef3ae63 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp @@ -442,9 +442,20 @@ size_t GetBitmapPopCount(const std::shared_ptr<arrow::ArrayData>& arr) { return GetSparseBitmapPopCount(src, len); } +TArrayRef<TType *const> GetWideComponents(TType* type) { + if (type->IsFlow()) { + const auto outputFlowType = AS_TYPE(TFlowType, type); + return GetWideComponents(outputFlowType); + } + if (type->IsStream()) { + const auto outputStreamType = AS_TYPE(TStreamType, type); + return GetWideComponents(outputStreamType); + } + MKQL_ENSURE(false, "Expect either flow or stream"); +} + size_t CalcMaxBlockLenForOutput(TType* out) { - const auto outputType = AS_TYPE(TFlowType, out); - const auto wideComponents = GetWideComponents(outputType); + const auto wideComponents = GetWideComponents(out); MKQL_ENSURE(wideComponents.size() > 0, "Expecting at least one output column"); size_t maxBlockItemSize = 0; @@ -604,11 +615,99 @@ protected: #endif }; -class TBlockCombineAllWrapper : public TStatefulWideFlowCodegeneratorNode<TBlockCombineAllWrapper>, + +struct TBlockCombineAllState : public TComputationValue<TBlockCombineAllState> { + NUdf::TUnboxedValue* Pointer_ = nullptr; + bool IsFinished_ = false; + bool HasValues_ = false; + TUnboxedValueVector Values_; + std::vector<std::unique_ptr<IBlockAggregatorCombineAll>> Aggs_; + std::vector<char> AggStates_; + const std::optional<ui32> FilterColumn_; + const size_t Width_; + + TBlockCombineAllState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const std::vector<TAggParams<IBlockAggregatorCombineAll>>& params, TComputationContext& ctx) + : TComputationValue(memInfo) + , Values_(std::max(width, params.size())) + , FilterColumn_(filterColumn) + , Width_(width) + { + Pointer_ = Values_.data(); + + ui32 totalStateSize = 0; + for (const auto& p : params) { + Aggs_.emplace_back(p.Prepared_->Make(ctx)); + MKQL_ENSURE(Aggs_.back()->StateSize == p.Prepared_->StateSize, "State size mismatch"); + totalStateSize += Aggs_.back()->StateSize; + } + + AggStates_.resize(totalStateSize); + char* ptr = AggStates_.data(); + for (const auto& agg : Aggs_) { + agg->InitState(ptr); + ptr += agg->StateSize; + } + } + + void ProcessInput() { + const ui64 batchLength = TArrowBlock::From(Values_[Width_ - 1U]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + if (!batchLength) { + return; + } + + std::optional<ui64> filtered; + if (FilterColumn_) { + const auto filterDatum = TArrowBlock::From(Values_[*FilterColumn_]).GetDatum(); + if (filterDatum.is_scalar()) { + if (!filterDatum.scalar_as<arrow::UInt8Scalar>().value) { + return; + } + } else { + const ui64 popCount = GetBitmapPopCount(filterDatum.array()); + if (popCount == 0) { + return; + } + + if (popCount < batchLength) { + filtered = popCount; + } + } + } + + HasValues_ = true; + char* ptr = AggStates_.data(); + for (size_t i = 0; i < Aggs_.size(); ++i) { + Aggs_[i]->AddMany(ptr, Values_.data(), batchLength, filtered); + ptr += Aggs_[i]->StateSize; + } + } + + bool MakeOutput() { + IsFinished_ = true; + if (!HasValues_) + return false; + + char* ptr = AggStates_.data(); + for (size_t i = 0; i < Aggs_.size(); ++i) { + Values_[i] = Aggs_[i]->FinishOne(ptr); + Aggs_[i]->DestroyState(ptr); + ptr += Aggs_[i]->StateSize; + } + return true; + } + + NUdf::TUnboxedValuePod Get(size_t index) const { + return Values_[index]; + } +}; + +class TBlockCombineAllWrapperFromFlow : public TStatefulWideFlowCodegeneratorNode<TBlockCombineAllWrapperFromFlow>, protected TBlockCombineAllWrapperCodegenBase { -using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TBlockCombineAllWrapper>; +using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TBlockCombineAllWrapperFromFlow>; + +using TState = TBlockCombineAllState; public: - TBlockCombineAllWrapper(TComputationMutables& mutables, + TBlockCombineAllWrapperFromFlow(TComputationMutables& mutables, IComputationWideFlowNode* flow, std::optional<ui32> filterColumn, size_t width, @@ -655,95 +754,11 @@ public: #ifndef MKQL_DISABLE_CODEGEN ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const { return DoGenGetValuesImpl(ctx, statePtr, block, Flow_, Width_, AggsParams_.size(), - GetMethodPtr(&TState::Get), GetMethodPtr(&TBlockCombineAllWrapper::MakeState), + GetMethodPtr(&TState::Get), GetMethodPtr(&TBlockCombineAllWrapperFromFlow::MakeState), GetMethodPtr(&TState::ProcessInput), GetMethodPtr(&TState::MakeOutput)); } #endif private: - struct TState : public TComputationValue<TState> { - NUdf::TUnboxedValue* Pointer_ = nullptr; - bool IsFinished_ = false; - bool HasValues_ = false; - TUnboxedValueVector Values_; - std::vector<std::unique_ptr<IBlockAggregatorCombineAll>> Aggs_; - std::vector<char> AggStates_; - const std::optional<ui32> FilterColumn_; - const size_t Width_; - - TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const std::vector<TAggParams<IBlockAggregatorCombineAll>>& params, TComputationContext& ctx) - : TComputationValue(memInfo) - , Values_(std::max(width, params.size())) - , FilterColumn_(filterColumn) - , Width_(width) - { - Pointer_ = Values_.data(); - - ui32 totalStateSize = 0; - for (const auto& p : params) { - Aggs_.emplace_back(p.Prepared_->Make(ctx)); - MKQL_ENSURE(Aggs_.back()->StateSize == p.Prepared_->StateSize, "State size mismatch"); - totalStateSize += Aggs_.back()->StateSize; - } - - AggStates_.resize(totalStateSize); - char* ptr = AggStates_.data(); - for (const auto& agg : Aggs_) { - agg->InitState(ptr); - ptr += agg->StateSize; - } - } - - void ProcessInput() { - const ui64 batchLength = TArrowBlock::From(Values_[Width_ - 1U]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; - if (!batchLength) { - return; - } - - std::optional<ui64> filtered; - if (FilterColumn_) { - const auto filterDatum = TArrowBlock::From(Values_[*FilterColumn_]).GetDatum(); - if (filterDatum.is_scalar()) { - if (!filterDatum.scalar_as<arrow::UInt8Scalar>().value) { - return; - } - } else { - const ui64 popCount = GetBitmapPopCount(filterDatum.array()); - if (popCount == 0) { - return; - } - - if (popCount < batchLength) { - filtered = popCount; - } - } - } - - HasValues_ = true; - char* ptr = AggStates_.data(); - for (size_t i = 0; i < Aggs_.size(); ++i) { - Aggs_[i]->AddMany(ptr, Values_.data(), batchLength, filtered); - ptr += Aggs_[i]->StateSize; - } - } - - bool MakeOutput() { - IsFinished_ = true; - if (!HasValues_) - return false; - - char* ptr = AggStates_.data(); - for (size_t i = 0; i < Aggs_.size(); ++i) { - Values_[i] = Aggs_[i]->FinishOne(ptr); - Aggs_[i]->DestroyState(ptr); - ptr += Aggs_[i]->StateSize; - } - return true; - } - - NUdf::TUnboxedValuePod Get(size_t index) const { - return Values_[index]; - } - }; void RegisterDependencies() const final { FlowDependsOn(Flow_); } @@ -773,6 +788,89 @@ private: const size_t WideFieldsIndex_; }; +class TBlockCombineAllWrapperFromStream : public TMutableComputationNode<TBlockCombineAllWrapperFromStream> { +using TBaseComputation = TMutableComputationNode<TBlockCombineAllWrapperFromStream>; + +using TState = TBlockCombineAllState; +public: + TBlockCombineAllWrapperFromStream(TComputationMutables& mutables, + IComputationNode* stream, + std::optional<ui32> filterColumn, + size_t width, + std::vector<TAggParams<IBlockAggregatorCombineAll>>&& aggsParams) + : TBaseComputation(mutables, EValueRepresentation::Boxed) + , Stream_(stream) + , FilterColumn_(filterColumn) + , Width_(width) + , AggsParams_(std::move(aggsParams)) + , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(width)) + { + MKQL_ENSURE(Width_ > 0, "Missing block length column"); + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const + { + const auto state = ctx.HolderFactory.Create<TState>(Width_, FilterColumn_, AggsParams_, ctx); + return ctx.HolderFactory.Create<TStreamValue>(std::move(state), std::move(Stream_->GetValue(ctx))); + } + +private: + class TStreamValue : public TComputationValue<TStreamValue> { + using TBase = TComputationValue<TStreamValue>; + public: + TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& state, NUdf::TUnboxedValue&& stream) + : TBase(memInfo) + , State_(state) + , Stream_(stream) + { + } + + private: + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { + TState& state = *static_cast<TState*>(State_.AsBoxed().Get()); + auto* inputFields = state.Values_.data(); + const size_t inputWidth = state.Width_; + + if (state.IsFinished_) + return NUdf::EFetchStatus::Finish; + + while (true) { + switch (Stream_.WideFetch(inputFields, inputWidth)) { + case NUdf::EFetchStatus::Yield: + return NUdf::EFetchStatus::Yield; + case NUdf::EFetchStatus::Ok: + state.ProcessInput(); + continue; + case NUdf::EFetchStatus::Finish: + break; + } + if (state.MakeOutput()) { + for (size_t i = 0; i < width; ++i) { + output[i] = state.Get(i); + } + return NUdf::EFetchStatus::Ok; + } + return NUdf::EFetchStatus::Finish; + } + } + private: + NUdf::TUnboxedValue State_; + NUdf::TUnboxedValue Stream_; + }; + +private: + void RegisterDependencies() const final { + DependsOn(Stream_); + } + +private: + IComputationNode *const Stream_; + const std::optional<ui32> FilterColumn_; + const size_t Width_; + const std::vector<TAggParams<IBlockAggregatorCombineAll>> AggsParams_; + const size_t WideFieldsIndex_; +}; + template <typename T> T MakeKey(TStringBuf s, ui32 keyLength) { Y_UNUSED(keyLength); @@ -1050,585 +1148,594 @@ protected: }; template <typename TKey, typename TAggregator, typename TFixedAggState, bool UseSet, bool UseFilter, bool Finalize, bool Many, typename TDerived> -class THashedWrapperBase : public TStatefulWideFlowCodegeneratorNode<TDerived>, - protected THashedWrapperCodegenBase -{ - using TComputationBase = TStatefulWideFlowCodegeneratorNode<TDerived>; +struct THashedWrapperBaseState : public TBlockState { +private: static constexpr bool UseArena = !InlineAggState && std::is_same<TFixedAggState, TStateArena>::value; public: - THashedWrapperBase(TComputationMutables& mutables, - IComputationWideFlowNode* flow, - std::optional<ui32> filterColumn, - size_t width, - const std::vector<TKeyParams>& keys, - size_t maxBlockLen, - ui32 keyLength, - std::vector<TAggParams<TAggregator>>&& aggsParams, - ui32 streamIndex, - std::vector<std::vector<ui32>>&& streams) - : TComputationBase(mutables, flow, EValueRepresentation::Boxed) - , Flow_(flow) + bool WritingOutput_ = false; + bool IsFinished_ = false; + + const std::optional<ui32> FilterColumn_; + const std::vector<TKeyParams> Keys_; + const std::vector<TAggParams<TAggregator>>& AggsParams_; + const ui32 KeyLength_; + const ui32 StreamIndex_; + const std::vector<std::vector<ui32>> Streams_; + const size_t MaxBlockLen_; + const size_t Width_; + const size_t OutputWidth_; + + template<typename TKeyType> + struct THashSettings { + static constexpr bool CacheHash = std::is_same_v<TKeyType, TSSOKey>; + }; + using TDynMapImpl = TDynamicHashMapImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>; + using TSetImpl = THashSetImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>; + using TFixedMapImpl = TFixedHashMapImpl<TKey, TFixedAggState, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>; + + ui64 BatchNum_ = 0; + TUnboxedValueVector Values_; + std::vector<std::unique_ptr<TAggregator>> Aggs_; + std::vector<ui32> AggStateOffsets_; + TUnboxedValueVector UnwrappedValues_; + std::vector<std::unique_ptr<IBlockReader>> Readers_; + std::vector<std::unique_ptr<IArrayBuilder>> Builders_; + std::vector<std::unique_ptr<IAggColumnBuilder>> AggBuilders_; + bool HasValues_ = false; + ui32 TotalStateSize_ = 0; + size_t OutputBlockSize_ = 0; + std::unique_ptr<TDynMapImpl> HashMap_; + typename TDynMapImpl::const_iterator HashMapIt_; + std::unique_ptr<TSetImpl> HashSet_; + typename TSetImpl::const_iterator HashSetIt_; + std::unique_ptr<TFixedMapImpl> HashFixedMap_; + typename TFixedMapImpl::const_iterator HashFixedMapIt_; + TPagedArena Arena_; + + THashedWrapperBaseState(TMemoryUsageInfo* memInfo, ui32 keyLength, ui32 streamIndex, size_t width, size_t outputWidth, std::optional<ui32> filterColumn, const std::vector<TAggParams<TAggregator>>& params, + const std::vector<std::vector<ui32>>& streams, const std::vector<TKeyParams>& keys, size_t maxBlockLen, TComputationContext& ctx) + : TBlockState(memInfo, outputWidth) , FilterColumn_(filterColumn) - , Width_(width) - , OutputWidth_(keys.size() + aggsParams.size() + 1) - , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(width)) , Keys_(keys) - , MaxBlockLen_(maxBlockLen) - , AggsParams_(std::move(aggsParams)) + , AggsParams_(params) , KeyLength_(keyLength) , StreamIndex_(streamIndex) - , Streams_(std::move(streams)) + , Streams_(streams) + , MaxBlockLen_(maxBlockLen) + , Width_(width) + , OutputWidth_(outputWidth) + , Values_(width) + , UnwrappedValues_(width) + , Readers_(keys.size()) + , Builders_(keys.size()) + , Arena_(TlsAllocState) { - MKQL_ENSURE(Width_ > 0, "Missing block length column"); - if constexpr (UseFilter) { - MKQL_ENSURE(filterColumn, "Missing filter column"); - MKQL_ENSURE(!Finalize, "Filter isn't compatible with Finalize"); - } else { - MKQL_ENSURE(!filterColumn, "Unexpected filter column"); + Pointer_ = Values_.data(); + for (size_t i = 0; i < Keys_.size(); ++i) { + auto itemType = AS_TYPE(TBlockType, Keys_[i].Type)->GetItemType(); + Readers_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType); + Builders_[i] = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), itemType, ctx.ArrowMemoryPool, MaxBlockLen_, &ctx.Builder->GetPgBuilder()); } - } - - EFetchResult DoCalculate(NUdf::TUnboxedValue& state, - TComputationContext& ctx, - NUdf::TUnboxedValue*const* output) const - { - auto& s = GetState(state, ctx); - if (!s.Count) { - if (s.IsFinished_) - return EFetchResult::Finish; - - while (!s.WritingOutput_) { - const auto fields = ctx.WideFields.data() + WideFieldsIndex_; - s.Values_.assign(s.Values_.size(), NUdf::TUnboxedValuePod()); - switch (Flow_->FetchValues(ctx, fields)) { - case EFetchResult::Yield: - return EFetchResult::Yield; - case EFetchResult::One: - s.ProcessInput(ctx.HolderFactory); - continue; - case EFetchResult::Finish: - break; - } - if (s.Finish()) - break; - else - return EFetchResult::Finish; - } + if constexpr (Many) { + TotalStateSize_ += Streams_.size(); + } - if (!s.FillOutput(ctx.HolderFactory)) - return EFetchResult::Finish; + for (const auto& p : AggsParams_) { + Aggs_.emplace_back(p.Prepared_->Make(ctx)); + MKQL_ENSURE(Aggs_.back()->StateSize == p.Prepared_->StateSize, "State size mismatch"); + AggStateOffsets_.emplace_back(TotalStateSize_); + TotalStateSize_ += Aggs_.back()->StateSize; } - const auto sliceSize = s.Slice(); - for (size_t i = 0; i < OutputWidth_; ++i) { - if (const auto out = output[i]) { - *out = s.Get(sliceSize, ctx.HolderFactory, i); + auto equal = MakeEqual<TKey>(KeyLength_); + auto hasher = MakeHash<TKey>(KeyLength_); + if constexpr (UseSet) { + MKQL_ENSURE(params.empty(), "Only keys are supported"); + HashSet_ = std::make_unique<THashSetImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>>(hasher, equal); + } else { + if (!InlineAggState) { + HashFixedMap_ = std::make_unique<TFixedHashMapImpl<TKey, TFixedAggState, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>>(hasher, equal); + } else { + HashMap_ = std::make_unique<TDynamicHashMapImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>>(TotalStateSize_, hasher, equal); } } - return EFetchResult::One; } -#ifndef MKQL_DISABLE_CODEGEN - ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const { - return DoGenGetValuesImpl(ctx, statePtr, block, Flow_, Width_, OutputWidth_, - GetMethodPtr(&TState::Get), GetMethodPtr(&THashedWrapperBase::MakeState), - GetMethodPtr(&TState::ProcessInput), GetMethodPtr(&TState::Finish), - GetMethodPtr(&TState::FillOutput), GetMethodPtr(&TState::Slice)); - } -#endif -private: - struct TState : public TBlockState { - bool WritingOutput_ = false; - bool IsFinished_ = false; - - const std::optional<ui32> FilterColumn_; - const std::vector<TKeyParams> Keys_; - const std::vector<TAggParams<TAggregator>>& AggsParams_; - const ui32 KeyLength_; - const ui32 StreamIndex_; - const std::vector<std::vector<ui32>> Streams_; - const size_t MaxBlockLen_; - - template<typename TKeyType> - struct THashSettings { - static constexpr bool CacheHash = std::is_same_v<TKeyType, TSSOKey>; - }; - using TDynMapImpl = TDynamicHashMapImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>; - using TSetImpl = THashSetImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>; - using TFixedMapImpl = TFixedHashMapImpl<TKey, TFixedAggState, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>; - - ui64 BatchNum_ = 0; - TUnboxedValueVector Values_; - std::vector<std::unique_ptr<TAggregator>> Aggs_; - std::vector<ui32> AggStateOffsets_; - TUnboxedValueVector UnwrappedValues_; - std::vector<std::unique_ptr<IBlockReader>> Readers_; - std::vector<std::unique_ptr<IArrayBuilder>> Builders_; - std::vector<std::unique_ptr<IAggColumnBuilder>> AggBuilders_; - bool HasValues_ = false; - ui32 TotalStateSize_ = 0; - size_t OutputBlockSize_ = 0; - std::unique_ptr<TDynMapImpl> HashMap_; - typename TDynMapImpl::const_iterator HashMapIt_; - std::unique_ptr<TSetImpl> HashSet_; - typename TSetImpl::const_iterator HashSetIt_; - std::unique_ptr<TFixedMapImpl> HashFixedMap_; - typename TFixedMapImpl::const_iterator HashFixedMapIt_; - TPagedArena Arena_; - - TState(TMemoryUsageInfo* memInfo, ui32 keyLength, ui32 streamIndex, size_t width, size_t outputWidth, std::optional<ui32> filterColumn, const std::vector<TAggParams<TAggregator>>& params, - const std::vector<std::vector<ui32>>& streams, const std::vector<TKeyParams>& keys, size_t maxBlockLen, TComputationContext& ctx) - : TBlockState(memInfo, outputWidth) - , FilterColumn_(filterColumn) - , Keys_(keys) - , AggsParams_(params) - , KeyLength_(keyLength) - , StreamIndex_(streamIndex) - , Streams_(streams) - , MaxBlockLen_(maxBlockLen) - , Values_(width) - , UnwrappedValues_(width) - , Readers_(keys.size()) - , Builders_(keys.size()) - , Arena_(TlsAllocState) - { - Pointer_ = Values_.data(); - for (size_t i = 0; i < Keys_.size(); ++i) { - auto itemType = AS_TYPE(TBlockType, Keys_[i].Type)->GetItemType(); - Readers_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType); - Builders_[i] = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), itemType, ctx.ArrowMemoryPool, MaxBlockLen_, &ctx.Builder->GetPgBuilder()); - } - - if constexpr (Many) { - TotalStateSize_ += Streams_.size(); - } - for (const auto& p : AggsParams_) { - Aggs_.emplace_back(p.Prepared_->Make(ctx)); - MKQL_ENSURE(Aggs_.back()->StateSize == p.Prepared_->StateSize, "State size mismatch"); - AggStateOffsets_.emplace_back(TotalStateSize_); - TotalStateSize_ += Aggs_.back()->StateSize; - } + void ProcessInput(const THolderFactory& holderFactory) { + ++BatchNum_; + const auto batchLength = TArrowBlock::From(Values_.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + if (!batchLength) { + return; + } - auto equal = MakeEqual<TKey>(KeyLength_); - auto hasher = MakeHash<TKey>(KeyLength_); - if constexpr (UseSet) { - MKQL_ENSURE(params.empty(), "Only keys are supported"); - HashSet_ = std::make_unique<THashSetImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>>(hasher, equal); + const ui8* filterBitmap = nullptr; + if constexpr (UseFilter) { + auto filterDatum = TArrowBlock::From(Values_[*FilterColumn_]).GetDatum(); + if (filterDatum.is_scalar()) { + if (!filterDatum.template scalar_as<arrow::UInt8Scalar>().value) { + return; + } } else { - if (!InlineAggState) { - HashFixedMap_ = std::make_unique<TFixedHashMapImpl<TKey, TFixedAggState, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>>(hasher, equal); - } else { - HashMap_ = std::make_unique<TDynamicHashMapImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>>(TotalStateSize_, hasher, equal); + const auto& arr = filterDatum.array(); + filterBitmap = arr->template GetValues<ui8>(1); + ui64 popCount = GetBitmapPopCount(arr); + if (popCount == 0) { + return; } } } - void ProcessInput(const THolderFactory& holderFactory) { - ++BatchNum_; - const auto batchLength = TArrowBlock::From(Values_.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value; - if (!batchLength) { - return; + const ui32* streamIndexData = nullptr; + TMaybe<ui32> streamIndexScalar; + if constexpr (Many) { + auto streamIndexDatum = TArrowBlock::From(Values_[StreamIndex_]).GetDatum(); + if (streamIndexDatum.is_scalar()) { + streamIndexScalar = streamIndexDatum.template scalar_as<arrow::UInt32Scalar>().value; + } else { + MKQL_ENSURE(streamIndexDatum.is_array(), "Expected array"); + streamIndexData = streamIndexDatum.array()->template GetValues<ui32>(1); } - - const ui8* filterBitmap = nullptr; - if constexpr (UseFilter) { - auto filterDatum = TArrowBlock::From(Values_[*FilterColumn_]).GetDatum(); - if (filterDatum.is_scalar()) { - if (!filterDatum.template scalar_as<arrow::UInt8Scalar>().value) { - return; - } - } else { - const auto& arr = filterDatum.array(); - filterBitmap = arr->template GetValues<ui8>(1); - ui64 popCount = GetBitmapPopCount(arr); - if (popCount == 0) { - return; - } - } + UnwrappedValues_ = Values_; + for (const auto& p : AggsParams_) { + const auto& columnDatum = TArrowBlock::From(UnwrappedValues_[p.Column_]).GetDatum(); + MKQL_ENSURE(columnDatum.is_array(), "Expected array"); + UnwrappedValues_[p.Column_] = holderFactory.CreateArrowBlock(Unwrap(*columnDatum.array(), p.StateType_)); } + } - const ui32* streamIndexData = nullptr; - TMaybe<ui32> streamIndexScalar; - if constexpr (Many) { - auto streamIndexDatum = TArrowBlock::From(Values_[StreamIndex_]).GetDatum(); - if (streamIndexDatum.is_scalar()) { - streamIndexScalar = streamIndexDatum.template scalar_as<arrow::UInt32Scalar>().value; - } else { - MKQL_ENSURE(streamIndexDatum.is_array(), "Expected array"); - streamIndexData = streamIndexDatum.array()->template GetValues<ui32>(1); - } - UnwrappedValues_ = Values_; - for (const auto& p : AggsParams_) { - const auto& columnDatum = TArrowBlock::From(UnwrappedValues_[p.Column_]).GetDatum(); - MKQL_ENSURE(columnDatum.is_array(), "Expected array"); - UnwrappedValues_[p.Column_] = holderFactory.CreateArrowBlock(Unwrap(*columnDatum.array(), p.StateType_)); - } - } + HasValues_ = true; + std::vector<arrow::Datum> keysDatum; + keysDatum.reserve(Keys_.size()); + for (ui32 i = 0; i < Keys_.size(); ++i) { + keysDatum.emplace_back(TArrowBlock::From(Values_[Keys_[i].Index]).GetDatum()); + } - HasValues_ = true; - std::vector<arrow::Datum> keysDatum; - keysDatum.reserve(Keys_.size()); - for (ui32 i = 0; i < Keys_.size(); ++i) { - keysDatum.emplace_back(TArrowBlock::From(Values_[Keys_[i].Index]).GetDatum()); - } + std::array<TOutputBuffer, PrefetchBatchSize> out; + for (ui32 i = 0; i < PrefetchBatchSize; ++i) { + out[i].Resize(sizeof(TKey)); + } - std::array<TOutputBuffer, PrefetchBatchSize> out; - for (ui32 i = 0; i < PrefetchBatchSize; ++i) { - out[i].Resize(sizeof(TKey)); + std::array<TRobinHoodBatchRequestItem<TKey>, PrefetchBatchSize> insertBatch; + std::array<ui64, PrefetchBatchSize> insertBatchRows; + std::array<char*, PrefetchBatchSize> insertBatchPayloads; + std::array<bool, PrefetchBatchSize> insertBatchIsNew; + ui32 insertBatchLen = 0; + + const auto processInsertBatch = [&]() { + for (ui32 i = 0; i < insertBatchLen; ++i) { + auto& r = insertBatch[i]; + TStringBuf str = out[i].Finish(); + TKey key = MakeKey<TKey>(str, KeyLength_); + r.ConstructKey(key); } - std::array<TRobinHoodBatchRequestItem<TKey>, PrefetchBatchSize> insertBatch; - std::array<ui64, PrefetchBatchSize> insertBatchRows; - std::array<char*, PrefetchBatchSize> insertBatchPayloads; - std::array<bool, PrefetchBatchSize> insertBatchIsNew; - ui32 insertBatchLen = 0; - - const auto processInsertBatch = [&]() { - for (ui32 i = 0; i < insertBatchLen; ++i) { - auto& r = insertBatch[i]; - TStringBuf str = out[i].Finish(); - TKey key = MakeKey<TKey>(str, KeyLength_); - r.ConstructKey(key); + if constexpr (UseSet) { + HashSet_->BatchInsert({insertBatch.data(), insertBatchLen},[&](size_t index, typename THashedWrapperBaseState::TSetImpl::iterator iter, bool isNew) { + Y_UNUSED(index); + if (isNew) { + if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) { + MoveKeyToArena(HashSet_->GetKey(iter), Arena_, KeyLength_); + } + } + }); + } else { + using THashTable = std::conditional_t<InlineAggState, typename THashedWrapperBaseState::TDynMapImpl, typename THashedWrapperBaseState::TFixedMapImpl>; + THashTable* hash; + if constexpr (!InlineAggState) { + hash = HashFixedMap_.get(); + } else { + hash = HashMap_.get(); } - if constexpr (UseSet) { - HashSet_->BatchInsert({insertBatch.data(), insertBatchLen},[&](size_t index, typename TState::TSetImpl::iterator iter, bool isNew) { - Y_UNUSED(index); - if (isNew) { - if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) { - MoveKeyToArena(HashSet_->GetKey(iter), Arena_, KeyLength_); - } + hash->BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t index, typename THashTable::iterator iter, bool isNew) { + if (isNew) { + if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) { + MoveKeyToArena(hash->GetKey(iter), Arena_, KeyLength_); } - }); - } else { - using THashTable = std::conditional_t<InlineAggState, typename TState::TDynMapImpl, typename TState::TFixedMapImpl>; - THashTable* hash; - if constexpr (!InlineAggState) { - hash = HashFixedMap_.get(); - } else { - hash = HashMap_.get(); } - hash->BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t index, typename THashTable::iterator iter, bool isNew) { + if constexpr (UseArena) { + // prefetch payloads only + auto payload = hash->GetPayload(iter); + char* ptr; if (isNew) { - if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) { - MoveKeyToArena(hash->GetKey(iter), Arena_, KeyLength_); - } - } - - if constexpr (UseArena) { - // prefetch payloads only - auto payload = hash->GetPayload(iter); - char* ptr; - if (isNew) { - ptr = (char*)Arena_.Alloc(TotalStateSize_); - *(char**)payload = ptr; - } else { - ptr = *(char**)payload; - } - - insertBatchIsNew[index] = isNew; - insertBatchPayloads[index] = ptr; - NYql::PrefetchForWrite(ptr); + ptr = (char*)Arena_.Alloc(TotalStateSize_); + *(char**)payload = ptr; } else { - // process insert - auto payload = (char*)hash->GetPayload(iter); - auto row = insertBatchRows[index]; - ui32 streamIndex = 0; - if constexpr (Many) { - streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row]; - } - - Insert(row, payload, isNew, streamIndex); + ptr = *(char**)payload; } - }); - if constexpr (UseArena) { - for (ui32 i = 0; i < insertBatchLen; ++i) { - auto row = insertBatchRows[i]; - ui32 streamIndex = 0; - if constexpr (Many) { - streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row]; - } - - bool isNew = insertBatchIsNew[i]; - char* payload = insertBatchPayloads[i]; - Insert(row, payload, isNew, streamIndex); + insertBatchIsNew[index] = isNew; + insertBatchPayloads[index] = ptr; + NYql::PrefetchForWrite(ptr); + } else { + // process insert + auto payload = (char*)hash->GetPayload(iter); + auto row = insertBatchRows[index]; + ui32 streamIndex = 0; + if constexpr (Many) { + streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row]; } + + Insert(row, payload, isNew, streamIndex); } - } - }; + }); - for (ui64 row = 0; row < batchLength; ++row) { - if constexpr (UseFilter) { - if (filterBitmap && !filterBitmap[row]) { - continue; + if constexpr (UseArena) { + for (ui32 i = 0; i < insertBatchLen; ++i) { + auto row = insertBatchRows[i]; + ui32 streamIndex = 0; + if constexpr (Many) { + streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row]; + } + + bool isNew = insertBatchIsNew[i]; + char* payload = insertBatchPayloads[i]; + Insert(row, payload, isNew, streamIndex); } } + } + }; - // encode key - out[insertBatchLen].Rewind(); - for (ui32 i = 0; i < keysDatum.size(); ++i) { - if (keysDatum[i].is_scalar()) { - // TODO: more efficient code when grouping by scalar - Readers_[i]->SaveScalarItem(*keysDatum[i].scalar(), out[insertBatchLen]); - } else { - Readers_[i]->SaveItem(*keysDatum[i].array(), row, out[insertBatchLen]); - } + for (ui64 row = 0; row < batchLength; ++row) { + if constexpr (UseFilter) { + if (filterBitmap && !filterBitmap[row]) { + continue; } + } - insertBatchRows[insertBatchLen] = row; - ++insertBatchLen; - if (insertBatchLen == PrefetchBatchSize) { - processInsertBatch(); - insertBatchLen = 0; + // encode key + out[insertBatchLen].Rewind(); + for (ui32 i = 0; i < keysDatum.size(); ++i) { + if (keysDatum[i].is_scalar()) { + // TODO: more efficient code when grouping by scalar + Readers_[i]->SaveScalarItem(*keysDatum[i].scalar(), out[insertBatchLen]); + } else { + Readers_[i]->SaveItem(*keysDatum[i].array(), row, out[insertBatchLen]); } } - processInsertBatch(); + insertBatchRows[insertBatchLen] = row; + ++insertBatchLen; + if (insertBatchLen == PrefetchBatchSize) { + processInsertBatch(); + insertBatchLen = 0; + } } - bool Finish() { - if (!HasValues_) { - IsFinished_ = true; - return false; - } + processInsertBatch(); + } - WritingOutput_ = true; - OutputBlockSize_ = 0; - PrepareAggBuilders(); + bool Finish() { + if (!HasValues_) { + IsFinished_ = true; + return false; + } - if constexpr (UseSet) { - HashSetIt_ = HashSet_->Begin(); + WritingOutput_ = true; + OutputBlockSize_ = 0; + PrepareAggBuilders(); + + if constexpr (UseSet) { + HashSetIt_ = HashSet_->Begin(); + } else { + if constexpr (!InlineAggState) { + HashFixedMapIt_ = HashFixedMap_->Begin(); } else { - if constexpr (!InlineAggState) { - HashFixedMapIt_ = HashFixedMap_->Begin(); - } else { - HashMapIt_ = HashMap_->Begin(); - } + HashMapIt_ = HashMap_->Begin(); } - return true; } + return true; + } - bool FillOutput(const THolderFactory& holderFactory) { - bool exit = false; - while (WritingOutput_) { - if constexpr (UseSet) { - for (;!exit && HashSetIt_ != HashSet_->End(); HashSet_->Advance(HashSetIt_)) { - if (!HashSet_->IsValid(HashSetIt_)) { - continue; - } - - if (OutputBlockSize_ == MaxBlockLen_) { - Flush(false, holderFactory); - //return EFetchResult::One; - exit = true; - break; - } - - const TKey& key = HashSet_->GetKey(HashSetIt_); - TInputBuffer in(GetKeyView<TKey>(key, KeyLength_)); - for (auto& kb : Builders_) { - kb->Add(in); - } - ++OutputBlockSize_; + bool FillOutput(const THolderFactory& holderFactory) { + bool exit = false; + while (WritingOutput_) { + if constexpr (UseSet) { + for (;!exit && HashSetIt_ != HashSet_->End(); HashSet_->Advance(HashSetIt_)) { + if (!HashSet_->IsValid(HashSetIt_)) { + continue; } - break; - } else { - const bool done = InlineAggState ? - Iterate(*HashMap_, HashMapIt_) : - Iterate(*HashFixedMap_, HashFixedMapIt_); - if (done) { + + if (OutputBlockSize_ == MaxBlockLen_) { + Flush(false, holderFactory); + //return EFetchResult::One; + exit = true; break; } - Flush(false, holderFactory); - exit = true; + + const TKey& key = HashSet_->GetKey(HashSetIt_); + TInputBuffer in(GetKeyView<TKey>(key, KeyLength_)); + for (auto& kb : Builders_) { + kb->Add(in); + } + ++OutputBlockSize_; + } + break; + } else { + const bool done = InlineAggState ? + Iterate(*HashMap_, HashMapIt_) : + Iterate(*HashFixedMap_, HashFixedMapIt_); + if (done) { break; } + Flush(false, holderFactory); + exit = true; + break; } + } - if (!exit) { - IsFinished_ = true; - WritingOutput_ = false; - if (!OutputBlockSize_) - return false; - Flush(true, holderFactory); - } - - FillArrays(); - return true; + if (!exit) { + IsFinished_ = true; + WritingOutput_ = false; + if (!OutputBlockSize_) + return false; + Flush(true, holderFactory); } - private: - void PrepareAggBuilders() { - if constexpr (!UseSet) { - AggBuilders_.clear(); - AggBuilders_.reserve(Aggs_.size()); - for (const auto& a : Aggs_) { - if constexpr (Finalize) { - AggBuilders_.emplace_back(a->MakeResultBuilder(MaxBlockLen_)); - } else { - AggBuilders_.emplace_back(a->MakeStateBuilder(MaxBlockLen_)); - } + + FillArrays(); + return true; + } +private: + void PrepareAggBuilders() { + if constexpr (!UseSet) { + AggBuilders_.clear(); + AggBuilders_.reserve(Aggs_.size()); + for (const auto& a : Aggs_) { + if constexpr (Finalize) { + AggBuilders_.emplace_back(a->MakeResultBuilder(MaxBlockLen_)); + } else { + AggBuilders_.emplace_back(a->MakeStateBuilder(MaxBlockLen_)); } } } + } - void Flush(bool final, const THolderFactory& holderFactory) { - if (!OutputBlockSize_) { - return; - } + void Flush(bool final, const THolderFactory& holderFactory) { + if (!OutputBlockSize_) { + return; + } - for (size_t i = 0; i < Builders_.size(); ++i) { - Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(final)); - } + for (size_t i = 0; i < Builders_.size(); ++i) { + Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(final)); + } - if constexpr (!UseSet) { - for (size_t i = 0; i < Aggs_.size(); ++i) { - Values[Builders_.size() + i] = AggBuilders_[i]->Build(); - } - if (!final) { - PrepareAggBuilders(); - } + if constexpr (!UseSet) { + for (size_t i = 0; i < Aggs_.size(); ++i) { + Values[Builders_.size() + i] = AggBuilders_[i]->Build(); + } + if (!final) { + PrepareAggBuilders(); } - - Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputBlockSize_))); - OutputBlockSize_ = 0; } - void Insert(ui64 row, char* payload, bool isNew, ui32 currentStreamIndex) const { - char* ptr = payload; + Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputBlockSize_))); + OutputBlockSize_ = 0; + } - if (isNew) { - if constexpr (Many) { - static_assert(Finalize); - MKQL_ENSURE(currentStreamIndex < Streams_.size(), "Invalid stream index"); - memset(ptr, 0, Streams_.size()); - ptr[currentStreamIndex] = 1; + void Insert(ui64 row, char* payload, bool isNew, ui32 currentStreamIndex) const { + char* ptr = payload; - for (auto i : Streams_[currentStreamIndex]) { + if (isNew) { + if constexpr (Many) { + static_assert(Finalize); + MKQL_ENSURE(currentStreamIndex < Streams_.size(), "Invalid stream index"); + memset(ptr, 0, Streams_.size()); + ptr[currentStreamIndex] = 1; - Aggs_[i]->LoadState(ptr + AggStateOffsets_[i], BatchNum_, UnwrappedValues_.data(), row); - } - } else { - for (size_t i = 0; i < Aggs_.size(); ++i) { - if constexpr (Finalize) { - Aggs_[i]->LoadState(ptr, BatchNum_, Values_.data(), row); - } else { - Aggs_[i]->InitKey(ptr, BatchNum_, Values_.data(), row); - } + for (auto i : Streams_[currentStreamIndex]) { - ptr += Aggs_[i]->StateSize; - } + Aggs_[i]->LoadState(ptr + AggStateOffsets_[i], BatchNum_, UnwrappedValues_.data(), row); } } else { - if constexpr (Many) { - static_assert(Finalize); - MKQL_ENSURE(currentStreamIndex < Streams_.size(), "Invalid stream index"); - - bool isNewStream = !ptr[currentStreamIndex]; - ptr[currentStreamIndex] = 1; - - for (auto i : Streams_[currentStreamIndex]) { - - if (isNewStream) { - Aggs_[i]->LoadState(ptr + AggStateOffsets_[i], BatchNum_, UnwrappedValues_.data(), row); - } else { - Aggs_[i]->UpdateState(ptr + AggStateOffsets_[i], BatchNum_, UnwrappedValues_.data(), row); - } + for (size_t i = 0; i < Aggs_.size(); ++i) { + if constexpr (Finalize) { + Aggs_[i]->LoadState(ptr, BatchNum_, Values_.data(), row); + } else { + Aggs_[i]->InitKey(ptr, BatchNum_, Values_.data(), row); } - } else { - for (size_t i = 0; i < Aggs_.size(); ++i) { - if constexpr (Finalize) { - Aggs_[i]->UpdateState(ptr, BatchNum_, Values_.data(), row); - } else { - Aggs_[i]->UpdateKey(ptr, BatchNum_, Values_.data(), row); - } - ptr += Aggs_[i]->StateSize; - } + ptr += Aggs_[i]->StateSize; } } - } + } else { + if constexpr (Many) { + static_assert(Finalize); + MKQL_ENSURE(currentStreamIndex < Streams_.size(), "Invalid stream index"); - template <typename THash> - bool Iterate(THash& hash, typename THash::const_iterator& iter) { - MKQL_ENSURE(WritingOutput_, "Supposed to be called at the end"); - std::array<typename THash::const_iterator, PrefetchBatchSize> iters; - ui32 itersLen = 0; - auto iterateBatch = [&]() { - for (ui32 i = 0; i < itersLen; ++i) { - auto iter = iters[i]; - const TKey& key = hash.GetKey(iter); - auto payload = (char*)hash.GetPayload(iter); - char* ptr; - if constexpr (UseArena) { - ptr = *(char**)payload; + bool isNewStream = !ptr[currentStreamIndex]; + ptr[currentStreamIndex] = 1; + + for (auto i : Streams_[currentStreamIndex]) { + + if (isNewStream) { + Aggs_[i]->LoadState(ptr + AggStateOffsets_[i], BatchNum_, UnwrappedValues_.data(), row); } else { - ptr = payload; + Aggs_[i]->UpdateState(ptr + AggStateOffsets_[i], BatchNum_, UnwrappedValues_.data(), row); } - - TInputBuffer in(GetKeyView<TKey>(key, KeyLength_)); - for (auto& kb : Builders_) { - kb->Add(in); + } + } else { + for (size_t i = 0; i < Aggs_.size(); ++i) { + if constexpr (Finalize) { + Aggs_[i]->UpdateState(ptr, BatchNum_, Values_.data(), row); + } else { + Aggs_[i]->UpdateKey(ptr, BatchNum_, Values_.data(), row); } - if constexpr (Many) { - for (ui32 i = 0; i < Streams_.size(); ++i) { - MKQL_ENSURE(ptr[i], "Missing partial aggregation state for stream #" << i); - } + ptr += Aggs_[i]->StateSize; + } + } + } + } - ptr += Streams_.size(); - } + template <typename THash> + bool Iterate(THash& hash, typename THash::const_iterator& iter) { + MKQL_ENSURE(WritingOutput_, "Supposed to be called at the end"); + std::array<typename THash::const_iterator, PrefetchBatchSize> iters; + ui32 itersLen = 0; + auto iterateBatch = [&]() { + for (ui32 i = 0; i < itersLen; ++i) { + auto iter = iters[i]; + const TKey& key = hash.GetKey(iter); + auto payload = (char*)hash.GetPayload(iter); + char* ptr; + if constexpr (UseArena) { + ptr = *(char**)payload; + } else { + ptr = payload; + } - for (size_t i = 0; i < Aggs_.size(); ++i) { - AggBuilders_[i]->Add(ptr); - Aggs_[i]->DestroyState(ptr); + TInputBuffer in(GetKeyView<TKey>(key, KeyLength_)); + for (auto& kb : Builders_) { + kb->Add(in); + } - ptr += Aggs_[i]->StateSize; + if constexpr (Many) { + for (ui32 i = 0; i < Streams_.size(); ++i) { + MKQL_ENSURE(ptr[i], "Missing partial aggregation state for stream #" << i); } - } - }; - for (; iter != hash.End(); hash.Advance(iter)) { - if (!hash.IsValid(iter)) { - continue; + ptr += Streams_.size(); } - if (OutputBlockSize_ == MaxBlockLen_) { - iterateBatch(); - return false; - } + for (size_t i = 0; i < Aggs_.size(); ++i) { + AggBuilders_[i]->Add(ptr); + Aggs_[i]->DestroyState(ptr); - if (itersLen == iters.size()) { - iterateBatch(); - itersLen = 0; + ptr += Aggs_[i]->StateSize; } + } + }; - iters[itersLen] = iter; - ++itersLen; - ++OutputBlockSize_; - if constexpr (UseArena) { - auto payload = (char*)hash.GetPayload(iter); - auto ptr = *(char**)payload; - NYql::PrefetchForWrite(ptr); + for (; iter != hash.End(); hash.Advance(iter)) { + if (!hash.IsValid(iter)) { + continue; + } + + if (OutputBlockSize_ == MaxBlockLen_) { + iterateBatch(); + return false; + } + + if (itersLen == iters.size()) { + iterateBatch(); + itersLen = 0; + } + + iters[itersLen] = iter; + ++itersLen; + ++OutputBlockSize_; + if constexpr (UseArena) { + auto payload = (char*)hash.GetPayload(iter); + auto ptr = *(char**)payload; + NYql::PrefetchForWrite(ptr); + } + + if constexpr (std::is_same<TKey, TSSOKey>::value) { + const auto& key = hash.GetKey(iter); + if (!key.IsInplace()) { + NYql::PrefetchForRead(key.AsView().Data()); } + } else if constexpr (std::is_same<TKey, TExternalFixedSizeKey>::value) { + const auto& key = hash.GetKey(iter); + NYql::PrefetchForRead(key.Data); + } + } - if constexpr (std::is_same<TKey, TSSOKey>::value) { - const auto& key = hash.GetKey(iter); - if (!key.IsInplace()) { - NYql::PrefetchForRead(key.AsView().Data()); - } - } else if constexpr (std::is_same<TKey, TExternalFixedSizeKey>::value) { - const auto& key = hash.GetKey(iter); - NYql::PrefetchForRead(key.Data); + iterateBatch(); + return true; + } +}; + +template <typename TKey, typename TAggregator, typename TFixedAggState, bool UseSet, bool UseFilter, bool Finalize, bool Many, typename TDerived> +class THashedWrapperBaseFromFlow : public TStatefulWideFlowCodegeneratorNode<TDerived>, + protected THashedWrapperCodegenBase +{ + using TComputationBase = TStatefulWideFlowCodegeneratorNode<TDerived>; + + using TState = THashedWrapperBaseState<TKey, TAggregator, TFixedAggState, UseSet, UseFilter, Finalize, Many, TDerived>; +public: + THashedWrapperBaseFromFlow(TComputationMutables& mutables, + IComputationWideFlowNode* flow, + std::optional<ui32> filterColumn, + size_t width, + const std::vector<TKeyParams>& keys, + size_t maxBlockLen, + ui32 keyLength, + std::vector<TAggParams<TAggregator>>&& aggsParams, + ui32 streamIndex, + std::vector<std::vector<ui32>>&& streams) + : TComputationBase(mutables, flow, EValueRepresentation::Boxed) + , Flow_(flow) + , FilterColumn_(filterColumn) + , Width_(width) + , OutputWidth_(keys.size() + aggsParams.size() + 1) + , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(width)) + , Keys_(keys) + , MaxBlockLen_(maxBlockLen) + , AggsParams_(std::move(aggsParams)) + , KeyLength_(keyLength) + , StreamIndex_(streamIndex) + , Streams_(std::move(streams)) + { + MKQL_ENSURE(Width_ > 0, "Missing block length column"); + if constexpr (UseFilter) { + MKQL_ENSURE(filterColumn, "Missing filter column"); + MKQL_ENSURE(!Finalize, "Filter isn't compatible with Finalize"); + } else { + MKQL_ENSURE(!filterColumn, "Unexpected filter column"); + } + } + + EFetchResult DoCalculate(NUdf::TUnboxedValue& state, + TComputationContext& ctx, + NUdf::TUnboxedValue*const* output) const + { + auto& s = GetState(state, ctx); + if (!s.Count) { + if (s.IsFinished_) + return EFetchResult::Finish; + + while (!s.WritingOutput_) { + const auto fields = ctx.WideFields.data() + WideFieldsIndex_; + s.Values_.assign(s.Values_.size(), NUdf::TUnboxedValuePod()); + switch (Flow_->FetchValues(ctx, fields)) { + case EFetchResult::Yield: + return EFetchResult::Yield; + case EFetchResult::One: + s.ProcessInput(ctx.HolderFactory); + continue; + case EFetchResult::Finish: + break; } + + if (s.Finish()) + break; + else + return EFetchResult::Finish; } - iterateBatch(); - return true; + if (!s.FillOutput(ctx.HolderFactory)) + return EFetchResult::Finish; } - }; + + const auto sliceSize = s.Slice(); + for (size_t i = 0; i < OutputWidth_; ++i) { + if (const auto out = output[i]) { + *out = s.Get(sliceSize, ctx.HolderFactory, i); + } + } + return EFetchResult::One; + } +#ifndef MKQL_DISABLE_CODEGEN + ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const { + return DoGenGetValuesImpl(ctx, statePtr, block, Flow_, Width_, OutputWidth_, + GetMethodPtr(&TState::Get), GetMethodPtr(&THashedWrapperBaseFromFlow::MakeState), + GetMethodPtr(&TState::ProcessInput), GetMethodPtr(&TState::Finish), + GetMethodPtr(&TState::FillOutput), GetMethodPtr(&TState::Slice)); + } +#endif private: void RegisterDependencies() const final { this->FlowDependsOn(Flow_); @@ -1665,11 +1772,136 @@ private: const std::vector<std::vector<ui32>> Streams_; }; + +template <typename TKey, typename TAggregator, typename TFixedAggState, bool UseSet, bool UseFilter, bool Finalize, bool Many, typename TDerived> +class THashedWrapperBaseFromStream : public TMutableComputationNode<TDerived>, + protected THashedWrapperCodegenBase +{ + using TComputationBase = TMutableComputationNode<TDerived>; + + using TState = THashedWrapperBaseState<TKey, TAggregator, TFixedAggState, UseSet, UseFilter, Finalize, Many, TDerived>; +public: + THashedWrapperBaseFromStream(TComputationMutables& mutables, + IComputationNode* stream, + std::optional<ui32> filterColumn, + size_t width, + const std::vector<TKeyParams>& keys, + size_t maxBlockLen, + ui32 keyLength, + std::vector<TAggParams<TAggregator>>&& aggsParams, + ui32 streamIndex, + std::vector<std::vector<ui32>>&& streams) + : TComputationBase(mutables, EValueRepresentation::Boxed) + , Stream_(stream) + , FilterColumn_(filterColumn) + , Width_(width) + , OutputWidth_(keys.size() + aggsParams.size() + 1) + , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(width)) + , Keys_(keys) + , MaxBlockLen_(maxBlockLen) + , AggsParams_(std::move(aggsParams)) + , KeyLength_(keyLength) + , StreamIndex_(streamIndex) + , Streams_(std::move(streams)) + { + MKQL_ENSURE(Width_ > 0, "Missing block length column"); + if constexpr (UseFilter) { + MKQL_ENSURE(filterColumn, "Missing filter column"); + MKQL_ENSURE(!Finalize, "Filter isn't compatible with Finalize"); + } else { + MKQL_ENSURE(!filterColumn, "Unexpected filter column"); + } + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const + { + const auto state = ctx.HolderFactory.Create<TState>(KeyLength_, StreamIndex_, Width_, OutputWidth_, FilterColumn_, AggsParams_, Streams_, Keys_, MaxBlockLen_, ctx); + return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, std::move(state), std::move(Stream_->GetValue(ctx))); + } +private: + class TStreamValue : public TComputationValue<TStreamValue> { + using TBase = TComputationValue<TStreamValue>; + public: + TStreamValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, + NUdf::TUnboxedValue&& state, NUdf::TUnboxedValue&& stream) + : TBase(memInfo) + , State_(state) + , Stream_(stream) + , HolderFactory_(holderFactory) + { + } + + private: + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { + TState& state = *static_cast<TState*>(State_.AsBoxed().Get()); + auto* inputFields = state.Values_.data(); + const size_t inputWidth = state.Width_; + const size_t outputWidth = state.OutputWidth_; + MKQL_ENSURE(outputWidth == width, "The given width doesn't equal to the result type size"); + + if (!state.Count) { + if (state.IsFinished_) + return NUdf::EFetchStatus::Finish; + + while (!state.WritingOutput_) { + switch (Stream_.WideFetch(inputFields, inputWidth)) { + case NUdf::EFetchStatus::Yield: + return NUdf::EFetchStatus::Yield; + case NUdf::EFetchStatus::Ok: + state.ProcessInput(HolderFactory_); + continue; + case NUdf::EFetchStatus::Finish: + break; + } + + if (state.Finish()) + break; + else + return NUdf::EFetchStatus::Finish; + } + + if (!state.FillOutput(HolderFactory_)) + return NUdf::EFetchStatus::Finish; + } + + const auto sliceSize = state.Slice(); + for (size_t i = 0; i < outputWidth; ++i) { + output[i] = state.Get(sliceSize, HolderFactory_, i); + } + return NUdf::EFetchStatus::Ok; + } + private: + NUdf::TUnboxedValue State_; + NUdf::TUnboxedValue Stream_; + const THolderFactory& HolderFactory_; + }; +private: + void RegisterDependencies() const final { + this->DependsOn(Stream_); + } + + IComputationNode *const Stream_; + const std::optional<ui32> FilterColumn_; + const size_t Width_; + const size_t OutputWidth_; + const size_t WideFieldsIndex_; + const std::vector<TKeyParams> Keys_; + const size_t MaxBlockLen_; + const std::vector<TAggParams<TAggregator>> AggsParams_; + const ui32 KeyLength_; + const ui32 StreamIndex_; + const std::vector<std::vector<ui32>> Streams_; +}; + +template <typename TKey, typename TFixedAggState, bool UseSet, bool UseFilter, typename TInputNode> +class TBlockCombineHashedWrapper {}; + template <typename TKey, typename TFixedAggState, bool UseSet, bool UseFilter> -class TBlockCombineHashedWrapper : public THashedWrapperBase<TKey, IBlockAggregatorCombineKeys, TFixedAggState, UseSet, UseFilter, false, false, TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter>> { +class TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter, IComputationWideFlowNode> + : public THashedWrapperBaseFromFlow<TKey, IBlockAggregatorCombineKeys, TFixedAggState, UseSet, UseFilter, false, false, TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter, IComputationWideFlowNode>> { public: - using TSelf = TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter>; - using TBase = THashedWrapperBase<TKey, IBlockAggregatorCombineKeys, TFixedAggState, UseSet, UseFilter, false, false, TSelf>; + using TSelf = TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter, IComputationWideFlowNode>; + using TBase = THashedWrapperBaseFromFlow<TKey, IBlockAggregatorCombineKeys, TFixedAggState, UseSet, UseFilter, false, false, TSelf>; TBlockCombineHashedWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, @@ -1683,11 +1915,34 @@ public: {} }; +template <typename TKey, typename TFixedAggState, bool UseSet, bool UseFilter> +class TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter, IComputationNode> + : public THashedWrapperBaseFromStream<TKey, IBlockAggregatorCombineKeys, TFixedAggState, UseSet, UseFilter, false, false, TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter, IComputationNode>> { +public: + using TSelf = TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter, IComputationNode>; + using TBase = THashedWrapperBaseFromStream<TKey, IBlockAggregatorCombineKeys, TFixedAggState, UseSet, UseFilter, false, false, TSelf>; + + TBlockCombineHashedWrapper(TComputationMutables& mutables, + IComputationNode* stream, + std::optional<ui32> filterColumn, + size_t width, + const std::vector<TKeyParams>& keys, + size_t maxBlockLen, + ui32 keyLength, + std::vector<TAggParams<IBlockAggregatorCombineKeys>>&& aggsParams) + : TBase(mutables, stream, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams), 0, {}) + {} +}; + +template <typename TKey, typename TFixedAggState, bool UseSet, typename TInputNode> +class TBlockMergeFinalizeHashedWrapper {}; + template <typename TKey, typename TFixedAggState, bool UseSet> -class TBlockMergeFinalizeHashedWrapper : public THashedWrapperBase<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, UseSet, false, true, false, TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet>> { +class TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet, IComputationWideFlowNode> + : public THashedWrapperBaseFromFlow<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, UseSet, false, true, false, TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet, IComputationWideFlowNode>> { public: - using TSelf = TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet>; - using TBase = THashedWrapperBase<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, UseSet, false, true, false, TSelf>; + using TSelf = TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet, IComputationWideFlowNode>; + using TBase = THashedWrapperBaseFromFlow<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, UseSet, false, true, false, TSelf>; TBlockMergeFinalizeHashedWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, @@ -1700,11 +1955,33 @@ public: {} }; +template <typename TKey, typename TFixedAggState, bool UseSet> +class TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet, IComputationNode> + : public THashedWrapperBaseFromStream<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, UseSet, false, true, false, TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet, IComputationNode>> { +public: + using TSelf = TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet, IComputationNode>; + using TBase = THashedWrapperBaseFromStream<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, UseSet, false, true, false, TSelf>; + + TBlockMergeFinalizeHashedWrapper(TComputationMutables& mutables, + IComputationNode* stream, + size_t width, + const std::vector<TKeyParams>& keys, + size_t maxBlockLen, + ui32 keyLength, + std::vector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams) + : TBase(mutables, stream, {}, width, keys, maxBlockLen, keyLength, std::move(aggsParams), 0, {}) + {} +}; + +template <typename TKey, typename TFixedAggState, typename TInputNode> +class TBlockMergeManyFinalizeHashedWrapper {}; + template <typename TKey, typename TFixedAggState> -class TBlockMergeManyFinalizeHashedWrapper : public THashedWrapperBase<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, false, false, true, true, TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState>> { +class TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState, IComputationWideFlowNode> + : public THashedWrapperBaseFromFlow<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, false, false, true, true, TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState, IComputationWideFlowNode>> { public: - using TSelf = TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState>; - using TBase = THashedWrapperBase<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, false, false, true, true, TSelf>; + using TSelf = TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState, IComputationWideFlowNode>; + using TBase = THashedWrapperBaseFromFlow<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, false, false, true, true, TSelf>; TBlockMergeManyFinalizeHashedWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, @@ -1718,6 +1995,25 @@ public: {} }; +template <typename TKey, typename TFixedAggState> +class TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState, IComputationNode> + : public THashedWrapperBaseFromStream<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, false, false, true, true, TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState, IComputationNode>> { +public: + using TSelf = TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState, IComputationNode>; + using TBase = THashedWrapperBaseFromStream<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, false, false, true, true, TSelf>; + + TBlockMergeManyFinalizeHashedWrapper(TComputationMutables& mutables, + IComputationNode* stream, + size_t width, + const std::vector<TKeyParams>& keys, + size_t maxBlockLen, + ui32 keyLength, + std::vector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams, + ui32 streamIndex, std::vector<std::vector<ui32>>&& streams) + : TBase(mutables, stream, {}, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams)) + {} +}; + template <typename TAggregator> std::unique_ptr<IPreparedBlockAggregator<TAggregator>> PrepareBlockAggregator(const IBlockAggregatorFactory& factory, TTupleType* tupleType, @@ -1824,117 +2120,117 @@ ui32 FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, std::optional< return totalStateSize; } -template <bool UseSet, bool UseFilter, typename TKey> +template <bool UseSet, bool UseFilter, typename TKey, typename TInputNode> IComputationNode* MakeBlockCombineHashedWrapper( ui32 keyLength, ui32 totalStateSize, TComputationMutables& mutables, - IComputationWideFlowNode* flow, + TInputNode* streamOrFlow, std::optional<ui32> filterColumn, size_t width, const std::vector<TKeyParams>& keys, size_t maxBlockLen, std::vector<TAggParams<IBlockAggregatorCombineKeys>>&& aggsParams) { if (totalStateSize <= sizeof(TState8)) { - return new TBlockCombineHashedWrapper<TKey, TState8, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams)); + return new TBlockCombineHashedWrapper<TKey, TState8, UseSet, UseFilter, TInputNode>(mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams)); } if (totalStateSize <= sizeof(TState16)) { - return new TBlockCombineHashedWrapper<TKey, TState16, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams)); + return new TBlockCombineHashedWrapper<TKey, TState16, UseSet, UseFilter, TInputNode>(mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams)); } - return new TBlockCombineHashedWrapper<TKey, TStateArena, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams)); + return new TBlockCombineHashedWrapper<TKey, TStateArena, UseSet, UseFilter, TInputNode>(mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams)); } -template <bool UseSet, bool UseFilter> +template <bool UseSet, bool UseFilter, typename TInputNode> IComputationNode* MakeBlockCombineHashedWrapper( TMaybe<ui32> totalKeysSize, bool isFixed, ui32 totalStateSize, TComputationMutables& mutables, - IComputationWideFlowNode* flow, + TInputNode* streamOrFlow, std::optional<ui32> filterColumn, size_t width, const std::vector<TKeyParams>& keys, size_t maxBlockLen, std::vector<TAggParams<IBlockAggregatorCombineKeys>>&& aggsParams) { if (totalKeysSize && *totalKeysSize <= sizeof(ui32)) { - return MakeBlockCombineHashedWrapper<UseSet, UseFilter, ui32>(*totalKeysSize, totalStateSize, mutables, flow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams)); + return MakeBlockCombineHashedWrapper<UseSet, UseFilter, ui32>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams)); } if (totalKeysSize && *totalKeysSize <= sizeof(ui64)) { - return MakeBlockCombineHashedWrapper<UseSet, UseFilter, ui64>(*totalKeysSize, totalStateSize, mutables, flow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams)); + return MakeBlockCombineHashedWrapper<UseSet, UseFilter, ui64>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams)); } if (totalKeysSize && *totalKeysSize <= sizeof(TKey16)) { - return MakeBlockCombineHashedWrapper<UseSet, UseFilter, TKey16>(*totalKeysSize, totalStateSize, mutables, flow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams)); + return MakeBlockCombineHashedWrapper<UseSet, UseFilter, TKey16>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams)); } if (totalKeysSize && isFixed) { - return MakeBlockCombineHashedWrapper<UseSet, UseFilter, TExternalFixedSizeKey>(*totalKeysSize, totalStateSize, mutables, flow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams)); + return MakeBlockCombineHashedWrapper<UseSet, UseFilter, TExternalFixedSizeKey>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams)); } - return MakeBlockCombineHashedWrapper<UseSet, UseFilter, TSSOKey>(Max<ui32>(), totalStateSize, mutables, flow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams)); + return MakeBlockCombineHashedWrapper<UseSet, UseFilter, TSSOKey>(Max<ui32>(), totalStateSize, mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams)); } -template <typename TKey, bool UseSet> +template <typename TKey, bool UseSet, typename TInputNode> IComputationNode* MakeBlockMergeFinalizeHashedWrapper( ui32 keyLength, ui32 totalStateSize, TComputationMutables& mutables, - IComputationWideFlowNode* flow, + TInputNode* streamOrFlow, size_t width, const std::vector<TKeyParams>& keys, size_t maxBlockLen, std::vector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams) { if (totalStateSize <= sizeof(TState8)) { - return new TBlockMergeFinalizeHashedWrapper<TKey, TState8, UseSet>(mutables, flow, width, keys, maxBlockLen, keyLength, std::move(aggsParams)); + return new TBlockMergeFinalizeHashedWrapper<TKey, TState8, UseSet, TInputNode>(mutables, streamOrFlow, width, keys, maxBlockLen, keyLength, std::move(aggsParams)); } if (totalStateSize <= sizeof(TState16)) { - return new TBlockMergeFinalizeHashedWrapper<TKey, TState16, UseSet>(mutables, flow, width, keys, maxBlockLen, keyLength, std::move(aggsParams)); + return new TBlockMergeFinalizeHashedWrapper<TKey, TState16, UseSet, TInputNode>(mutables, streamOrFlow, width, keys, maxBlockLen, keyLength, std::move(aggsParams)); } - return new TBlockMergeFinalizeHashedWrapper<TKey, TStateArena, UseSet>(mutables, flow, width, keys, maxBlockLen, keyLength, std::move(aggsParams)); + return new TBlockMergeFinalizeHashedWrapper<TKey, TStateArena, UseSet, TInputNode>(mutables, streamOrFlow, width, keys, maxBlockLen, keyLength, std::move(aggsParams)); } -template <bool UseSet> +template <bool UseSet, typename TInputNode> IComputationNode* MakeBlockMergeFinalizeHashedWrapper( TMaybe<ui32> totalKeysSize, bool isFixed, ui32 totalStateSize, TComputationMutables& mutables, - IComputationWideFlowNode* flow, + TInputNode* streamOrFlow, size_t width, const std::vector<TKeyParams>& keys, size_t maxBlockLen, std::vector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams) { if (totalKeysSize && *totalKeysSize <= sizeof(ui32)) { - return MakeBlockMergeFinalizeHashedWrapper<ui32, UseSet>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams)); + return MakeBlockMergeFinalizeHashedWrapper<ui32, UseSet>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams)); } if (totalKeysSize && *totalKeysSize <= sizeof(ui64)) { - return MakeBlockMergeFinalizeHashedWrapper<ui64, UseSet>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams)); + return MakeBlockMergeFinalizeHashedWrapper<ui64, UseSet>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams)); } if (totalKeysSize && *totalKeysSize <= sizeof(TKey16)) { - return MakeBlockMergeFinalizeHashedWrapper<TKey16, UseSet>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams)); + return MakeBlockMergeFinalizeHashedWrapper<TKey16, UseSet>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams)); } if (totalKeysSize && isFixed) { - return MakeBlockMergeFinalizeHashedWrapper<TExternalFixedSizeKey, UseSet>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams)); + return MakeBlockMergeFinalizeHashedWrapper<TExternalFixedSizeKey, UseSet>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams)); } - return MakeBlockMergeFinalizeHashedWrapper<TSSOKey, UseSet>(Max<ui32>(), totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams)); + return MakeBlockMergeFinalizeHashedWrapper<TSSOKey, UseSet>(Max<ui32>(), totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams)); } -template <typename TKey> +template <typename TKey, typename TInputNode> IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper( ui32 keyLength, ui32 totalStateSize, TComputationMutables& mutables, - IComputationWideFlowNode* flow, + TInputNode* streamOrFlow, size_t width, const std::vector<TKeyParams>& keys, size_t maxBlockLen, @@ -1943,22 +2239,23 @@ IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper( std::vector<std::vector<ui32>>&& streams) { if (totalStateSize <= sizeof(TState8)) { - return new TBlockMergeManyFinalizeHashedWrapper<TKey, TState8>(mutables, flow, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams)); + return new TBlockMergeManyFinalizeHashedWrapper<TKey, TState8, TInputNode>(mutables, streamOrFlow, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams)); } if (totalStateSize <= sizeof(TState16)) { - return new TBlockMergeManyFinalizeHashedWrapper<TKey, TState16>(mutables, flow, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams)); + return new TBlockMergeManyFinalizeHashedWrapper<TKey, TState16, TInputNode>(mutables, streamOrFlow, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams)); } - return new TBlockMergeManyFinalizeHashedWrapper<TKey, TStateArena>(mutables, flow, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams)); + return new TBlockMergeManyFinalizeHashedWrapper<TKey, TStateArena, TInputNode>(mutables, streamOrFlow, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams)); } +template <typename TInputNode> IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper( TMaybe<ui32> totalKeysSize, bool isFixed, ui32 totalStateSize, TComputationMutables& mutables, - IComputationWideFlowNode* flow, + TInputNode* streamOrFlow, size_t width, const std::vector<TKeyParams>& keys, size_t maxBlockLen, @@ -1966,22 +2263,22 @@ IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper( ui32 streamIndex, std::vector<std::vector<ui32>>&& streams) { if (totalKeysSize && *totalKeysSize <= sizeof(ui32)) { - return MakeBlockMergeManyFinalizeHashedWrapper<ui32>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); + return MakeBlockMergeManyFinalizeHashedWrapper<ui32>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); } if (totalKeysSize && *totalKeysSize <= sizeof(ui64)) { - return MakeBlockMergeManyFinalizeHashedWrapper<ui64>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); + return MakeBlockMergeManyFinalizeHashedWrapper<ui64>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); } if (totalKeysSize && *totalKeysSize <= sizeof(TKey16)) { - return MakeBlockMergeManyFinalizeHashedWrapper<TKey16>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); + return MakeBlockMergeManyFinalizeHashedWrapper<TKey16>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); } if (totalKeysSize && isFixed) { - return MakeBlockMergeManyFinalizeHashedWrapper<TExternalFixedSizeKey>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); + return MakeBlockMergeManyFinalizeHashedWrapper<TExternalFixedSizeKey>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); } - return MakeBlockMergeManyFinalizeHashedWrapper<TSSOKey>(Max<ui32>(), totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); + return MakeBlockMergeManyFinalizeHashedWrapper<TSSOKey>(Max<ui32>(), totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); } void PrepareKeys(const std::vector<TKeyParams>& keys, TMaybe<ui32>& totalKeysSize, bool& isFixed) { @@ -2012,14 +2309,15 @@ void FillAggStreams(TRuntimeNode streamsNode, std::vector<std::vector<ui32>>& st IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args"); - const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto wideComponents = GetWideComponents(flowType); + + const bool isStream = callable.GetInput(0).GetStaticType()->IsStream(); + MKQL_ENSURE(isStream == callable.GetType()->GetReturnType()->IsStream(), "input and output must be both either flow or stream"); + + const auto wideComponents = GetWideComponents(callable.GetInput(0).GetStaticType()); const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env); - const auto returnFlowType = AS_TYPE(TFlowType, callable.GetType()->GetReturnType()); - const auto returnWideComponents = GetWideComponents(returnFlowType); + const auto returnWideComponents = GetWideComponents(callable.GetType()->GetReturnType()); - auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); - MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + const auto wideFlowOrStream = LocateNode(ctx.NodeLocator, callable, 0); auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1)); std::optional<ui32> filterColumn; @@ -2030,19 +2328,28 @@ IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNod auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(2)); std::vector<TAggParams<IBlockAggregatorCombineAll>> aggsParams; FillAggParams<IBlockAggregatorCombineAll>(aggsVal, tupleType, filterColumn, aggsParams, ctx.Env, false, false, returnWideComponents, 0); - return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams)); + + if (isStream) { + const auto wideStream = wideFlowOrStream; + return new TBlockCombineAllWrapperFromStream(ctx.Mutables, wideStream, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams)); + } else { + const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(wideFlowOrStream); + MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + return new TBlockCombineAllWrapperFromFlow(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams)); + } } IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); - const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto wideComponents = GetWideComponents(flowType); + + const bool isStream = callable.GetInput(0).GetStaticType()->IsStream(); + MKQL_ENSURE(isStream == callable.GetType()->GetReturnType()->IsStream(), "input and output must be both either flow or stream"); + + const auto wideComponents = GetWideComponents(callable.GetInput(0).GetStaticType()); const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env); - const auto returnFlowType = AS_TYPE(TFlowType, callable.GetType()->GetReturnType()); - const auto returnWideComponents = GetWideComponents(returnFlowType); + const auto returnWideComponents = GetWideComponents(callable.GetType()->GetReturnType()); - auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); - MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + const auto wideStreamOrFlow = LocateNode(ctx.NodeLocator, callable, 0); auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1)); std::optional<ui32> filterColumn; @@ -2066,31 +2373,51 @@ IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputation PrepareKeys(keys, totalKeysSize, isFixed); const size_t maxBlockLen = CalcMaxBlockLenForOutput(callable.GetType()->GetReturnType()); - if (filterColumn) { - if (aggsParams.empty()) { - return MakeBlockCombineHashedWrapper<true, true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + if (isStream) { + const auto wideStream = wideStreamOrFlow; + if (filterColumn) { + if (aggsParams.empty()) { + return MakeBlockCombineHashedWrapper<true, true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + } else { + return MakeBlockCombineHashedWrapper<false, true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + } } else { - return MakeBlockCombineHashedWrapper<false, true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + if (aggsParams.empty()) { + return MakeBlockCombineHashedWrapper<true, false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + } else { + return MakeBlockCombineHashedWrapper<false, false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + } } } else { - if (aggsParams.empty()) { - return MakeBlockCombineHashedWrapper<true, false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + const auto wideFlow = dynamic_cast<IComputationWideFlowNode *>(wideStreamOrFlow); + MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + if (filterColumn) { + if (aggsParams.empty()) { + return MakeBlockCombineHashedWrapper<true, true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + } else { + return MakeBlockCombineHashedWrapper<false, true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + } } else { - return MakeBlockCombineHashedWrapper<false, false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + if (aggsParams.empty()) { + return MakeBlockCombineHashedWrapper<true, false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + } else { + return MakeBlockCombineHashedWrapper<false, false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + } } } } IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args"); - const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto wideComponents = GetWideComponents(flowType); + + const bool isStream = callable.GetInput(0).GetStaticType()->IsStream(); + MKQL_ENSURE(isStream == callable.GetType()->GetReturnType()->IsStream(), "input and output must be both either flow or stream"); + + const auto wideComponents = GetWideComponents(callable.GetInput(0).GetStaticType()); const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env); - const auto returnFlowType = AS_TYPE(TFlowType, callable.GetType()->GetReturnType()); - const auto returnWideComponents = GetWideComponents(returnFlowType); + const auto returnWideComponents = GetWideComponents(callable.GetType()->GetReturnType()); - auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); - MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + const auto wideStreamOrFlow = LocateNode(ctx.NodeLocator, callable, 0); auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(1)); std::vector<TKeyParams> keys; @@ -2108,23 +2435,35 @@ IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TCompu PrepareKeys(keys, totalKeysSize, isFixed); const size_t maxBlockLen = CalcMaxBlockLenForOutput(callable.GetType()->GetReturnType()); - if (aggsParams.empty()) { - return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + if (isStream) { + const auto wideStream = wideStreamOrFlow; + if (aggsParams.empty()) { + return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + } else { + return MakeBlockMergeFinalizeHashedWrapper<false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + } } else { - return MakeBlockMergeFinalizeHashedWrapper<false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + const auto wideFlow = dynamic_cast<IComputationWideFlowNode *>(wideStreamOrFlow); + MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + if (aggsParams.empty()) { + return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + } else { + return MakeBlockMergeFinalizeHashedWrapper<false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams)); + } } } IComputationNode* WrapBlockMergeManyFinalizeHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args"); - const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto wideComponents = GetWideComponents(flowType); + + const bool isStream = callable.GetInput(0).GetStaticType()->IsStream(); + MKQL_ENSURE(isStream == callable.GetType()->GetReturnType()->IsStream(), "input and output must be both either flow or stream"); + + const auto wideComponents = GetWideComponents(callable.GetInput(0).GetStaticType()); const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env); - const auto returnFlowType = AS_TYPE(TFlowType, callable.GetType()->GetReturnType()); - const auto returnWideComponents = GetWideComponents(returnFlowType); + const auto returnWideComponents = GetWideComponents(callable.GetType()->GetReturnType()); - const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); - MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + const auto wideStreamOrFlow = LocateNode(ctx.NodeLocator, callable, 0); auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(1)); std::vector<TKeyParams> keys; @@ -2147,12 +2486,25 @@ IComputationNode* WrapBlockMergeManyFinalizeHashed(TCallable& callable, const TC totalStateSize += streams.size(); const size_t maxBlockLen = CalcMaxBlockLenForOutput(callable.GetType()->GetReturnType()); - if (aggsParams.empty()) { - return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), - keys, maxBlockLen, std::move(aggsParams)); + if (isStream){ + const auto wideStream = wideStreamOrFlow; + if (aggsParams.empty()) { + return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, tupleType->GetElementsCount(), + keys, maxBlockLen, std::move(aggsParams)); + } else { + return MakeBlockMergeManyFinalizeHashedWrapper(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, tupleType->GetElementsCount(), + keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); + } } else { - return MakeBlockMergeManyFinalizeHashedWrapper(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), - keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); + const auto wideFlow = dynamic_cast<IComputationWideFlowNode *>(wideStreamOrFlow); + MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + if (aggsParams.empty()) { + return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), + keys, maxBlockLen, std::move(aggsParams)); + } else { + return MakeBlockMergeManyFinalizeHashedWrapper(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), + keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams)); + } } } diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 3206c5e47a..00a7f18463 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -5730,14 +5730,15 @@ TRuntimeNode TProgramBuilder::BlockBitCast(TRuntimeNode value, TType* targetType return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, std::optional<ui32> filterColumn, - const TArrayRef<const TAggInfo>& aggs, TType* returnType) { - if constexpr (RuntimeVersion < 31U) { - THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; - } +TRuntimeNode TProgramBuilder::BuildBlockCombineAll(const std::string_view& callableName, TRuntimeNode input, std::optional<ui32> filterColumn, + const TArrayRef<const TAggInfo>& aggs, TType* returnType) { + const auto inputType = input.GetStaticType(); + MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type"); + MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type"); + + TCallableBuilder builder(Env, callableName, returnType); + builder.Add(input); - TCallableBuilder builder(Env, __func__, returnType); - builder.Add(flow); if (!filterColumn) { builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id)); } else { @@ -5759,14 +5760,32 @@ TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, std::optional<u return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys, +TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode stream, std::optional<ui32> filterColumn, const TArrayRef<const TAggInfo>& aggs, TType* returnType) { if constexpr (RuntimeVersion < 31U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - TCallableBuilder builder(Env, __func__, returnType); - builder.Add(flow); + MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type"); + MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type"); + + if constexpr (RuntimeVersion < 52U) { + const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType()); + return FromFlow(BuildBlockCombineAll(__func__, ToFlow(stream), filterColumn, aggs, flowReturnType)); + } else { + return BuildBlockCombineAll(__func__, stream, filterColumn, aggs, returnType); + } +} + +TRuntimeNode TProgramBuilder::BuildBlockCombineHashed(const std::string_view& callableName, TRuntimeNode input, std::optional<ui32> filterColumn, + const TArrayRef<ui32>& keys, const TArrayRef<const TAggInfo>& aggs, TType* returnType) { + const auto inputType = input.GetStaticType(); + MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type"); + MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type"); + + TCallableBuilder builder(Env, callableName, returnType); + builder.Add(input); + if (!filterColumn) { builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id)); } else { @@ -5794,14 +5813,31 @@ TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, std::optiona return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode flow, const TArrayRef<ui32>& keys, +TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode stream, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys, const TArrayRef<const TAggInfo>& aggs, TType* returnType) { if constexpr (RuntimeVersion < 31U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - TCallableBuilder builder(Env, __func__, returnType); - builder.Add(flow); + MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type"); + MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type"); + + if constexpr (RuntimeVersion < 52U) { + const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType()); + return FromFlow(BuildBlockCombineHashed(__func__, ToFlow(stream), filterColumn, keys, aggs, flowReturnType)); + } else { + return BuildBlockCombineHashed(__func__, stream, filterColumn, keys, aggs, returnType); + } +} + +TRuntimeNode TProgramBuilder::BuildBlockMergeFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys, + const TArrayRef<const TAggInfo>& aggs, TType* returnType) { + const auto inputType = input.GetStaticType(); + MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type"); + MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type"); + + TCallableBuilder builder(Env, callableName, returnType); + builder.Add(input); TVector<TRuntimeNode> keyNodes; for (const auto& key : keys) { @@ -5824,14 +5860,31 @@ TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode flow, const return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockMergeManyFinalizeHashed(TRuntimeNode flow, const TArrayRef<ui32>& keys, - const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType) { +TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode stream, const TArrayRef<ui32>& keys, + const TArrayRef<const TAggInfo>& aggs, TType* returnType) { if constexpr (RuntimeVersion < 31U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - TCallableBuilder builder(Env, __func__, returnType); - builder.Add(flow); + MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type"); + MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type"); + + if constexpr (RuntimeVersion < 52U) { + const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType()); + return FromFlow(BuildBlockMergeFinalizeHashed(__func__, ToFlow(stream), keys, aggs, flowReturnType)); + } else { + return BuildBlockMergeFinalizeHashed(__func__, stream, keys, aggs, returnType); + } +} + +TRuntimeNode TProgramBuilder::BuildBlockMergeManyFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys, + const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType) { + const auto inputType = input.GetStaticType(); + MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type"); + MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type"); + + TCallableBuilder builder(Env, callableName, returnType); + builder.Add(input); TVector<TRuntimeNode> keyNodes; for (const auto& key : keys) { @@ -5866,6 +5919,23 @@ TRuntimeNode TProgramBuilder::BlockMergeManyFinalizeHashed(TRuntimeNode flow, co return TRuntimeNode(builder.Build(), false); } +TRuntimeNode TProgramBuilder::BlockMergeManyFinalizeHashed(TRuntimeNode stream, const TArrayRef<ui32>& keys, + const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType) { + if constexpr (RuntimeVersion < 31U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type"); + MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type"); + + if constexpr (RuntimeVersion < 52U) { + const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType()); + return FromFlow(BuildBlockMergeManyFinalizeHashed(__func__, ToFlow(stream), keys, aggs, streamIndex, streams, flowReturnType)); + } else { + return BuildBlockMergeManyFinalizeHashed(__func__, stream, keys, aggs, streamIndex, streams, returnType); + } +} + TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& args, const TArrayLambda& handler) { if constexpr (RuntimeVersion < 39U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 58fdc4c272..74cf9a80ea 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -759,6 +759,15 @@ protected: private: TRuntimeNode BuildWideFilter(const std::string_view& callableName, TRuntimeNode flow, const TNarrowLambda& handler); + TRuntimeNode BuildBlockCombineAll(const std::string_view& callableName, TRuntimeNode input, std::optional<ui32> filterColumn, + const TArrayRef<const TAggInfo>& aggs, TType* returnType); + TRuntimeNode BuildBlockCombineHashed(const std::string_view& callableName, TRuntimeNode input, std::optional<ui32> filterColumn, + const TArrayRef<ui32>& keys, const TArrayRef<const TAggInfo>& aggs, TType* returnType); + TRuntimeNode BuildBlockMergeFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys, + const TArrayRef<const TAggInfo>& aggs, TType* returnType); + TRuntimeNode BuildBlockMergeManyFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys, + const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType); + TRuntimeNode DictItems(TRuntimeNode dict, EDictItems mode); TRuntimeNode If(TRuntimeNode condition, TRuntimeNode thenBranch, TRuntimeNode elseBranch, TType* resultType); diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h index 4d416ac708..bfd26216ab 100644 --- a/ydb/library/yql/minikql/mkql_runtime_version.h +++ b/ydb/library/yql/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 51U +#define MKQL_RUNTIME_VERSION 52U #endif // History: diff --git a/ydb/library/yql/tests/sql/yt_native_file/part0/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part0/canondata/result.json index 895fd69dc1..2a936aacb1 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part0/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part0/canondata/result.json @@ -486,9 +486,9 @@ ], "test.test[blocks-combine_all_pg_filter--Peephole]": [ { - "checksum": "b82fb484ea512752ac0bd055f2881bad", - "size": 5463, - "uri": "https://{canondata_backend}/1689644/c571b164c0329219eb668c8d9e33807153d99a05/resource.tar.gz#test.test_blocks-combine_all_pg_filter--Peephole_/opt.yql" + "checksum": "f9644e5d1aa2916e452370ff93594ba1", + "size": 5584, + "uri": "https://{canondata_backend}/1942100/eca836514e0d22543696dadda9c6b5fd0411ec2c/resource.tar.gz#test.test_blocks-combine_all_pg_filter--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_pg_filter--Plan]": [ @@ -598,9 +598,9 @@ ], "test.test[blocks-minmax_strings--Peephole]": [ { - "checksum": "e24251cf43d6e021c673a59a5d975701", - "size": 7778, - "uri": "https://{canondata_backend}/1775319/332ffa137abc3e824ae9020e8b37875c568e01e9/resource.tar.gz#test.test_blocks-minmax_strings--Peephole_/opt.yql" + "checksum": "585085c48335fca5229be389e233cf6b", + "size": 8001, + "uri": "https://{canondata_backend}/1942100/eca836514e0d22543696dadda9c6b5fd0411ec2c/resource.tar.gz#test.test_blocks-minmax_strings--Peephole_/opt.yql" } ], "test.test[blocks-minmax_strings--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part1/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part1/canondata/result.json index a91f6c216a..abc4bfb0b5 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part1/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part1/canondata/result.json @@ -643,9 +643,9 @@ ], "test.test[blocks-combine_hashed_min--Peephole]": [ { - "checksum": "6a4ee50207cc13b68ec0b901b5c90bbc", - "size": 3482, - "uri": "https://{canondata_backend}/1889210/5d1223c56711b5ccfa2ce8980fabbe18f3a04d58/resource.tar.gz#test.test_blocks-combine_hashed_min--Peephole_/opt.yql" + "checksum": "fe48deb06089372a1be7b0f21a39fafa", + "size": 3530, + "uri": "https://{canondata_backend}/1942100/460fd0564dd37c75913eb7e3e5f96a12d70b483a/resource.tar.gz#test.test_blocks-combine_hashed_min--Peephole_/opt.yql" } ], "test.test[blocks-combine_hashed_min--Plan]": [ @@ -671,9 +671,9 @@ ], "test.test[blocks-combine_hashed_minmax_double--Peephole]": [ { - "checksum": "4eed869d74a2e6927965254eaabfaef9", - "size": 2858, - "uri": "https://{canondata_backend}/1889210/5d1223c56711b5ccfa2ce8980fabbe18f3a04d58/resource.tar.gz#test.test_blocks-combine_hashed_minmax_double--Peephole_/opt.yql" + "checksum": "ed48b0c0f01f1031a11943749017abb4", + "size": 2862, + "uri": "https://{canondata_backend}/1942100/460fd0564dd37c75913eb7e3e5f96a12d70b483a/resource.tar.gz#test.test_blocks-combine_hashed_minmax_double--Peephole_/opt.yql" } ], "test.test[blocks-combine_hashed_minmax_double--Plan]": [ @@ -699,9 +699,9 @@ ], "test.test[blocks-combine_hashed_set--Peephole]": [ { - "checksum": "c8527dced2050b347b8abb2b106e392e", - "size": 2248, - "uri": "https://{canondata_backend}/1817427/bbd43b1ed3e4b4e4804e9367a38fc8672c2b46db/resource.tar.gz#test.test_blocks-combine_hashed_set--Peephole_/opt.yql" + "checksum": "65b13975fe3b35dcf2a50f4c796a12c9", + "size": 2268, + "uri": "https://{canondata_backend}/1942100/460fd0564dd37c75913eb7e3e5f96a12d70b483a/resource.tar.gz#test.test_blocks-combine_hashed_set--Peephole_/opt.yql" } ], "test.test[blocks-combine_hashed_set--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part10/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part10/canondata/result.json index 6523279b72..b22d08a839 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part10/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part10/canondata/result.json @@ -460,9 +460,9 @@ ], "test.test[blocks-combine_all_avg_filter--Peephole]": [ { - "checksum": "67e87d544bf1db7426409705ac43f5d2", - "size": 2221, - "uri": "https://{canondata_backend}/1689644/e953ec7ef3d2122dd1da61bd4dc7f631010ae6c6/resource.tar.gz#test.test_blocks-combine_all_avg_filter--Peephole_/opt.yql" + "checksum": "839af3ed70bf69f8c9ac1d08a622b053", + "size": 2264, + "uri": "https://{canondata_backend}/1936273/60c8060084c7fca13bcbdeeafd7bb7009b935816/resource.tar.gz#test.test_blocks-combine_all_avg_filter--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_avg_filter--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json index 934286b6c7..eded04e75f 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json @@ -417,9 +417,9 @@ ], "test.test[blocks-combine_all_decimal--Peephole]": [ { - "checksum": "f4f26b88e3c27f7c7b7e9a1dbdd2297d", - "size": 5371, - "uri": "https://{canondata_backend}/1689644/6b51d5d198e5b6a200e05073bbbd27568362da19/resource.tar.gz#test.test_blocks-combine_all_decimal--Peephole_/opt.yql" + "checksum": "f1af6f4b597cfc3f02016363538384b6", + "size": 5391, + "uri": "https://{canondata_backend}/1937001/d7dd873e29f16ef285cde87454b8789a4270b9bf/resource.tar.gz#test.test_blocks-combine_all_decimal--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_decimal--Plan]": [ @@ -445,9 +445,9 @@ ], "test.test[blocks-combine_all_some_filter--Peephole]": [ { - "checksum": "04b9f3cbee17a2e2af320d0764578a2e", - "size": 2020, - "uri": "https://{canondata_backend}/1689644/6b51d5d198e5b6a200e05073bbbd27568362da19/resource.tar.gz#test.test_blocks-combine_all_some_filter--Peephole_/opt.yql" + "checksum": "12353b122e8698f5979ab7ac48edbf28", + "size": 2063, + "uri": "https://{canondata_backend}/1937001/d7dd873e29f16ef285cde87454b8789a4270b9bf/resource.tar.gz#test.test_blocks-combine_all_some_filter--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_some_filter--Plan]": [ @@ -473,9 +473,9 @@ ], "test.test[blocks-combine_all_sum--Peephole]": [ { - "checksum": "2c97860ba346328f8a92d745c7ca45fa", - "size": 4165, - "uri": "https://{canondata_backend}/1689644/6b51d5d198e5b6a200e05073bbbd27568362da19/resource.tar.gz#test.test_blocks-combine_all_sum--Peephole_/opt.yql" + "checksum": "1332006a572a466ce7cc65cb9f79d3d4", + "size": 4185, + "uri": "https://{canondata_backend}/1937001/d7dd873e29f16ef285cde87454b8789a4270b9bf/resource.tar.gz#test.test_blocks-combine_all_sum--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_sum--Plan]": [ @@ -501,9 +501,9 @@ ], "test.test[blocks-distinct_opt_state_keys--Peephole]": [ { - "checksum": "19872b5003fd570975efecbb34cb27a7", - "size": 8532, - "uri": "https://{canondata_backend}/1900335/48c1045110f93be7271adb6caf109f9f9e32f3bc/resource.tar.gz#test.test_blocks-distinct_opt_state_keys--Peephole_/opt.yql" + "checksum": "2019c89c68ce2f33c11bafe6a65b4f43", + "size": 8686, + "uri": "https://{canondata_backend}/1937001/d7dd873e29f16ef285cde87454b8789a4270b9bf/resource.tar.gz#test.test_blocks-distinct_opt_state_keys--Peephole_/opt.yql" } ], "test.test[blocks-distinct_opt_state_keys--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part12/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part12/canondata/result.json index c121ee495e..7db6be5a4f 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part12/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part12/canondata/result.json @@ -472,9 +472,9 @@ ], "test.test[blocks-combine_all_max--Peephole]": [ { - "checksum": "311b380d0cf9a03f17b5be896eb75d52", - "size": 6265, - "uri": "https://{canondata_backend}/1936273/2e12f403b84f544db0dddee878135eb165520afe/resource.tar.gz#test.test_blocks-combine_all_max--Peephole_/opt.yql" + "checksum": "578d8413e4f0938c5c51ddbd50c7ffde", + "size": 6267, + "uri": "https://{canondata_backend}/1942100/1b0eecc460579a62c09a8328b78b1c3d0737be71/resource.tar.gz#test.test_blocks-combine_all_max--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_max--Plan]": [ @@ -500,9 +500,9 @@ ], "test.test[blocks-combine_all_minmax_nested--Peephole]": [ { - "checksum": "a0294d39683e7ad6fa80cfeb34bbf0a0", - "size": 3407, - "uri": "https://{canondata_backend}/1936273/2e12f403b84f544db0dddee878135eb165520afe/resource.tar.gz#test.test_blocks-combine_all_minmax_nested--Peephole_/opt.yql" + "checksum": "1d3e6826662dae4130959c4b1e4d197c", + "size": 3427, + "uri": "https://{canondata_backend}/1942100/1b0eecc460579a62c09a8328b78b1c3d0737be71/resource.tar.gz#test.test_blocks-combine_all_minmax_nested--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_minmax_nested--Plan]": [ @@ -528,9 +528,9 @@ ], "test.test[blocks-combine_hashed_sum--Peephole]": [ { - "checksum": "b9e75e3fb2350f09dbbcd3a17fa55617", - "size": 3542, - "uri": "https://{canondata_backend}/1942671/7651701708982db1bc7660dcd6a1d063fc197122/resource.tar.gz#test.test_blocks-combine_hashed_sum--Peephole_/opt.yql" + "checksum": "ece135ac007bb7b19234bbf85e0aee45", + "size": 3590, + "uri": "https://{canondata_backend}/1920236/0b8929411f995ee81d269f8519a2b62822b5b0f7/resource.tar.gz#test.test_blocks-combine_hashed_sum--Peephole_/opt.yql" } ], "test.test[blocks-combine_hashed_sum--Plan]": [ @@ -724,9 +724,9 @@ ], "test.test[blocks-lazy_nonstrict_with_scalar_ctx--Peephole]": [ { - "checksum": "5f260ad4f10c1e79d9bec7361e08d388", - "size": 3760, - "uri": "https://{canondata_backend}/1936273/2e12f403b84f544db0dddee878135eb165520afe/resource.tar.gz#test.test_blocks-lazy_nonstrict_with_scalar_ctx--Peephole_/opt.yql" + "checksum": "526263536048e247185473b3e7e22a71", + "size": 3708, + "uri": "https://{canondata_backend}/1942100/1b0eecc460579a62c09a8328b78b1c3d0737be71/resource.tar.gz#test.test_blocks-lazy_nonstrict_with_scalar_ctx--Peephole_/opt.yql" } ], "test.test[blocks-lazy_nonstrict_with_scalar_ctx--Plan]": [ @@ -752,9 +752,9 @@ ], "test.test[blocks-minmax_strings_filter--Peephole]": [ { - "checksum": "96ad208ff8de572b236d23a250c26c0e", - "size": 3925, - "uri": "https://{canondata_backend}/1942671/7651701708982db1bc7660dcd6a1d063fc197122/resource.tar.gz#test.test_blocks-minmax_strings_filter--Peephole_/opt.yql" + "checksum": "f0048f669753144b52457204bff26d18", + "size": 4056, + "uri": "https://{canondata_backend}/1942100/1b0eecc460579a62c09a8328b78b1c3d0737be71/resource.tar.gz#test.test_blocks-minmax_strings_filter--Peephole_/opt.yql" } ], "test.test[blocks-minmax_strings_filter--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part13/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part13/canondata/result.json index 2ab2c85970..85d9a8dcb8 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part13/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part13/canondata/result.json @@ -484,9 +484,9 @@ ], "test.test[blocks-combine_hashed_pg--Peephole]": [ { - "checksum": "44f4557f8602e6367b7cff827ac89234", - "size": 6379, - "uri": "https://{canondata_backend}/1880306/c85931dc2ddfc9372ac5287e031731a861495fb0/resource.tar.gz#test.test_blocks-combine_hashed_pg--Peephole_/opt.yql" + "checksum": "bdb9351eb3f869d1e2e3bd47329de921", + "size": 6507, + "uri": "https://{canondata_backend}/1775319/1d2e8f1b903194b7f97d4c3a5928f81379354efa/resource.tar.gz#test.test_blocks-combine_hashed_pg--Peephole_/opt.yql" } ], "test.test[blocks-combine_hashed_pg--Plan]": [ @@ -512,9 +512,9 @@ ], "test.test[blocks-distinct_opt_state_all--Peephole]": [ { - "checksum": "615baf31dbbe6dd549fee114e9f17fe5", - "size": 8713, - "uri": "https://{canondata_backend}/995452/76963fbe19e9e3c21b622412a3ae8fe5473f46ef/resource.tar.gz#test.test_blocks-distinct_opt_state_all--Peephole_/opt.yql" + "checksum": "1333bede710777316d4570510fd4ea7f", + "size": 8841, + "uri": "https://{canondata_backend}/1936273/1342bc207252f0d51d2a5d59cf94c96ae9330665/resource.tar.gz#test.test_blocks-distinct_opt_state_all--Peephole_/opt.yql" } ], "test.test[blocks-distinct_opt_state_all--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part14/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part14/canondata/result.json index 9a45ed2ced..943e2a3074 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part14/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part14/canondata/result.json @@ -574,9 +574,9 @@ ], "test.test[blocks-combine_all_max_filter_opt--Peephole]": [ { - "checksum": "ce72dc5aaf5dbc78a6afb13ac9ae87da", - "size": 2119, - "uri": "https://{canondata_backend}/1936273/9a6c178c3ec8bbc5f8d9a423f1c2a4b808cc630a/resource.tar.gz#test.test_blocks-combine_all_max_filter_opt--Peephole_/opt.yql" + "checksum": "eba47c259d4e240f3d15ddd779edc6e5", + "size": 2162, + "uri": "https://{canondata_backend}/1936273/780afd5c76cefb9a0deca536730d6e710dfd47af/resource.tar.gz#test.test_blocks-combine_all_max_filter_opt--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_max_filter_opt--Plan]": [ @@ -602,9 +602,9 @@ ], "test.test[blocks-combine_all_min--Peephole]": [ { - "checksum": "57cce58a2b91b32ee543632601fbc42d", - "size": 6265, - "uri": "https://{canondata_backend}/1936273/9a6c178c3ec8bbc5f8d9a423f1c2a4b808cc630a/resource.tar.gz#test.test_blocks-combine_all_min--Peephole_/opt.yql" + "checksum": "d2ccd2cda420b126acf214dd58008924", + "size": 6267, + "uri": "https://{canondata_backend}/1936273/780afd5c76cefb9a0deca536730d6e710dfd47af/resource.tar.gz#test.test_blocks-combine_all_min--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_min--Plan]": [ @@ -630,9 +630,9 @@ ], "test.test[blocks-combine_hashed_count--Peephole]": [ { - "checksum": "97ac986b17396f7162260355d37e2f2b", - "size": 3630, - "uri": "https://{canondata_backend}/1775319/dffc024d6db08a14065eefff7616512e9ec14ce3/resource.tar.gz#test.test_blocks-combine_hashed_count--Peephole_/opt.yql" + "checksum": "5fd4f1661e889ec3e4f8cce021d028e3", + "size": 3678, + "uri": "https://{canondata_backend}/1881367/4c7aa0ee09ad13a7c4c8522afaebb0a34e8a70bf/resource.tar.gz#test.test_blocks-combine_hashed_count--Peephole_/opt.yql" } ], "test.test[blocks-combine_hashed_count--Plan]": [ @@ -770,9 +770,9 @@ ], "test.test[blocks-minmax_tuple--Peephole]": [ { - "checksum": "88907c5a0dc7d235be650136e083477c", - "size": 5500, - "uri": "https://{canondata_backend}/1936273/9a6c178c3ec8bbc5f8d9a423f1c2a4b808cc630a/resource.tar.gz#test.test_blocks-minmax_tuple--Peephole_/opt.yql" + "checksum": "34fdb2237dc2a52901a3279048cb493d", + "size": 5540, + "uri": "https://{canondata_backend}/1936273/780afd5c76cefb9a0deca536730d6e710dfd47af/resource.tar.gz#test.test_blocks-minmax_tuple--Peephole_/opt.yql" } ], "test.test[blocks-minmax_tuple--Plan]": [ @@ -854,9 +854,9 @@ ], "test.test[blocks-struct_type--Peephole]": [ { - "checksum": "1e4eb1415a14aaa58a5201e1dff2698b", - "size": 3095, - "uri": "https://{canondata_backend}/1775319/dffc024d6db08a14065eefff7616512e9ec14ce3/resource.tar.gz#test.test_blocks-struct_type--Peephole_/opt.yql" + "checksum": "f89c5ad6de169db5bceb2280b2b45c19", + "size": 3099, + "uri": "https://{canondata_backend}/1881367/4c7aa0ee09ad13a7c4c8522afaebb0a34e8a70bf/resource.tar.gz#test.test_blocks-struct_type--Peephole_/opt.yql" } ], "test.test[blocks-struct_type--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part16/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part16/canondata/result.json index 4179ae4702..e87e32930d 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part16/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part16/canondata/result.json @@ -495,9 +495,9 @@ ], "test.test[blocks-combine_all_pg--Peephole]": [ { - "checksum": "51b056d9dadc4c2d59d4d5e57572682a", - "size": 7688, - "uri": "https://{canondata_backend}/1689644/b05960229650fc70467add9c3d7638a8f3cdf3e1/resource.tar.gz#test.test_blocks-combine_all_pg--Peephole_/opt.yql" + "checksum": "4ddca44c25daaec6456186fbb157f488", + "size": 7776, + "uri": "https://{canondata_backend}/1916746/e62dc8205f86272010640f8fa6ec2e825d624df1/resource.tar.gz#test.test_blocks-combine_all_pg--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_pg--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part17/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part17/canondata/result.json index 2ac5013ae3..fdabf4e5f3 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part17/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part17/canondata/result.json @@ -543,9 +543,9 @@ ], "test.test[blocks-combine_all_count_filter_opt--Peephole]": [ { - "checksum": "d05f14700984d99846058bdf6739bad9", - "size": 2233, - "uri": "https://{canondata_backend}/1903885/ffc476321aa22def0fffeeb1a5d7ff900dfce9d6/resource.tar.gz#test.test_blocks-combine_all_count_filter_opt--Peephole_/opt.yql" + "checksum": "623683acd53d94b06b7a19bdb11b4ecf", + "size": 2276, + "uri": "https://{canondata_backend}/1937001/3b94c4bb2028a21752a5dee6baf1a850ce51d480/resource.tar.gz#test.test_blocks-combine_all_count_filter_opt--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_count_filter_opt--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json index 519cb58bbc..52a25855b1 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json @@ -456,9 +456,9 @@ ], "test.test[blocks-combine_all_min_filter--Peephole]": [ { - "checksum": "1c63e924b4b71fb5db01c98dc02d38d8", - "size": 2052, - "uri": "https://{canondata_backend}/1903885/631311924f7bd550e8916c87c0bf4ae694df8ae6/resource.tar.gz#test.test_blocks-combine_all_min_filter--Peephole_/opt.yql" + "checksum": "4a8fc10717081d3a01a44dbc27597c1f", + "size": 2095, + "uri": "https://{canondata_backend}/1775319/8b72d711520d5cd84e67e6541ace66cd7067ac53/resource.tar.gz#test.test_blocks-combine_all_min_filter--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_min_filter--Plan]": [ @@ -512,9 +512,9 @@ ], "test.test[blocks-date_group_by--Peephole]": [ { - "checksum": "54e328a37fe9c9cc95f84acce454d9af", - "size": 4667, - "uri": "https://{canondata_backend}/1942100/977b836f807d2b39508e2ee0606842a99261a1da/resource.tar.gz#test.test_blocks-date_group_by--Peephole_/opt.yql" + "checksum": "c44c3b19c19cb7bec1fc9a11abdf89b8", + "size": 4673, + "uri": "https://{canondata_backend}/1775319/8b72d711520d5cd84e67e6541ace66cd7067ac53/resource.tar.gz#test.test_blocks-date_group_by--Peephole_/opt.yql" } ], "test.test[blocks-date_group_by--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json index aecc50c866..4358fdb451 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json @@ -584,9 +584,9 @@ ], "test.test[blocks-combine_hashed_max--Peephole]": [ { - "checksum": "072de32f237c0a659fcb217825d6fe8d", - "size": 3482, - "uri": "https://{canondata_backend}/1942173/d7b51a8943054cfea98b1e42450768694da5000b/resource.tar.gz#test.test_blocks-combine_hashed_max--Peephole_/opt.yql" + "checksum": "1c4cc1601f1f61b764e391b851324999", + "size": 3530, + "uri": "https://{canondata_backend}/1946324/820e3db848dce72eac40665b9ca64552c869136b/resource.tar.gz#test.test_blocks-combine_hashed_max--Peephole_/opt.yql" } ], "test.test[blocks-combine_hashed_max--Plan]": [ @@ -612,9 +612,9 @@ ], "test.test[blocks-combine_hashed_sum_many_keys--Peephole]": [ { - "checksum": "dce58d7114cdf691cd277a82b722aaef", - "size": 11971, - "uri": "https://{canondata_backend}/1775319/049530c2ab88f8664aade59a42710a0f0aec002c/resource.tar.gz#test.test_blocks-combine_hashed_sum_many_keys--Peephole_/opt.yql" + "checksum": "616ca4f49125448895b333f0c77aa4bb", + "size": 12071, + "uri": "https://{canondata_backend}/1946324/820e3db848dce72eac40665b9ca64552c869136b/resource.tar.gz#test.test_blocks-combine_hashed_sum_many_keys--Peephole_/opt.yql" } ], "test.test[blocks-combine_hashed_sum_many_keys--Plan]": [ @@ -640,9 +640,9 @@ ], "test.test[blocks-distinct_mixed_keys--Peephole]": [ { - "checksum": "16b9a0bc731ecbefb8d4274854782711", - "size": 7436, - "uri": "https://{canondata_backend}/1775319/049530c2ab88f8664aade59a42710a0f0aec002c/resource.tar.gz#test.test_blocks-distinct_mixed_keys--Peephole_/opt.yql" + "checksum": "76517f43bab95ee0125ec1b4ccf3a55b", + "size": 7664, + "uri": "https://{canondata_backend}/1937001/e715663cfc04c70f73b3049843d47c204d5f6c08/resource.tar.gz#test.test_blocks-distinct_mixed_keys--Peephole_/opt.yql" } ], "test.test[blocks-distinct_mixed_keys--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part2/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part2/canondata/result.json index 127ec082c8..0a108275f3 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part2/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part2/canondata/result.json @@ -554,9 +554,9 @@ ], "test.test[blocks-combine_all_minmax_double--Peephole]": [ { - "checksum": "b2c19f7c87d23fc16be804df86d23185", - "size": 2017, - "uri": "https://{canondata_backend}/995452/d8c0a0cd1e7f0cabf7ce37e867037d3ed9ceb5b4/resource.tar.gz#test.test_blocks-combine_all_minmax_double--Peephole_/opt.yql" + "checksum": "b2b4a9eeb219a440e6ab3de3861533c0", + "size": 2037, + "uri": "https://{canondata_backend}/1936273/085dd9582c7a504381fe179e61eafc7aa1a2ac09/resource.tar.gz#test.test_blocks-combine_all_minmax_double--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_minmax_double--Plan]": [ @@ -722,9 +722,9 @@ ], "test.test[blocks-string_as_agg_key--Peephole]": [ { - "checksum": "5923fbb79cabc45e72d3e1b1ee3a9ecf", - "size": 4217, - "uri": "https://{canondata_backend}/1936947/cf80e3ac9f8ab72a1a9ec43a693fa2afe6eab064/resource.tar.gz#test.test_blocks-string_as_agg_key--Peephole_/opt.yql" + "checksum": "e54b6a68d1a685a982fc3093a411d4c2", + "size": 4257, + "uri": "https://{canondata_backend}/1942525/f1dbd68be895d5bf52c9eba32b1cb5bb032d2c48/resource.tar.gz#test.test_blocks-string_as_agg_key--Peephole_/opt.yql" } ], "test.test[blocks-string_as_agg_key--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part3/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part3/canondata/result.json index 9f92b7a883..94e4b44c6f 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part3/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part3/canondata/result.json @@ -385,9 +385,9 @@ ], "test.test[blocks-combine_all_sum_filter_opt--Peephole]": [ { - "checksum": "ff2c95963b13fa52699ffa7c693a6a58", - "size": 2119, - "uri": "https://{canondata_backend}/1689644/ed8a7467fa50846051bff00d767534d539df7443/resource.tar.gz#test.test_blocks-combine_all_sum_filter_opt--Peephole_/opt.yql" + "checksum": "12c44da956ba25f6ecdcb9cb6a59c970", + "size": 2162, + "uri": "https://{canondata_backend}/1925821/5430ca5ee93f79b17db6a6f6b6c531354175f340/resource.tar.gz#test.test_blocks-combine_all_sum_filter_opt--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_sum_filter_opt--Plan]": [ @@ -413,9 +413,9 @@ ], "test.test[blocks-distinct_pure_all--Peephole]": [ { - "checksum": "054591522c53f5abcbc928609cdaf8f1", - "size": 3185, - "uri": "https://{canondata_backend}/1689644/ed8a7467fa50846051bff00d767534d539df7443/resource.tar.gz#test.test_blocks-distinct_pure_all--Peephole_/opt.yql" + "checksum": "6ab2263b47ec33e1dac75559b453cbc2", + "size": 3287, + "uri": "https://{canondata_backend}/1925821/5430ca5ee93f79b17db6a6f6b6c531354175f340/resource.tar.gz#test.test_blocks-distinct_pure_all--Peephole_/opt.yql" } ], "test.test[blocks-distinct_pure_all--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part4/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part4/canondata/result.json index 51452231a2..7b9e8dfcb5 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part4/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part4/canondata/result.json @@ -546,9 +546,9 @@ ], "test.test[blocks-combine_all_min_filter_opt--Peephole]": [ { - "checksum": "ff2c95963b13fa52699ffa7c693a6a58", - "size": 2119, - "uri": "https://{canondata_backend}/1936273/8bc4af96bbb1500304fd1be8e2559bcbead7eb36/resource.tar.gz#test.test_blocks-combine_all_min_filter_opt--Peephole_/opt.yql" + "checksum": "12c44da956ba25f6ecdcb9cb6a59c970", + "size": 2162, + "uri": "https://{canondata_backend}/1937001/4c624b84b0fd73da233705cdf7f661499ce8ebcf/resource.tar.gz#test.test_blocks-combine_all_min_filter_opt--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_min_filter_opt--Plan]": [ @@ -574,9 +574,9 @@ ], "test.test[blocks-decimal_avg--Peephole]": [ { - "checksum": "cdc8a4d00988408c92640297fa47c45a", - "size": 3137, - "uri": "https://{canondata_backend}/1931696/1cb947d289c4d65b45c3fba919aa59c8a7d02c7a/resource.tar.gz#test.test_blocks-decimal_avg--Peephole_/opt.yql" + "checksum": "3d5fcce74de740687d3dc82295049e37", + "size": 3157, + "uri": "https://{canondata_backend}/1937001/4c624b84b0fd73da233705cdf7f661499ce8ebcf/resource.tar.gz#test.test_blocks-decimal_avg--Peephole_/opt.yql" } ], "test.test[blocks-decimal_avg--Plan]": [ @@ -602,9 +602,9 @@ ], "test.test[blocks-json_document_type--Peephole]": [ { - "checksum": "a7cd19fa1416b435369188128797921b", - "size": 2595, - "uri": "https://{canondata_backend}/1917492/e3ceae93b691869b63d950444badb29e6ace4842/resource.tar.gz#test.test_blocks-json_document_type--Peephole_/opt.yql" + "checksum": "1536d7c0c5b539ba78f7d43b8ebb5833", + "size": 2599, + "uri": "https://{canondata_backend}/1946324/6bfca3fdf8ec9efde60e3282ae98ccf726043684/resource.tar.gz#test.test_blocks-json_document_type--Peephole_/opt.yql" } ], "test.test[blocks-json_document_type--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json index 582774c7e1..19bd1f6150 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json @@ -512,9 +512,9 @@ ], "test.test[blocks-combine_all_some--Peephole]": [ { - "checksum": "988d82092e311c0cc2288f6c721aa11c", - "size": 6223, - "uri": "https://{canondata_backend}/1936273/d36ce8069af100fc1a5be882969a5505c35574b2/resource.tar.gz#test.test_blocks-combine_all_some--Peephole_/opt.yql" + "checksum": "ec0346c566d620b4bc0d3de5b81e5a9a", + "size": 6225, + "uri": "https://{canondata_backend}/1937001/ef9f5ea919d8150ce90cc4b0dbee0c1cba26fb65/resource.tar.gz#test.test_blocks-combine_all_some--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_some--Plan]": [ @@ -540,9 +540,9 @@ ], "test.test[blocks-combine_hashed_avg--Peephole]": [ { - "checksum": "6cb4c5227d709714d5993d2ed2c77b6b", - "size": 4568, - "uri": "https://{canondata_backend}/1777230/4c11233cb21f28d476a7607d88951afa44d7ddc0/resource.tar.gz#test.test_blocks-combine_hashed_avg--Peephole_/opt.yql" + "checksum": "17a606506695fd0869a72faebd42531d", + "size": 4614, + "uri": "https://{canondata_backend}/1917492/b000db17817f99c1d3a0274a3622303fc2214093/resource.tar.gz#test.test_blocks-combine_hashed_avg--Peephole_/opt.yql" } ], "test.test[blocks-combine_hashed_avg--Plan]": [ @@ -568,9 +568,9 @@ ], "test.test[blocks-distinct_mixed_all--Peephole]": [ { - "checksum": "3848f2a2650fe368e09a4991bdd3fc0d", - "size": 4241, - "uri": "https://{canondata_backend}/1936273/d36ce8069af100fc1a5be882969a5505c35574b2/resource.tar.gz#test.test_blocks-distinct_mixed_all--Peephole_/opt.yql" + "checksum": "0cee70179fa613f92ac69175e9ca3b83", + "size": 4317, + "uri": "https://{canondata_backend}/1937001/ef9f5ea919d8150ce90cc4b0dbee0c1cba26fb65/resource.tar.gz#test.test_blocks-distinct_mixed_all--Peephole_/opt.yql" } ], "test.test[blocks-distinct_mixed_all--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json index 0b84ff1429..6181c3b5a8 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json @@ -519,9 +519,9 @@ ], "test.test[blocks-combine_all_avg--Peephole]": [ { - "checksum": "72c688d8ba2b59a621b7d928ad361432", - "size": 6542, - "uri": "https://{canondata_backend}/1689644/9c85c8a8fbfb9b467aefa0e6adc30066e9b66407/resource.tar.gz#test.test_blocks-combine_all_avg--Peephole_/opt.yql" + "checksum": "74bd46cd3b4e6e0003fd1617382d91c2", + "size": 6562, + "uri": "https://{canondata_backend}/1916746/16b0b89d25b015809e26ecf42c6ebb271b7e329d/resource.tar.gz#test.test_blocks-combine_all_avg--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_avg--Plan]": [ @@ -547,9 +547,9 @@ ], "test.test[blocks-combine_hashed_count_filter--Peephole]": [ { - "checksum": "dbad3ef6538bee756fe6988b10c2d026", - "size": 2691, - "uri": "https://{canondata_backend}/1920236/9b3d7b78be8d9e7dc5f97424762bc80230b95228/resource.tar.gz#test.test_blocks-combine_hashed_count_filter--Peephole_/opt.yql" + "checksum": "72546bd6ba3596963df0163bdb47b7ef", + "size": 2718, + "uri": "https://{canondata_backend}/1689644/4afe37611f20ddb78db6921c77de0f60bc674896/resource.tar.gz#test.test_blocks-combine_hashed_count_filter--Peephole_/opt.yql" } ], "test.test[blocks-combine_hashed_count_filter--Plan]": [ @@ -631,9 +631,9 @@ ], "test.test[blocks-finalize_hashed_keys--Peephole]": [ { - "checksum": "1d999c5294dc83f1a97d3da88c4bcd6d", - "size": 3967, - "uri": "https://{canondata_backend}/1920236/9b3d7b78be8d9e7dc5f97424762bc80230b95228/resource.tar.gz#test.test_blocks-finalize_hashed_keys--Peephole_/opt.yql" + "checksum": "38969a82d581c3ce44fa5c016e693467", + "size": 3975, + "uri": "https://{canondata_backend}/1916746/16b0b89d25b015809e26ecf42c6ebb271b7e329d/resource.tar.gz#test.test_blocks-finalize_hashed_keys--Peephole_/opt.yql" } ], "test.test[blocks-finalize_hashed_keys--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part7/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part7/canondata/result.json index d11731f9e3..d6f740086c 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part7/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part7/canondata/result.json @@ -436,9 +436,9 @@ ], "test.test[blocks-combine_all_count_filter--Peephole]": [ { - "checksum": "1bd1637694025d218083e5311e2e737e", - "size": 2200, - "uri": "https://{canondata_backend}/1903885/342ba96f93dd1681ca1277caeb9ad885799797d8/resource.tar.gz#test.test_blocks-combine_all_count_filter--Peephole_/opt.yql" + "checksum": "7e6527be84ea04af5daa4690e3affc77", + "size": 2243, + "uri": "https://{canondata_backend}/1936273/198988f944c8f3fd97179a6f8fffeac11f8d1537/resource.tar.gz#test.test_blocks-combine_all_count_filter--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_count_filter--Plan]": [ @@ -464,9 +464,9 @@ ], "test.test[blocks-combine_hashed_minmax_nested--Peephole]": [ { - "checksum": "6a31c3120cf2eb25ca9660df0025dc6d", - "size": 4248, - "uri": "https://{canondata_backend}/1946324/776cdf4fe38b7e55a357e57769c922aaa9c78eb8/resource.tar.gz#test.test_blocks-combine_hashed_minmax_nested--Peephole_/opt.yql" + "checksum": "c51e7072a46b6629eaca5986198f14b7", + "size": 4252, + "uri": "https://{canondata_backend}/1871182/1602ffd1989777b6937512e2491f3b6431e75351/resource.tar.gz#test.test_blocks-combine_hashed_minmax_nested--Peephole_/opt.yql" } ], "test.test[blocks-combine_hashed_minmax_nested--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part8/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part8/canondata/result.json index 591eccf27f..6914a530af 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part8/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part8/canondata/result.json @@ -697,9 +697,9 @@ ], "test.test[blocks-combine_all_avg_filter_opt--Peephole]": [ { - "checksum": "457eaa347bc78b8ae704dcfab24cd2c5", - "size": 2450, - "uri": "https://{canondata_backend}/1689644/939d001ead810a20e7af16f81f7b19e996b5ab10/resource.tar.gz#test.test_blocks-combine_all_avg_filter_opt--Peephole_/opt.yql" + "checksum": "a86349d68cb76289dcccb93060bf1534", + "size": 2493, + "uri": "https://{canondata_backend}/1936273/ef8b8680bb231b437c30900120a6917ae15cccaa/resource.tar.gz#test.test_blocks-combine_all_avg_filter_opt--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_avg_filter_opt--Plan]": [ @@ -725,9 +725,9 @@ ], "test.test[blocks-combine_all_max_filter--Peephole]": [ { - "checksum": "1a245481237e4f62ec4b0a9478b46649", - "size": 2052, - "uri": "https://{canondata_backend}/1689644/939d001ead810a20e7af16f81f7b19e996b5ab10/resource.tar.gz#test.test_blocks-combine_all_max_filter--Peephole_/opt.yql" + "checksum": "38a1cf0163b1ee0e882d3e2b4510bf6b", + "size": 2095, + "uri": "https://{canondata_backend}/1936273/ef8b8680bb231b437c30900120a6917ae15cccaa/resource.tar.gz#test.test_blocks-combine_all_max_filter--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_max_filter--Plan]": [ @@ -753,9 +753,9 @@ ], "test.test[blocks-combine_hashed_some--Peephole]": [ { - "checksum": "6c767ae3fdac7034ca43b157bd8894a7", - "size": 3458, - "uri": "https://{canondata_backend}/1775319/5727f49e842aea7c325a2340890ed9e5d39107df/resource.tar.gz#test.test_blocks-combine_hashed_some--Peephole_/opt.yql" + "checksum": "a6278c3a5e5565cca2826f911a303aab", + "size": 3506, + "uri": "https://{canondata_backend}/1871102/fcf3cadf4b55e03aeab83cc56f753fec4c829577/resource.tar.gz#test.test_blocks-combine_hashed_some--Peephole_/opt.yql" } ], "test.test[blocks-combine_hashed_some--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json index 2405a137be..e45f299c03 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json @@ -429,9 +429,9 @@ ], "test.test[blocks-combine_all_count--Peephole]": [ { - "checksum": "d065efb0530ef025a6a377c7e583607d", - "size": 3141, - "uri": "https://{canondata_backend}/1936273/078595a3559c5c438ed530f67006a92558087996/resource.tar.gz#test.test_blocks-combine_all_count--Peephole_/opt.yql" + "checksum": "a05d2b4951425b78494384e3966a40a7", + "size": 3205, + "uri": "https://{canondata_backend}/1775319/7897edc8408eb00cc6ed290a2f889d4524ac2280/resource.tar.gz#test.test_blocks-combine_all_count--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_count--Plan]": [ @@ -457,9 +457,9 @@ ], "test.test[blocks-combine_all_sum_filter--Peephole]": [ { - "checksum": "a08ca9ef70405bb6a580d476ca3972f2", - "size": 2052, - "uri": "https://{canondata_backend}/1936273/078595a3559c5c438ed530f67006a92558087996/resource.tar.gz#test.test_blocks-combine_all_sum_filter--Peephole_/opt.yql" + "checksum": "bd161548e7817d6c81c75cf0318f8e42", + "size": 2095, + "uri": "https://{canondata_backend}/1775319/7897edc8408eb00cc6ed290a2f889d4524ac2280/resource.tar.gz#test.test_blocks-combine_all_sum_filter--Peephole_/opt.yql" } ], "test.test[blocks-combine_all_sum_filter--Plan]": [ @@ -513,9 +513,9 @@ ], "test.test[blocks-distinct_pure_keys--Peephole]": [ { - "checksum": "c59c6d34d5ee19a75a2f78eeeac2c5ff", - "size": 6736, - "uri": "https://{canondata_backend}/1936947/dd735d32257373e0e81a1a0c690b04eb821b17d3/resource.tar.gz#test.test_blocks-distinct_pure_keys--Peephole_/opt.yql" + "checksum": "5a124ce37da8b898226286c987ad5df1", + "size": 6946, + "uri": "https://{canondata_backend}/1775319/7897edc8408eb00cc6ed290a2f889d4524ac2280/resource.tar.gz#test.test_blocks-distinct_pure_keys--Peephole_/opt.yql" } ], "test.test[blocks-distinct_pure_keys--Plan]": [ @@ -541,9 +541,9 @@ ], "test.test[blocks-group_by_complex_key--Peephole]": [ { - "checksum": "51c549ee925a9e1b57fb53b9d8af78bf", - "size": 3993, - "uri": "https://{canondata_backend}/1937367/bf3daea8b53db195e51f49431b822900029c2a5d/resource.tar.gz#test.test_blocks-group_by_complex_key--Peephole_/opt.yql" + "checksum": "ed8ba52767c23cc82a2e65723d2d632d", + "size": 3997, + "uri": "https://{canondata_backend}/1871102/d4092e58bfb3465cc6e1385e1e0fb9ddf349664c/resource.tar.gz#test.test_blocks-group_by_complex_key--Peephole_/opt.yql" } ], "test.test[blocks-group_by_complex_key--Plan]": [ |