aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorWhompe <vladluk@ydb.tech>2024-10-16 15:32:25 +0300
committerGitHub <noreply@github.com>2024-10-16 14:32:25 +0200
commit4fc77b199a24e975406ad15a37d4af5e85924e99 (patch)
tree4a9bb7663dc9993dde4267fd29177cc50610eba5
parentb2bc73df8676a727babfe3c63bf9e357a48ad2d3 (diff)
downloadydb-4fc77b199a24e975406ad15a37d4af5e85924e99.tar.gz
Make block combine hashed use stream instead of flow (#9979)
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_blocks.cpp14
-rw-r--r--ydb/library/yql/core/yql_aggregate_expander.cpp57
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp1648
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp104
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h9
-rw-r--r--ydb/library/yql/minikql/mkql_runtime_version.h2
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part0/canondata/result.json12
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part1/canondata/result.json18
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part10/canondata/result.json6
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json24
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part12/canondata/result.json30
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part13/canondata/result.json12
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part14/canondata/result.json30
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part16/canondata/result.json6
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part17/canondata/result.json6
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json12
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json18
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part2/canondata/result.json12
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part3/canondata/result.json12
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part4/canondata/result.json18
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json18
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json18
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part7/canondata/result.json12
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part8/canondata/result.json18
-rw-r--r--ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json24
25 files changed, 1294 insertions, 846 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
index f37baa8343..234c2c6dc9 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
@@ -791,7 +791,7 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input,
}
TTypeAnnotationNode::TListType blockItemTypes;
- if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
+ if (!EnsureWideStreamBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
@@ -817,7 +817,7 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input,
}
auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
- input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
+ input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
return IGraphTransformer::TStatus::Ok;
}
@@ -828,7 +828,7 @@ IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& inpu
}
TTypeAnnotationNode::TListType blockItemTypes;
- if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
+ if (!EnsureWideStreamBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
@@ -867,7 +867,7 @@ IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& inpu
retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
- input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
+ input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
return IGraphTransformer::TStatus::Ok;
}
@@ -879,7 +879,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
}
TTypeAnnotationNode::TListType blockItemTypes;
- if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
+ if (!EnsureWideStreamBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
YQL_ENSURE(blockItemTypes.size() > 0);
@@ -917,7 +917,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
}
// disallow any scalar columns except for streamIndex column
- auto itemTypes = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
+ auto itemTypes = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
for (ui32 i = 0; i + 1 < itemTypes.size(); ++i) {
bool isScalar = itemTypes[i]->GetKind() == ETypeAnnotationKind::Scalar;
if (isScalar && i != streamIndex) {
@@ -929,7 +929,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
- input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
+ input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
return IGraphTransformer::TStatus::Ok;
}
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index 12714671b6..a4c7df599e 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -699,7 +699,8 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
} else {
stream = AggList;
}
- auto blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false);
+
+ TExprNode::TPtr blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false);
if (!blocks) {
return nullptr;
}
@@ -708,22 +709,30 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
if (hashed) {
aggWideFlow = Ctx.Builder(Node->Pos())
.Callable("WideFromBlocks")
- .Callable(0, "BlockCombineHashed")
- .Add(0, blocks)
- .Callable(1, "Void")
+ .Callable(0, "ToFlow")
+ .Callable(0, "BlockCombineHashed")
+ .Callable(0, "FromFlow")
+ .Add(0, blocks)
+ .Seal()
+ .Callable(1, "Void")
+ .Seal()
+ .Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
+ .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
- .Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
- .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
.Seal()
.Build();
} else {
aggWideFlow = Ctx.Builder(Node->Pos())
- .Callable("BlockCombineAll")
- .Add(0, blocks)
- .Callable(1, "Void")
+ .Callable("ToFlow")
+ .Callable(0, "BlockCombineAll")
+ .Callable(0, "FromFlow")
+ .Add(0, blocks)
+ .Seal()
+ .Callable(1, "Void")
+ .Seal()
+ .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
- .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
.Build();
}
@@ -2891,10 +2900,14 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
TExprNode::TPtr aggBlocks;
if (!isMany) {
aggBlocks = Ctx.Builder(Node->Pos())
- .Callable("BlockMergeFinalizeHashed")
- .Add(0, blocks)
- .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
- .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
+ .Callable("ToFlow")
+ .Callable(0, "BlockMergeFinalizeHashed")
+ .Callable(0, "FromFlow")
+ .Add(0, blocks)
+ .Seal()
+ .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
+ .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
+ .Seal()
.Seal()
.Build();
} else {
@@ -2902,12 +2915,16 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
YQL_ENSURE(manyStreamsSetting, "Missing many_streams setting");
aggBlocks = Ctx.Builder(Node->Pos())
- .Callable("BlockMergeManyFinalizeHashed")
- .Add(0, blocks)
- .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
- .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
- .Atom(3, ToString(streamIdxColumn))
- .Add(4, manyStreamsSetting->TailPtr())
+ .Callable("ToFlow")
+ .Callable(0, "BlockMergeManyFinalizeHashed")
+ .Callable(0, "FromFlow")
+ .Add(0, blocks)
+ .Seal()
+ .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
+ .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
+ .Atom(3, ToString(streamIdxColumn))
+ .Add(4, manyStreamsSetting->TailPtr())
+ .Seal()
.Seal()
.Build();
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index eb1ffcba56..a1bef3ae63 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -442,9 +442,20 @@ size_t GetBitmapPopCount(const std::shared_ptr<arrow::ArrayData>& arr) {
return GetSparseBitmapPopCount(src, len);
}
+TArrayRef<TType *const> GetWideComponents(TType* type) {
+ if (type->IsFlow()) {
+ const auto outputFlowType = AS_TYPE(TFlowType, type);
+ return GetWideComponents(outputFlowType);
+ }
+ if (type->IsStream()) {
+ const auto outputStreamType = AS_TYPE(TStreamType, type);
+ return GetWideComponents(outputStreamType);
+ }
+ MKQL_ENSURE(false, "Expect either flow or stream");
+}
+
size_t CalcMaxBlockLenForOutput(TType* out) {
- const auto outputType = AS_TYPE(TFlowType, out);
- const auto wideComponents = GetWideComponents(outputType);
+ const auto wideComponents = GetWideComponents(out);
MKQL_ENSURE(wideComponents.size() > 0, "Expecting at least one output column");
size_t maxBlockItemSize = 0;
@@ -604,11 +615,99 @@ protected:
#endif
};
-class TBlockCombineAllWrapper : public TStatefulWideFlowCodegeneratorNode<TBlockCombineAllWrapper>,
+
+struct TBlockCombineAllState : public TComputationValue<TBlockCombineAllState> {
+ NUdf::TUnboxedValue* Pointer_ = nullptr;
+ bool IsFinished_ = false;
+ bool HasValues_ = false;
+ TUnboxedValueVector Values_;
+ std::vector<std::unique_ptr<IBlockAggregatorCombineAll>> Aggs_;
+ std::vector<char> AggStates_;
+ const std::optional<ui32> FilterColumn_;
+ const size_t Width_;
+
+ TBlockCombineAllState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const std::vector<TAggParams<IBlockAggregatorCombineAll>>& params, TComputationContext& ctx)
+ : TComputationValue(memInfo)
+ , Values_(std::max(width, params.size()))
+ , FilterColumn_(filterColumn)
+ , Width_(width)
+ {
+ Pointer_ = Values_.data();
+
+ ui32 totalStateSize = 0;
+ for (const auto& p : params) {
+ Aggs_.emplace_back(p.Prepared_->Make(ctx));
+ MKQL_ENSURE(Aggs_.back()->StateSize == p.Prepared_->StateSize, "State size mismatch");
+ totalStateSize += Aggs_.back()->StateSize;
+ }
+
+ AggStates_.resize(totalStateSize);
+ char* ptr = AggStates_.data();
+ for (const auto& agg : Aggs_) {
+ agg->InitState(ptr);
+ ptr += agg->StateSize;
+ }
+ }
+
+ void ProcessInput() {
+ const ui64 batchLength = TArrowBlock::From(Values_[Width_ - 1U]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+ if (!batchLength) {
+ return;
+ }
+
+ std::optional<ui64> filtered;
+ if (FilterColumn_) {
+ const auto filterDatum = TArrowBlock::From(Values_[*FilterColumn_]).GetDatum();
+ if (filterDatum.is_scalar()) {
+ if (!filterDatum.scalar_as<arrow::UInt8Scalar>().value) {
+ return;
+ }
+ } else {
+ const ui64 popCount = GetBitmapPopCount(filterDatum.array());
+ if (popCount == 0) {
+ return;
+ }
+
+ if (popCount < batchLength) {
+ filtered = popCount;
+ }
+ }
+ }
+
+ HasValues_ = true;
+ char* ptr = AggStates_.data();
+ for (size_t i = 0; i < Aggs_.size(); ++i) {
+ Aggs_[i]->AddMany(ptr, Values_.data(), batchLength, filtered);
+ ptr += Aggs_[i]->StateSize;
+ }
+ }
+
+ bool MakeOutput() {
+ IsFinished_ = true;
+ if (!HasValues_)
+ return false;
+
+ char* ptr = AggStates_.data();
+ for (size_t i = 0; i < Aggs_.size(); ++i) {
+ Values_[i] = Aggs_[i]->FinishOne(ptr);
+ Aggs_[i]->DestroyState(ptr);
+ ptr += Aggs_[i]->StateSize;
+ }
+ return true;
+ }
+
+ NUdf::TUnboxedValuePod Get(size_t index) const {
+ return Values_[index];
+ }
+};
+
+class TBlockCombineAllWrapperFromFlow : public TStatefulWideFlowCodegeneratorNode<TBlockCombineAllWrapperFromFlow>,
protected TBlockCombineAllWrapperCodegenBase {
-using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TBlockCombineAllWrapper>;
+using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TBlockCombineAllWrapperFromFlow>;
+
+using TState = TBlockCombineAllState;
public:
- TBlockCombineAllWrapper(TComputationMutables& mutables,
+ TBlockCombineAllWrapperFromFlow(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
std::optional<ui32> filterColumn,
size_t width,
@@ -655,95 +754,11 @@ public:
#ifndef MKQL_DISABLE_CODEGEN
ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
return DoGenGetValuesImpl(ctx, statePtr, block, Flow_, Width_, AggsParams_.size(),
- GetMethodPtr(&TState::Get), GetMethodPtr(&TBlockCombineAllWrapper::MakeState),
+ GetMethodPtr(&TState::Get), GetMethodPtr(&TBlockCombineAllWrapperFromFlow::MakeState),
GetMethodPtr(&TState::ProcessInput), GetMethodPtr(&TState::MakeOutput));
}
#endif
private:
- struct TState : public TComputationValue<TState> {
- NUdf::TUnboxedValue* Pointer_ = nullptr;
- bool IsFinished_ = false;
- bool HasValues_ = false;
- TUnboxedValueVector Values_;
- std::vector<std::unique_ptr<IBlockAggregatorCombineAll>> Aggs_;
- std::vector<char> AggStates_;
- const std::optional<ui32> FilterColumn_;
- const size_t Width_;
-
- TState(TMemoryUsageInfo* memInfo, size_t width, std::optional<ui32> filterColumn, const std::vector<TAggParams<IBlockAggregatorCombineAll>>& params, TComputationContext& ctx)
- : TComputationValue(memInfo)
- , Values_(std::max(width, params.size()))
- , FilterColumn_(filterColumn)
- , Width_(width)
- {
- Pointer_ = Values_.data();
-
- ui32 totalStateSize = 0;
- for (const auto& p : params) {
- Aggs_.emplace_back(p.Prepared_->Make(ctx));
- MKQL_ENSURE(Aggs_.back()->StateSize == p.Prepared_->StateSize, "State size mismatch");
- totalStateSize += Aggs_.back()->StateSize;
- }
-
- AggStates_.resize(totalStateSize);
- char* ptr = AggStates_.data();
- for (const auto& agg : Aggs_) {
- agg->InitState(ptr);
- ptr += agg->StateSize;
- }
- }
-
- void ProcessInput() {
- const ui64 batchLength = TArrowBlock::From(Values_[Width_ - 1U]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
- if (!batchLength) {
- return;
- }
-
- std::optional<ui64> filtered;
- if (FilterColumn_) {
- const auto filterDatum = TArrowBlock::From(Values_[*FilterColumn_]).GetDatum();
- if (filterDatum.is_scalar()) {
- if (!filterDatum.scalar_as<arrow::UInt8Scalar>().value) {
- return;
- }
- } else {
- const ui64 popCount = GetBitmapPopCount(filterDatum.array());
- if (popCount == 0) {
- return;
- }
-
- if (popCount < batchLength) {
- filtered = popCount;
- }
- }
- }
-
- HasValues_ = true;
- char* ptr = AggStates_.data();
- for (size_t i = 0; i < Aggs_.size(); ++i) {
- Aggs_[i]->AddMany(ptr, Values_.data(), batchLength, filtered);
- ptr += Aggs_[i]->StateSize;
- }
- }
-
- bool MakeOutput() {
- IsFinished_ = true;
- if (!HasValues_)
- return false;
-
- char* ptr = AggStates_.data();
- for (size_t i = 0; i < Aggs_.size(); ++i) {
- Values_[i] = Aggs_[i]->FinishOne(ptr);
- Aggs_[i]->DestroyState(ptr);
- ptr += Aggs_[i]->StateSize;
- }
- return true;
- }
-
- NUdf::TUnboxedValuePod Get(size_t index) const {
- return Values_[index];
- }
- };
void RegisterDependencies() const final {
FlowDependsOn(Flow_);
}
@@ -773,6 +788,89 @@ private:
const size_t WideFieldsIndex_;
};
+class TBlockCombineAllWrapperFromStream : public TMutableComputationNode<TBlockCombineAllWrapperFromStream> {
+using TBaseComputation = TMutableComputationNode<TBlockCombineAllWrapperFromStream>;
+
+using TState = TBlockCombineAllState;
+public:
+ TBlockCombineAllWrapperFromStream(TComputationMutables& mutables,
+ IComputationNode* stream,
+ std::optional<ui32> filterColumn,
+ size_t width,
+ std::vector<TAggParams<IBlockAggregatorCombineAll>>&& aggsParams)
+ : TBaseComputation(mutables, EValueRepresentation::Boxed)
+ , Stream_(stream)
+ , FilterColumn_(filterColumn)
+ , Width_(width)
+ , AggsParams_(std::move(aggsParams))
+ , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(width))
+ {
+ MKQL_ENSURE(Width_ > 0, "Missing block length column");
+ }
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const
+ {
+ const auto state = ctx.HolderFactory.Create<TState>(Width_, FilterColumn_, AggsParams_, ctx);
+ return ctx.HolderFactory.Create<TStreamValue>(std::move(state), std::move(Stream_->GetValue(ctx)));
+ }
+
+private:
+ class TStreamValue : public TComputationValue<TStreamValue> {
+ using TBase = TComputationValue<TStreamValue>;
+ public:
+ TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& state, NUdf::TUnboxedValue&& stream)
+ : TBase(memInfo)
+ , State_(state)
+ , Stream_(stream)
+ {
+ }
+
+ private:
+ NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
+ TState& state = *static_cast<TState*>(State_.AsBoxed().Get());
+ auto* inputFields = state.Values_.data();
+ const size_t inputWidth = state.Width_;
+
+ if (state.IsFinished_)
+ return NUdf::EFetchStatus::Finish;
+
+ while (true) {
+ switch (Stream_.WideFetch(inputFields, inputWidth)) {
+ case NUdf::EFetchStatus::Yield:
+ return NUdf::EFetchStatus::Yield;
+ case NUdf::EFetchStatus::Ok:
+ state.ProcessInput();
+ continue;
+ case NUdf::EFetchStatus::Finish:
+ break;
+ }
+ if (state.MakeOutput()) {
+ for (size_t i = 0; i < width; ++i) {
+ output[i] = state.Get(i);
+ }
+ return NUdf::EFetchStatus::Ok;
+ }
+ return NUdf::EFetchStatus::Finish;
+ }
+ }
+ private:
+ NUdf::TUnboxedValue State_;
+ NUdf::TUnboxedValue Stream_;
+ };
+
+private:
+ void RegisterDependencies() const final {
+ DependsOn(Stream_);
+ }
+
+private:
+ IComputationNode *const Stream_;
+ const std::optional<ui32> FilterColumn_;
+ const size_t Width_;
+ const std::vector<TAggParams<IBlockAggregatorCombineAll>> AggsParams_;
+ const size_t WideFieldsIndex_;
+};
+
template <typename T>
T MakeKey(TStringBuf s, ui32 keyLength) {
Y_UNUSED(keyLength);
@@ -1050,585 +1148,594 @@ protected:
};
template <typename TKey, typename TAggregator, typename TFixedAggState, bool UseSet, bool UseFilter, bool Finalize, bool Many, typename TDerived>
-class THashedWrapperBase : public TStatefulWideFlowCodegeneratorNode<TDerived>,
- protected THashedWrapperCodegenBase
-{
- using TComputationBase = TStatefulWideFlowCodegeneratorNode<TDerived>;
+struct THashedWrapperBaseState : public TBlockState {
+private:
static constexpr bool UseArena = !InlineAggState && std::is_same<TFixedAggState, TStateArena>::value;
public:
- THashedWrapperBase(TComputationMutables& mutables,
- IComputationWideFlowNode* flow,
- std::optional<ui32> filterColumn,
- size_t width,
- const std::vector<TKeyParams>& keys,
- size_t maxBlockLen,
- ui32 keyLength,
- std::vector<TAggParams<TAggregator>>&& aggsParams,
- ui32 streamIndex,
- std::vector<std::vector<ui32>>&& streams)
- : TComputationBase(mutables, flow, EValueRepresentation::Boxed)
- , Flow_(flow)
+ bool WritingOutput_ = false;
+ bool IsFinished_ = false;
+
+ const std::optional<ui32> FilterColumn_;
+ const std::vector<TKeyParams> Keys_;
+ const std::vector<TAggParams<TAggregator>>& AggsParams_;
+ const ui32 KeyLength_;
+ const ui32 StreamIndex_;
+ const std::vector<std::vector<ui32>> Streams_;
+ const size_t MaxBlockLen_;
+ const size_t Width_;
+ const size_t OutputWidth_;
+
+ template<typename TKeyType>
+ struct THashSettings {
+ static constexpr bool CacheHash = std::is_same_v<TKeyType, TSSOKey>;
+ };
+ using TDynMapImpl = TDynamicHashMapImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>;
+ using TSetImpl = THashSetImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>;
+ using TFixedMapImpl = TFixedHashMapImpl<TKey, TFixedAggState, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>;
+
+ ui64 BatchNum_ = 0;
+ TUnboxedValueVector Values_;
+ std::vector<std::unique_ptr<TAggregator>> Aggs_;
+ std::vector<ui32> AggStateOffsets_;
+ TUnboxedValueVector UnwrappedValues_;
+ std::vector<std::unique_ptr<IBlockReader>> Readers_;
+ std::vector<std::unique_ptr<IArrayBuilder>> Builders_;
+ std::vector<std::unique_ptr<IAggColumnBuilder>> AggBuilders_;
+ bool HasValues_ = false;
+ ui32 TotalStateSize_ = 0;
+ size_t OutputBlockSize_ = 0;
+ std::unique_ptr<TDynMapImpl> HashMap_;
+ typename TDynMapImpl::const_iterator HashMapIt_;
+ std::unique_ptr<TSetImpl> HashSet_;
+ typename TSetImpl::const_iterator HashSetIt_;
+ std::unique_ptr<TFixedMapImpl> HashFixedMap_;
+ typename TFixedMapImpl::const_iterator HashFixedMapIt_;
+ TPagedArena Arena_;
+
+ THashedWrapperBaseState(TMemoryUsageInfo* memInfo, ui32 keyLength, ui32 streamIndex, size_t width, size_t outputWidth, std::optional<ui32> filterColumn, const std::vector<TAggParams<TAggregator>>& params,
+ const std::vector<std::vector<ui32>>& streams, const std::vector<TKeyParams>& keys, size_t maxBlockLen, TComputationContext& ctx)
+ : TBlockState(memInfo, outputWidth)
, FilterColumn_(filterColumn)
- , Width_(width)
- , OutputWidth_(keys.size() + aggsParams.size() + 1)
- , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(width))
, Keys_(keys)
- , MaxBlockLen_(maxBlockLen)
- , AggsParams_(std::move(aggsParams))
+ , AggsParams_(params)
, KeyLength_(keyLength)
, StreamIndex_(streamIndex)
- , Streams_(std::move(streams))
+ , Streams_(streams)
+ , MaxBlockLen_(maxBlockLen)
+ , Width_(width)
+ , OutputWidth_(outputWidth)
+ , Values_(width)
+ , UnwrappedValues_(width)
+ , Readers_(keys.size())
+ , Builders_(keys.size())
+ , Arena_(TlsAllocState)
{
- MKQL_ENSURE(Width_ > 0, "Missing block length column");
- if constexpr (UseFilter) {
- MKQL_ENSURE(filterColumn, "Missing filter column");
- MKQL_ENSURE(!Finalize, "Filter isn't compatible with Finalize");
- } else {
- MKQL_ENSURE(!filterColumn, "Unexpected filter column");
+ Pointer_ = Values_.data();
+ for (size_t i = 0; i < Keys_.size(); ++i) {
+ auto itemType = AS_TYPE(TBlockType, Keys_[i].Type)->GetItemType();
+ Readers_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType);
+ Builders_[i] = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), itemType, ctx.ArrowMemoryPool, MaxBlockLen_, &ctx.Builder->GetPgBuilder());
}
- }
-
- EFetchResult DoCalculate(NUdf::TUnboxedValue& state,
- TComputationContext& ctx,
- NUdf::TUnboxedValue*const* output) const
- {
- auto& s = GetState(state, ctx);
- if (!s.Count) {
- if (s.IsFinished_)
- return EFetchResult::Finish;
-
- while (!s.WritingOutput_) {
- const auto fields = ctx.WideFields.data() + WideFieldsIndex_;
- s.Values_.assign(s.Values_.size(), NUdf::TUnboxedValuePod());
- switch (Flow_->FetchValues(ctx, fields)) {
- case EFetchResult::Yield:
- return EFetchResult::Yield;
- case EFetchResult::One:
- s.ProcessInput(ctx.HolderFactory);
- continue;
- case EFetchResult::Finish:
- break;
- }
- if (s.Finish())
- break;
- else
- return EFetchResult::Finish;
- }
+ if constexpr (Many) {
+ TotalStateSize_ += Streams_.size();
+ }
- if (!s.FillOutput(ctx.HolderFactory))
- return EFetchResult::Finish;
+ for (const auto& p : AggsParams_) {
+ Aggs_.emplace_back(p.Prepared_->Make(ctx));
+ MKQL_ENSURE(Aggs_.back()->StateSize == p.Prepared_->StateSize, "State size mismatch");
+ AggStateOffsets_.emplace_back(TotalStateSize_);
+ TotalStateSize_ += Aggs_.back()->StateSize;
}
- const auto sliceSize = s.Slice();
- for (size_t i = 0; i < OutputWidth_; ++i) {
- if (const auto out = output[i]) {
- *out = s.Get(sliceSize, ctx.HolderFactory, i);
+ auto equal = MakeEqual<TKey>(KeyLength_);
+ auto hasher = MakeHash<TKey>(KeyLength_);
+ if constexpr (UseSet) {
+ MKQL_ENSURE(params.empty(), "Only keys are supported");
+ HashSet_ = std::make_unique<THashSetImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>>(hasher, equal);
+ } else {
+ if (!InlineAggState) {
+ HashFixedMap_ = std::make_unique<TFixedHashMapImpl<TKey, TFixedAggState, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>>(hasher, equal);
+ } else {
+ HashMap_ = std::make_unique<TDynamicHashMapImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>>(TotalStateSize_, hasher, equal);
}
}
- return EFetchResult::One;
}
-#ifndef MKQL_DISABLE_CODEGEN
- ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
- return DoGenGetValuesImpl(ctx, statePtr, block, Flow_, Width_, OutputWidth_,
- GetMethodPtr(&TState::Get), GetMethodPtr(&THashedWrapperBase::MakeState),
- GetMethodPtr(&TState::ProcessInput), GetMethodPtr(&TState::Finish),
- GetMethodPtr(&TState::FillOutput), GetMethodPtr(&TState::Slice));
- }
-#endif
-private:
- struct TState : public TBlockState {
- bool WritingOutput_ = false;
- bool IsFinished_ = false;
-
- const std::optional<ui32> FilterColumn_;
- const std::vector<TKeyParams> Keys_;
- const std::vector<TAggParams<TAggregator>>& AggsParams_;
- const ui32 KeyLength_;
- const ui32 StreamIndex_;
- const std::vector<std::vector<ui32>> Streams_;
- const size_t MaxBlockLen_;
-
- template<typename TKeyType>
- struct THashSettings {
- static constexpr bool CacheHash = std::is_same_v<TKeyType, TSSOKey>;
- };
- using TDynMapImpl = TDynamicHashMapImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>;
- using TSetImpl = THashSetImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>;
- using TFixedMapImpl = TFixedHashMapImpl<TKey, TFixedAggState, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>;
-
- ui64 BatchNum_ = 0;
- TUnboxedValueVector Values_;
- std::vector<std::unique_ptr<TAggregator>> Aggs_;
- std::vector<ui32> AggStateOffsets_;
- TUnboxedValueVector UnwrappedValues_;
- std::vector<std::unique_ptr<IBlockReader>> Readers_;
- std::vector<std::unique_ptr<IArrayBuilder>> Builders_;
- std::vector<std::unique_ptr<IAggColumnBuilder>> AggBuilders_;
- bool HasValues_ = false;
- ui32 TotalStateSize_ = 0;
- size_t OutputBlockSize_ = 0;
- std::unique_ptr<TDynMapImpl> HashMap_;
- typename TDynMapImpl::const_iterator HashMapIt_;
- std::unique_ptr<TSetImpl> HashSet_;
- typename TSetImpl::const_iterator HashSetIt_;
- std::unique_ptr<TFixedMapImpl> HashFixedMap_;
- typename TFixedMapImpl::const_iterator HashFixedMapIt_;
- TPagedArena Arena_;
-
- TState(TMemoryUsageInfo* memInfo, ui32 keyLength, ui32 streamIndex, size_t width, size_t outputWidth, std::optional<ui32> filterColumn, const std::vector<TAggParams<TAggregator>>& params,
- const std::vector<std::vector<ui32>>& streams, const std::vector<TKeyParams>& keys, size_t maxBlockLen, TComputationContext& ctx)
- : TBlockState(memInfo, outputWidth)
- , FilterColumn_(filterColumn)
- , Keys_(keys)
- , AggsParams_(params)
- , KeyLength_(keyLength)
- , StreamIndex_(streamIndex)
- , Streams_(streams)
- , MaxBlockLen_(maxBlockLen)
- , Values_(width)
- , UnwrappedValues_(width)
- , Readers_(keys.size())
- , Builders_(keys.size())
- , Arena_(TlsAllocState)
- {
- Pointer_ = Values_.data();
- for (size_t i = 0; i < Keys_.size(); ++i) {
- auto itemType = AS_TYPE(TBlockType, Keys_[i].Type)->GetItemType();
- Readers_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType);
- Builders_[i] = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), itemType, ctx.ArrowMemoryPool, MaxBlockLen_, &ctx.Builder->GetPgBuilder());
- }
-
- if constexpr (Many) {
- TotalStateSize_ += Streams_.size();
- }
- for (const auto& p : AggsParams_) {
- Aggs_.emplace_back(p.Prepared_->Make(ctx));
- MKQL_ENSURE(Aggs_.back()->StateSize == p.Prepared_->StateSize, "State size mismatch");
- AggStateOffsets_.emplace_back(TotalStateSize_);
- TotalStateSize_ += Aggs_.back()->StateSize;
- }
+ void ProcessInput(const THolderFactory& holderFactory) {
+ ++BatchNum_;
+ const auto batchLength = TArrowBlock::From(Values_.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+ if (!batchLength) {
+ return;
+ }
- auto equal = MakeEqual<TKey>(KeyLength_);
- auto hasher = MakeHash<TKey>(KeyLength_);
- if constexpr (UseSet) {
- MKQL_ENSURE(params.empty(), "Only keys are supported");
- HashSet_ = std::make_unique<THashSetImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>>(hasher, equal);
+ const ui8* filterBitmap = nullptr;
+ if constexpr (UseFilter) {
+ auto filterDatum = TArrowBlock::From(Values_[*FilterColumn_]).GetDatum();
+ if (filterDatum.is_scalar()) {
+ if (!filterDatum.template scalar_as<arrow::UInt8Scalar>().value) {
+ return;
+ }
} else {
- if (!InlineAggState) {
- HashFixedMap_ = std::make_unique<TFixedHashMapImpl<TKey, TFixedAggState, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>>(hasher, equal);
- } else {
- HashMap_ = std::make_unique<TDynamicHashMapImpl<TKey, std::equal_to<TKey>, std::hash<TKey>, TMKQLAllocator<char>, THashSettings<TKey>>>(TotalStateSize_, hasher, equal);
+ const auto& arr = filterDatum.array();
+ filterBitmap = arr->template GetValues<ui8>(1);
+ ui64 popCount = GetBitmapPopCount(arr);
+ if (popCount == 0) {
+ return;
}
}
}
- void ProcessInput(const THolderFactory& holderFactory) {
- ++BatchNum_;
- const auto batchLength = TArrowBlock::From(Values_.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
- if (!batchLength) {
- return;
+ const ui32* streamIndexData = nullptr;
+ TMaybe<ui32> streamIndexScalar;
+ if constexpr (Many) {
+ auto streamIndexDatum = TArrowBlock::From(Values_[StreamIndex_]).GetDatum();
+ if (streamIndexDatum.is_scalar()) {
+ streamIndexScalar = streamIndexDatum.template scalar_as<arrow::UInt32Scalar>().value;
+ } else {
+ MKQL_ENSURE(streamIndexDatum.is_array(), "Expected array");
+ streamIndexData = streamIndexDatum.array()->template GetValues<ui32>(1);
}
-
- const ui8* filterBitmap = nullptr;
- if constexpr (UseFilter) {
- auto filterDatum = TArrowBlock::From(Values_[*FilterColumn_]).GetDatum();
- if (filterDatum.is_scalar()) {
- if (!filterDatum.template scalar_as<arrow::UInt8Scalar>().value) {
- return;
- }
- } else {
- const auto& arr = filterDatum.array();
- filterBitmap = arr->template GetValues<ui8>(1);
- ui64 popCount = GetBitmapPopCount(arr);
- if (popCount == 0) {
- return;
- }
- }
+ UnwrappedValues_ = Values_;
+ for (const auto& p : AggsParams_) {
+ const auto& columnDatum = TArrowBlock::From(UnwrappedValues_[p.Column_]).GetDatum();
+ MKQL_ENSURE(columnDatum.is_array(), "Expected array");
+ UnwrappedValues_[p.Column_] = holderFactory.CreateArrowBlock(Unwrap(*columnDatum.array(), p.StateType_));
}
+ }
- const ui32* streamIndexData = nullptr;
- TMaybe<ui32> streamIndexScalar;
- if constexpr (Many) {
- auto streamIndexDatum = TArrowBlock::From(Values_[StreamIndex_]).GetDatum();
- if (streamIndexDatum.is_scalar()) {
- streamIndexScalar = streamIndexDatum.template scalar_as<arrow::UInt32Scalar>().value;
- } else {
- MKQL_ENSURE(streamIndexDatum.is_array(), "Expected array");
- streamIndexData = streamIndexDatum.array()->template GetValues<ui32>(1);
- }
- UnwrappedValues_ = Values_;
- for (const auto& p : AggsParams_) {
- const auto& columnDatum = TArrowBlock::From(UnwrappedValues_[p.Column_]).GetDatum();
- MKQL_ENSURE(columnDatum.is_array(), "Expected array");
- UnwrappedValues_[p.Column_] = holderFactory.CreateArrowBlock(Unwrap(*columnDatum.array(), p.StateType_));
- }
- }
+ HasValues_ = true;
+ std::vector<arrow::Datum> keysDatum;
+ keysDatum.reserve(Keys_.size());
+ for (ui32 i = 0; i < Keys_.size(); ++i) {
+ keysDatum.emplace_back(TArrowBlock::From(Values_[Keys_[i].Index]).GetDatum());
+ }
- HasValues_ = true;
- std::vector<arrow::Datum> keysDatum;
- keysDatum.reserve(Keys_.size());
- for (ui32 i = 0; i < Keys_.size(); ++i) {
- keysDatum.emplace_back(TArrowBlock::From(Values_[Keys_[i].Index]).GetDatum());
- }
+ std::array<TOutputBuffer, PrefetchBatchSize> out;
+ for (ui32 i = 0; i < PrefetchBatchSize; ++i) {
+ out[i].Resize(sizeof(TKey));
+ }
- std::array<TOutputBuffer, PrefetchBatchSize> out;
- for (ui32 i = 0; i < PrefetchBatchSize; ++i) {
- out[i].Resize(sizeof(TKey));
+ std::array<TRobinHoodBatchRequestItem<TKey>, PrefetchBatchSize> insertBatch;
+ std::array<ui64, PrefetchBatchSize> insertBatchRows;
+ std::array<char*, PrefetchBatchSize> insertBatchPayloads;
+ std::array<bool, PrefetchBatchSize> insertBatchIsNew;
+ ui32 insertBatchLen = 0;
+
+ const auto processInsertBatch = [&]() {
+ for (ui32 i = 0; i < insertBatchLen; ++i) {
+ auto& r = insertBatch[i];
+ TStringBuf str = out[i].Finish();
+ TKey key = MakeKey<TKey>(str, KeyLength_);
+ r.ConstructKey(key);
}
- std::array<TRobinHoodBatchRequestItem<TKey>, PrefetchBatchSize> insertBatch;
- std::array<ui64, PrefetchBatchSize> insertBatchRows;
- std::array<char*, PrefetchBatchSize> insertBatchPayloads;
- std::array<bool, PrefetchBatchSize> insertBatchIsNew;
- ui32 insertBatchLen = 0;
-
- const auto processInsertBatch = [&]() {
- for (ui32 i = 0; i < insertBatchLen; ++i) {
- auto& r = insertBatch[i];
- TStringBuf str = out[i].Finish();
- TKey key = MakeKey<TKey>(str, KeyLength_);
- r.ConstructKey(key);
+ if constexpr (UseSet) {
+ HashSet_->BatchInsert({insertBatch.data(), insertBatchLen},[&](size_t index, typename THashedWrapperBaseState::TSetImpl::iterator iter, bool isNew) {
+ Y_UNUSED(index);
+ if (isNew) {
+ if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) {
+ MoveKeyToArena(HashSet_->GetKey(iter), Arena_, KeyLength_);
+ }
+ }
+ });
+ } else {
+ using THashTable = std::conditional_t<InlineAggState, typename THashedWrapperBaseState::TDynMapImpl, typename THashedWrapperBaseState::TFixedMapImpl>;
+ THashTable* hash;
+ if constexpr (!InlineAggState) {
+ hash = HashFixedMap_.get();
+ } else {
+ hash = HashMap_.get();
}
- if constexpr (UseSet) {
- HashSet_->BatchInsert({insertBatch.data(), insertBatchLen},[&](size_t index, typename TState::TSetImpl::iterator iter, bool isNew) {
- Y_UNUSED(index);
- if (isNew) {
- if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) {
- MoveKeyToArena(HashSet_->GetKey(iter), Arena_, KeyLength_);
- }
+ hash->BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t index, typename THashTable::iterator iter, bool isNew) {
+ if (isNew) {
+ if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) {
+ MoveKeyToArena(hash->GetKey(iter), Arena_, KeyLength_);
}
- });
- } else {
- using THashTable = std::conditional_t<InlineAggState, typename TState::TDynMapImpl, typename TState::TFixedMapImpl>;
- THashTable* hash;
- if constexpr (!InlineAggState) {
- hash = HashFixedMap_.get();
- } else {
- hash = HashMap_.get();
}
- hash->BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t index, typename THashTable::iterator iter, bool isNew) {
+ if constexpr (UseArena) {
+ // prefetch payloads only
+ auto payload = hash->GetPayload(iter);
+ char* ptr;
if (isNew) {
- if constexpr (std::is_same<TKey, TSSOKey>::value || std::is_same<TKey, TExternalFixedSizeKey>::value) {
- MoveKeyToArena(hash->GetKey(iter), Arena_, KeyLength_);
- }
- }
-
- if constexpr (UseArena) {
- // prefetch payloads only
- auto payload = hash->GetPayload(iter);
- char* ptr;
- if (isNew) {
- ptr = (char*)Arena_.Alloc(TotalStateSize_);
- *(char**)payload = ptr;
- } else {
- ptr = *(char**)payload;
- }
-
- insertBatchIsNew[index] = isNew;
- insertBatchPayloads[index] = ptr;
- NYql::PrefetchForWrite(ptr);
+ ptr = (char*)Arena_.Alloc(TotalStateSize_);
+ *(char**)payload = ptr;
} else {
- // process insert
- auto payload = (char*)hash->GetPayload(iter);
- auto row = insertBatchRows[index];
- ui32 streamIndex = 0;
- if constexpr (Many) {
- streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row];
- }
-
- Insert(row, payload, isNew, streamIndex);
+ ptr = *(char**)payload;
}
- });
- if constexpr (UseArena) {
- for (ui32 i = 0; i < insertBatchLen; ++i) {
- auto row = insertBatchRows[i];
- ui32 streamIndex = 0;
- if constexpr (Many) {
- streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row];
- }
-
- bool isNew = insertBatchIsNew[i];
- char* payload = insertBatchPayloads[i];
- Insert(row, payload, isNew, streamIndex);
+ insertBatchIsNew[index] = isNew;
+ insertBatchPayloads[index] = ptr;
+ NYql::PrefetchForWrite(ptr);
+ } else {
+ // process insert
+ auto payload = (char*)hash->GetPayload(iter);
+ auto row = insertBatchRows[index];
+ ui32 streamIndex = 0;
+ if constexpr (Many) {
+ streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row];
}
+
+ Insert(row, payload, isNew, streamIndex);
}
- }
- };
+ });
- for (ui64 row = 0; row < batchLength; ++row) {
- if constexpr (UseFilter) {
- if (filterBitmap && !filterBitmap[row]) {
- continue;
+ if constexpr (UseArena) {
+ for (ui32 i = 0; i < insertBatchLen; ++i) {
+ auto row = insertBatchRows[i];
+ ui32 streamIndex = 0;
+ if constexpr (Many) {
+ streamIndex = streamIndexScalar ? *streamIndexScalar : streamIndexData[row];
+ }
+
+ bool isNew = insertBatchIsNew[i];
+ char* payload = insertBatchPayloads[i];
+ Insert(row, payload, isNew, streamIndex);
}
}
+ }
+ };
- // encode key
- out[insertBatchLen].Rewind();
- for (ui32 i = 0; i < keysDatum.size(); ++i) {
- if (keysDatum[i].is_scalar()) {
- // TODO: more efficient code when grouping by scalar
- Readers_[i]->SaveScalarItem(*keysDatum[i].scalar(), out[insertBatchLen]);
- } else {
- Readers_[i]->SaveItem(*keysDatum[i].array(), row, out[insertBatchLen]);
- }
+ for (ui64 row = 0; row < batchLength; ++row) {
+ if constexpr (UseFilter) {
+ if (filterBitmap && !filterBitmap[row]) {
+ continue;
}
+ }
- insertBatchRows[insertBatchLen] = row;
- ++insertBatchLen;
- if (insertBatchLen == PrefetchBatchSize) {
- processInsertBatch();
- insertBatchLen = 0;
+ // encode key
+ out[insertBatchLen].Rewind();
+ for (ui32 i = 0; i < keysDatum.size(); ++i) {
+ if (keysDatum[i].is_scalar()) {
+ // TODO: more efficient code when grouping by scalar
+ Readers_[i]->SaveScalarItem(*keysDatum[i].scalar(), out[insertBatchLen]);
+ } else {
+ Readers_[i]->SaveItem(*keysDatum[i].array(), row, out[insertBatchLen]);
}
}
- processInsertBatch();
+ insertBatchRows[insertBatchLen] = row;
+ ++insertBatchLen;
+ if (insertBatchLen == PrefetchBatchSize) {
+ processInsertBatch();
+ insertBatchLen = 0;
+ }
}
- bool Finish() {
- if (!HasValues_) {
- IsFinished_ = true;
- return false;
- }
+ processInsertBatch();
+ }
- WritingOutput_ = true;
- OutputBlockSize_ = 0;
- PrepareAggBuilders();
+ bool Finish() {
+ if (!HasValues_) {
+ IsFinished_ = true;
+ return false;
+ }
- if constexpr (UseSet) {
- HashSetIt_ = HashSet_->Begin();
+ WritingOutput_ = true;
+ OutputBlockSize_ = 0;
+ PrepareAggBuilders();
+
+ if constexpr (UseSet) {
+ HashSetIt_ = HashSet_->Begin();
+ } else {
+ if constexpr (!InlineAggState) {
+ HashFixedMapIt_ = HashFixedMap_->Begin();
} else {
- if constexpr (!InlineAggState) {
- HashFixedMapIt_ = HashFixedMap_->Begin();
- } else {
- HashMapIt_ = HashMap_->Begin();
- }
+ HashMapIt_ = HashMap_->Begin();
}
- return true;
}
+ return true;
+ }
- bool FillOutput(const THolderFactory& holderFactory) {
- bool exit = false;
- while (WritingOutput_) {
- if constexpr (UseSet) {
- for (;!exit && HashSetIt_ != HashSet_->End(); HashSet_->Advance(HashSetIt_)) {
- if (!HashSet_->IsValid(HashSetIt_)) {
- continue;
- }
-
- if (OutputBlockSize_ == MaxBlockLen_) {
- Flush(false, holderFactory);
- //return EFetchResult::One;
- exit = true;
- break;
- }
-
- const TKey& key = HashSet_->GetKey(HashSetIt_);
- TInputBuffer in(GetKeyView<TKey>(key, KeyLength_));
- for (auto& kb : Builders_) {
- kb->Add(in);
- }
- ++OutputBlockSize_;
+ bool FillOutput(const THolderFactory& holderFactory) {
+ bool exit = false;
+ while (WritingOutput_) {
+ if constexpr (UseSet) {
+ for (;!exit && HashSetIt_ != HashSet_->End(); HashSet_->Advance(HashSetIt_)) {
+ if (!HashSet_->IsValid(HashSetIt_)) {
+ continue;
}
- break;
- } else {
- const bool done = InlineAggState ?
- Iterate(*HashMap_, HashMapIt_) :
- Iterate(*HashFixedMap_, HashFixedMapIt_);
- if (done) {
+
+ if (OutputBlockSize_ == MaxBlockLen_) {
+ Flush(false, holderFactory);
+ //return EFetchResult::One;
+ exit = true;
break;
}
- Flush(false, holderFactory);
- exit = true;
+
+ const TKey& key = HashSet_->GetKey(HashSetIt_);
+ TInputBuffer in(GetKeyView<TKey>(key, KeyLength_));
+ for (auto& kb : Builders_) {
+ kb->Add(in);
+ }
+ ++OutputBlockSize_;
+ }
+ break;
+ } else {
+ const bool done = InlineAggState ?
+ Iterate(*HashMap_, HashMapIt_) :
+ Iterate(*HashFixedMap_, HashFixedMapIt_);
+ if (done) {
break;
}
+ Flush(false, holderFactory);
+ exit = true;
+ break;
}
+ }
- if (!exit) {
- IsFinished_ = true;
- WritingOutput_ = false;
- if (!OutputBlockSize_)
- return false;
- Flush(true, holderFactory);
- }
-
- FillArrays();
- return true;
+ if (!exit) {
+ IsFinished_ = true;
+ WritingOutput_ = false;
+ if (!OutputBlockSize_)
+ return false;
+ Flush(true, holderFactory);
}
- private:
- void PrepareAggBuilders() {
- if constexpr (!UseSet) {
- AggBuilders_.clear();
- AggBuilders_.reserve(Aggs_.size());
- for (const auto& a : Aggs_) {
- if constexpr (Finalize) {
- AggBuilders_.emplace_back(a->MakeResultBuilder(MaxBlockLen_));
- } else {
- AggBuilders_.emplace_back(a->MakeStateBuilder(MaxBlockLen_));
- }
+
+ FillArrays();
+ return true;
+ }
+private:
+ void PrepareAggBuilders() {
+ if constexpr (!UseSet) {
+ AggBuilders_.clear();
+ AggBuilders_.reserve(Aggs_.size());
+ for (const auto& a : Aggs_) {
+ if constexpr (Finalize) {
+ AggBuilders_.emplace_back(a->MakeResultBuilder(MaxBlockLen_));
+ } else {
+ AggBuilders_.emplace_back(a->MakeStateBuilder(MaxBlockLen_));
}
}
}
+ }
- void Flush(bool final, const THolderFactory& holderFactory) {
- if (!OutputBlockSize_) {
- return;
- }
+ void Flush(bool final, const THolderFactory& holderFactory) {
+ if (!OutputBlockSize_) {
+ return;
+ }
- for (size_t i = 0; i < Builders_.size(); ++i) {
- Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(final));
- }
+ for (size_t i = 0; i < Builders_.size(); ++i) {
+ Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(final));
+ }
- if constexpr (!UseSet) {
- for (size_t i = 0; i < Aggs_.size(); ++i) {
- Values[Builders_.size() + i] = AggBuilders_[i]->Build();
- }
- if (!final) {
- PrepareAggBuilders();
- }
+ if constexpr (!UseSet) {
+ for (size_t i = 0; i < Aggs_.size(); ++i) {
+ Values[Builders_.size() + i] = AggBuilders_[i]->Build();
+ }
+ if (!final) {
+ PrepareAggBuilders();
}
-
- Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputBlockSize_)));
- OutputBlockSize_ = 0;
}
- void Insert(ui64 row, char* payload, bool isNew, ui32 currentStreamIndex) const {
- char* ptr = payload;
+ Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputBlockSize_)));
+ OutputBlockSize_ = 0;
+ }
- if (isNew) {
- if constexpr (Many) {
- static_assert(Finalize);
- MKQL_ENSURE(currentStreamIndex < Streams_.size(), "Invalid stream index");
- memset(ptr, 0, Streams_.size());
- ptr[currentStreamIndex] = 1;
+ void Insert(ui64 row, char* payload, bool isNew, ui32 currentStreamIndex) const {
+ char* ptr = payload;
- for (auto i : Streams_[currentStreamIndex]) {
+ if (isNew) {
+ if constexpr (Many) {
+ static_assert(Finalize);
+ MKQL_ENSURE(currentStreamIndex < Streams_.size(), "Invalid stream index");
+ memset(ptr, 0, Streams_.size());
+ ptr[currentStreamIndex] = 1;
- Aggs_[i]->LoadState(ptr + AggStateOffsets_[i], BatchNum_, UnwrappedValues_.data(), row);
- }
- } else {
- for (size_t i = 0; i < Aggs_.size(); ++i) {
- if constexpr (Finalize) {
- Aggs_[i]->LoadState(ptr, BatchNum_, Values_.data(), row);
- } else {
- Aggs_[i]->InitKey(ptr, BatchNum_, Values_.data(), row);
- }
+ for (auto i : Streams_[currentStreamIndex]) {
- ptr += Aggs_[i]->StateSize;
- }
+ Aggs_[i]->LoadState(ptr + AggStateOffsets_[i], BatchNum_, UnwrappedValues_.data(), row);
}
} else {
- if constexpr (Many) {
- static_assert(Finalize);
- MKQL_ENSURE(currentStreamIndex < Streams_.size(), "Invalid stream index");
-
- bool isNewStream = !ptr[currentStreamIndex];
- ptr[currentStreamIndex] = 1;
-
- for (auto i : Streams_[currentStreamIndex]) {
-
- if (isNewStream) {
- Aggs_[i]->LoadState(ptr + AggStateOffsets_[i], BatchNum_, UnwrappedValues_.data(), row);
- } else {
- Aggs_[i]->UpdateState(ptr + AggStateOffsets_[i], BatchNum_, UnwrappedValues_.data(), row);
- }
+ for (size_t i = 0; i < Aggs_.size(); ++i) {
+ if constexpr (Finalize) {
+ Aggs_[i]->LoadState(ptr, BatchNum_, Values_.data(), row);
+ } else {
+ Aggs_[i]->InitKey(ptr, BatchNum_, Values_.data(), row);
}
- } else {
- for (size_t i = 0; i < Aggs_.size(); ++i) {
- if constexpr (Finalize) {
- Aggs_[i]->UpdateState(ptr, BatchNum_, Values_.data(), row);
- } else {
- Aggs_[i]->UpdateKey(ptr, BatchNum_, Values_.data(), row);
- }
- ptr += Aggs_[i]->StateSize;
- }
+ ptr += Aggs_[i]->StateSize;
}
}
- }
+ } else {
+ if constexpr (Many) {
+ static_assert(Finalize);
+ MKQL_ENSURE(currentStreamIndex < Streams_.size(), "Invalid stream index");
- template <typename THash>
- bool Iterate(THash& hash, typename THash::const_iterator& iter) {
- MKQL_ENSURE(WritingOutput_, "Supposed to be called at the end");
- std::array<typename THash::const_iterator, PrefetchBatchSize> iters;
- ui32 itersLen = 0;
- auto iterateBatch = [&]() {
- for (ui32 i = 0; i < itersLen; ++i) {
- auto iter = iters[i];
- const TKey& key = hash.GetKey(iter);
- auto payload = (char*)hash.GetPayload(iter);
- char* ptr;
- if constexpr (UseArena) {
- ptr = *(char**)payload;
+ bool isNewStream = !ptr[currentStreamIndex];
+ ptr[currentStreamIndex] = 1;
+
+ for (auto i : Streams_[currentStreamIndex]) {
+
+ if (isNewStream) {
+ Aggs_[i]->LoadState(ptr + AggStateOffsets_[i], BatchNum_, UnwrappedValues_.data(), row);
} else {
- ptr = payload;
+ Aggs_[i]->UpdateState(ptr + AggStateOffsets_[i], BatchNum_, UnwrappedValues_.data(), row);
}
-
- TInputBuffer in(GetKeyView<TKey>(key, KeyLength_));
- for (auto& kb : Builders_) {
- kb->Add(in);
+ }
+ } else {
+ for (size_t i = 0; i < Aggs_.size(); ++i) {
+ if constexpr (Finalize) {
+ Aggs_[i]->UpdateState(ptr, BatchNum_, Values_.data(), row);
+ } else {
+ Aggs_[i]->UpdateKey(ptr, BatchNum_, Values_.data(), row);
}
- if constexpr (Many) {
- for (ui32 i = 0; i < Streams_.size(); ++i) {
- MKQL_ENSURE(ptr[i], "Missing partial aggregation state for stream #" << i);
- }
+ ptr += Aggs_[i]->StateSize;
+ }
+ }
+ }
+ }
- ptr += Streams_.size();
- }
+ template <typename THash>
+ bool Iterate(THash& hash, typename THash::const_iterator& iter) {
+ MKQL_ENSURE(WritingOutput_, "Supposed to be called at the end");
+ std::array<typename THash::const_iterator, PrefetchBatchSize> iters;
+ ui32 itersLen = 0;
+ auto iterateBatch = [&]() {
+ for (ui32 i = 0; i < itersLen; ++i) {
+ auto iter = iters[i];
+ const TKey& key = hash.GetKey(iter);
+ auto payload = (char*)hash.GetPayload(iter);
+ char* ptr;
+ if constexpr (UseArena) {
+ ptr = *(char**)payload;
+ } else {
+ ptr = payload;
+ }
- for (size_t i = 0; i < Aggs_.size(); ++i) {
- AggBuilders_[i]->Add(ptr);
- Aggs_[i]->DestroyState(ptr);
+ TInputBuffer in(GetKeyView<TKey>(key, KeyLength_));
+ for (auto& kb : Builders_) {
+ kb->Add(in);
+ }
- ptr += Aggs_[i]->StateSize;
+ if constexpr (Many) {
+ for (ui32 i = 0; i < Streams_.size(); ++i) {
+ MKQL_ENSURE(ptr[i], "Missing partial aggregation state for stream #" << i);
}
- }
- };
- for (; iter != hash.End(); hash.Advance(iter)) {
- if (!hash.IsValid(iter)) {
- continue;
+ ptr += Streams_.size();
}
- if (OutputBlockSize_ == MaxBlockLen_) {
- iterateBatch();
- return false;
- }
+ for (size_t i = 0; i < Aggs_.size(); ++i) {
+ AggBuilders_[i]->Add(ptr);
+ Aggs_[i]->DestroyState(ptr);
- if (itersLen == iters.size()) {
- iterateBatch();
- itersLen = 0;
+ ptr += Aggs_[i]->StateSize;
}
+ }
+ };
- iters[itersLen] = iter;
- ++itersLen;
- ++OutputBlockSize_;
- if constexpr (UseArena) {
- auto payload = (char*)hash.GetPayload(iter);
- auto ptr = *(char**)payload;
- NYql::PrefetchForWrite(ptr);
+ for (; iter != hash.End(); hash.Advance(iter)) {
+ if (!hash.IsValid(iter)) {
+ continue;
+ }
+
+ if (OutputBlockSize_ == MaxBlockLen_) {
+ iterateBatch();
+ return false;
+ }
+
+ if (itersLen == iters.size()) {
+ iterateBatch();
+ itersLen = 0;
+ }
+
+ iters[itersLen] = iter;
+ ++itersLen;
+ ++OutputBlockSize_;
+ if constexpr (UseArena) {
+ auto payload = (char*)hash.GetPayload(iter);
+ auto ptr = *(char**)payload;
+ NYql::PrefetchForWrite(ptr);
+ }
+
+ if constexpr (std::is_same<TKey, TSSOKey>::value) {
+ const auto& key = hash.GetKey(iter);
+ if (!key.IsInplace()) {
+ NYql::PrefetchForRead(key.AsView().Data());
}
+ } else if constexpr (std::is_same<TKey, TExternalFixedSizeKey>::value) {
+ const auto& key = hash.GetKey(iter);
+ NYql::PrefetchForRead(key.Data);
+ }
+ }
- if constexpr (std::is_same<TKey, TSSOKey>::value) {
- const auto& key = hash.GetKey(iter);
- if (!key.IsInplace()) {
- NYql::PrefetchForRead(key.AsView().Data());
- }
- } else if constexpr (std::is_same<TKey, TExternalFixedSizeKey>::value) {
- const auto& key = hash.GetKey(iter);
- NYql::PrefetchForRead(key.Data);
+ iterateBatch();
+ return true;
+ }
+};
+
+template <typename TKey, typename TAggregator, typename TFixedAggState, bool UseSet, bool UseFilter, bool Finalize, bool Many, typename TDerived>
+class THashedWrapperBaseFromFlow : public TStatefulWideFlowCodegeneratorNode<TDerived>,
+ protected THashedWrapperCodegenBase
+{
+ using TComputationBase = TStatefulWideFlowCodegeneratorNode<TDerived>;
+
+ using TState = THashedWrapperBaseState<TKey, TAggregator, TFixedAggState, UseSet, UseFilter, Finalize, Many, TDerived>;
+public:
+ THashedWrapperBaseFromFlow(TComputationMutables& mutables,
+ IComputationWideFlowNode* flow,
+ std::optional<ui32> filterColumn,
+ size_t width,
+ const std::vector<TKeyParams>& keys,
+ size_t maxBlockLen,
+ ui32 keyLength,
+ std::vector<TAggParams<TAggregator>>&& aggsParams,
+ ui32 streamIndex,
+ std::vector<std::vector<ui32>>&& streams)
+ : TComputationBase(mutables, flow, EValueRepresentation::Boxed)
+ , Flow_(flow)
+ , FilterColumn_(filterColumn)
+ , Width_(width)
+ , OutputWidth_(keys.size() + aggsParams.size() + 1)
+ , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(width))
+ , Keys_(keys)
+ , MaxBlockLen_(maxBlockLen)
+ , AggsParams_(std::move(aggsParams))
+ , KeyLength_(keyLength)
+ , StreamIndex_(streamIndex)
+ , Streams_(std::move(streams))
+ {
+ MKQL_ENSURE(Width_ > 0, "Missing block length column");
+ if constexpr (UseFilter) {
+ MKQL_ENSURE(filterColumn, "Missing filter column");
+ MKQL_ENSURE(!Finalize, "Filter isn't compatible with Finalize");
+ } else {
+ MKQL_ENSURE(!filterColumn, "Unexpected filter column");
+ }
+ }
+
+ EFetchResult DoCalculate(NUdf::TUnboxedValue& state,
+ TComputationContext& ctx,
+ NUdf::TUnboxedValue*const* output) const
+ {
+ auto& s = GetState(state, ctx);
+ if (!s.Count) {
+ if (s.IsFinished_)
+ return EFetchResult::Finish;
+
+ while (!s.WritingOutput_) {
+ const auto fields = ctx.WideFields.data() + WideFieldsIndex_;
+ s.Values_.assign(s.Values_.size(), NUdf::TUnboxedValuePod());
+ switch (Flow_->FetchValues(ctx, fields)) {
+ case EFetchResult::Yield:
+ return EFetchResult::Yield;
+ case EFetchResult::One:
+ s.ProcessInput(ctx.HolderFactory);
+ continue;
+ case EFetchResult::Finish:
+ break;
}
+
+ if (s.Finish())
+ break;
+ else
+ return EFetchResult::Finish;
}
- iterateBatch();
- return true;
+ if (!s.FillOutput(ctx.HolderFactory))
+ return EFetchResult::Finish;
}
- };
+
+ const auto sliceSize = s.Slice();
+ for (size_t i = 0; i < OutputWidth_; ++i) {
+ if (const auto out = output[i]) {
+ *out = s.Get(sliceSize, ctx.HolderFactory, i);
+ }
+ }
+ return EFetchResult::One;
+ }
+#ifndef MKQL_DISABLE_CODEGEN
+ ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
+ return DoGenGetValuesImpl(ctx, statePtr, block, Flow_, Width_, OutputWidth_,
+ GetMethodPtr(&TState::Get), GetMethodPtr(&THashedWrapperBaseFromFlow::MakeState),
+ GetMethodPtr(&TState::ProcessInput), GetMethodPtr(&TState::Finish),
+ GetMethodPtr(&TState::FillOutput), GetMethodPtr(&TState::Slice));
+ }
+#endif
private:
void RegisterDependencies() const final {
this->FlowDependsOn(Flow_);
@@ -1665,11 +1772,136 @@ private:
const std::vector<std::vector<ui32>> Streams_;
};
+
+template <typename TKey, typename TAggregator, typename TFixedAggState, bool UseSet, bool UseFilter, bool Finalize, bool Many, typename TDerived>
+class THashedWrapperBaseFromStream : public TMutableComputationNode<TDerived>,
+ protected THashedWrapperCodegenBase
+{
+ using TComputationBase = TMutableComputationNode<TDerived>;
+
+ using TState = THashedWrapperBaseState<TKey, TAggregator, TFixedAggState, UseSet, UseFilter, Finalize, Many, TDerived>;
+public:
+ THashedWrapperBaseFromStream(TComputationMutables& mutables,
+ IComputationNode* stream,
+ std::optional<ui32> filterColumn,
+ size_t width,
+ const std::vector<TKeyParams>& keys,
+ size_t maxBlockLen,
+ ui32 keyLength,
+ std::vector<TAggParams<TAggregator>>&& aggsParams,
+ ui32 streamIndex,
+ std::vector<std::vector<ui32>>&& streams)
+ : TComputationBase(mutables, EValueRepresentation::Boxed)
+ , Stream_(stream)
+ , FilterColumn_(filterColumn)
+ , Width_(width)
+ , OutputWidth_(keys.size() + aggsParams.size() + 1)
+ , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(width))
+ , Keys_(keys)
+ , MaxBlockLen_(maxBlockLen)
+ , AggsParams_(std::move(aggsParams))
+ , KeyLength_(keyLength)
+ , StreamIndex_(streamIndex)
+ , Streams_(std::move(streams))
+ {
+ MKQL_ENSURE(Width_ > 0, "Missing block length column");
+ if constexpr (UseFilter) {
+ MKQL_ENSURE(filterColumn, "Missing filter column");
+ MKQL_ENSURE(!Finalize, "Filter isn't compatible with Finalize");
+ } else {
+ MKQL_ENSURE(!filterColumn, "Unexpected filter column");
+ }
+ }
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const
+ {
+ const auto state = ctx.HolderFactory.Create<TState>(KeyLength_, StreamIndex_, Width_, OutputWidth_, FilterColumn_, AggsParams_, Streams_, Keys_, MaxBlockLen_, ctx);
+ return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, std::move(state), std::move(Stream_->GetValue(ctx)));
+ }
+private:
+ class TStreamValue : public TComputationValue<TStreamValue> {
+ using TBase = TComputationValue<TStreamValue>;
+ public:
+ TStreamValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory,
+ NUdf::TUnboxedValue&& state, NUdf::TUnboxedValue&& stream)
+ : TBase(memInfo)
+ , State_(state)
+ , Stream_(stream)
+ , HolderFactory_(holderFactory)
+ {
+ }
+
+ private:
+ NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
+ TState& state = *static_cast<TState*>(State_.AsBoxed().Get());
+ auto* inputFields = state.Values_.data();
+ const size_t inputWidth = state.Width_;
+ const size_t outputWidth = state.OutputWidth_;
+ MKQL_ENSURE(outputWidth == width, "The given width doesn't equal to the result type size");
+
+ if (!state.Count) {
+ if (state.IsFinished_)
+ return NUdf::EFetchStatus::Finish;
+
+ while (!state.WritingOutput_) {
+ switch (Stream_.WideFetch(inputFields, inputWidth)) {
+ case NUdf::EFetchStatus::Yield:
+ return NUdf::EFetchStatus::Yield;
+ case NUdf::EFetchStatus::Ok:
+ state.ProcessInput(HolderFactory_);
+ continue;
+ case NUdf::EFetchStatus::Finish:
+ break;
+ }
+
+ if (state.Finish())
+ break;
+ else
+ return NUdf::EFetchStatus::Finish;
+ }
+
+ if (!state.FillOutput(HolderFactory_))
+ return NUdf::EFetchStatus::Finish;
+ }
+
+ const auto sliceSize = state.Slice();
+ for (size_t i = 0; i < outputWidth; ++i) {
+ output[i] = state.Get(sliceSize, HolderFactory_, i);
+ }
+ return NUdf::EFetchStatus::Ok;
+ }
+ private:
+ NUdf::TUnboxedValue State_;
+ NUdf::TUnboxedValue Stream_;
+ const THolderFactory& HolderFactory_;
+ };
+private:
+ void RegisterDependencies() const final {
+ this->DependsOn(Stream_);
+ }
+
+ IComputationNode *const Stream_;
+ const std::optional<ui32> FilterColumn_;
+ const size_t Width_;
+ const size_t OutputWidth_;
+ const size_t WideFieldsIndex_;
+ const std::vector<TKeyParams> Keys_;
+ const size_t MaxBlockLen_;
+ const std::vector<TAggParams<TAggregator>> AggsParams_;
+ const ui32 KeyLength_;
+ const ui32 StreamIndex_;
+ const std::vector<std::vector<ui32>> Streams_;
+};
+
+template <typename TKey, typename TFixedAggState, bool UseSet, bool UseFilter, typename TInputNode>
+class TBlockCombineHashedWrapper {};
+
template <typename TKey, typename TFixedAggState, bool UseSet, bool UseFilter>
-class TBlockCombineHashedWrapper : public THashedWrapperBase<TKey, IBlockAggregatorCombineKeys, TFixedAggState, UseSet, UseFilter, false, false, TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter>> {
+class TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter, IComputationWideFlowNode>
+ : public THashedWrapperBaseFromFlow<TKey, IBlockAggregatorCombineKeys, TFixedAggState, UseSet, UseFilter, false, false, TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter, IComputationWideFlowNode>> {
public:
- using TSelf = TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter>;
- using TBase = THashedWrapperBase<TKey, IBlockAggregatorCombineKeys, TFixedAggState, UseSet, UseFilter, false, false, TSelf>;
+ using TSelf = TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter, IComputationWideFlowNode>;
+ using TBase = THashedWrapperBaseFromFlow<TKey, IBlockAggregatorCombineKeys, TFixedAggState, UseSet, UseFilter, false, false, TSelf>;
TBlockCombineHashedWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
@@ -1683,11 +1915,34 @@ public:
{}
};
+template <typename TKey, typename TFixedAggState, bool UseSet, bool UseFilter>
+class TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter, IComputationNode>
+ : public THashedWrapperBaseFromStream<TKey, IBlockAggregatorCombineKeys, TFixedAggState, UseSet, UseFilter, false, false, TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter, IComputationNode>> {
+public:
+ using TSelf = TBlockCombineHashedWrapper<TKey, TFixedAggState, UseSet, UseFilter, IComputationNode>;
+ using TBase = THashedWrapperBaseFromStream<TKey, IBlockAggregatorCombineKeys, TFixedAggState, UseSet, UseFilter, false, false, TSelf>;
+
+ TBlockCombineHashedWrapper(TComputationMutables& mutables,
+ IComputationNode* stream,
+ std::optional<ui32> filterColumn,
+ size_t width,
+ const std::vector<TKeyParams>& keys,
+ size_t maxBlockLen,
+ ui32 keyLength,
+ std::vector<TAggParams<IBlockAggregatorCombineKeys>>&& aggsParams)
+ : TBase(mutables, stream, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams), 0, {})
+ {}
+};
+
+template <typename TKey, typename TFixedAggState, bool UseSet, typename TInputNode>
+class TBlockMergeFinalizeHashedWrapper {};
+
template <typename TKey, typename TFixedAggState, bool UseSet>
-class TBlockMergeFinalizeHashedWrapper : public THashedWrapperBase<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, UseSet, false, true, false, TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet>> {
+class TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet, IComputationWideFlowNode>
+ : public THashedWrapperBaseFromFlow<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, UseSet, false, true, false, TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet, IComputationWideFlowNode>> {
public:
- using TSelf = TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet>;
- using TBase = THashedWrapperBase<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, UseSet, false, true, false, TSelf>;
+ using TSelf = TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet, IComputationWideFlowNode>;
+ using TBase = THashedWrapperBaseFromFlow<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, UseSet, false, true, false, TSelf>;
TBlockMergeFinalizeHashedWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
@@ -1700,11 +1955,33 @@ public:
{}
};
+template <typename TKey, typename TFixedAggState, bool UseSet>
+class TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet, IComputationNode>
+ : public THashedWrapperBaseFromStream<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, UseSet, false, true, false, TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet, IComputationNode>> {
+public:
+ using TSelf = TBlockMergeFinalizeHashedWrapper<TKey, TFixedAggState, UseSet, IComputationNode>;
+ using TBase = THashedWrapperBaseFromStream<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, UseSet, false, true, false, TSelf>;
+
+ TBlockMergeFinalizeHashedWrapper(TComputationMutables& mutables,
+ IComputationNode* stream,
+ size_t width,
+ const std::vector<TKeyParams>& keys,
+ size_t maxBlockLen,
+ ui32 keyLength,
+ std::vector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams)
+ : TBase(mutables, stream, {}, width, keys, maxBlockLen, keyLength, std::move(aggsParams), 0, {})
+ {}
+};
+
+template <typename TKey, typename TFixedAggState, typename TInputNode>
+class TBlockMergeManyFinalizeHashedWrapper {};
+
template <typename TKey, typename TFixedAggState>
-class TBlockMergeManyFinalizeHashedWrapper : public THashedWrapperBase<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, false, false, true, true, TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState>> {
+class TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState, IComputationWideFlowNode>
+ : public THashedWrapperBaseFromFlow<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, false, false, true, true, TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState, IComputationWideFlowNode>> {
public:
- using TSelf = TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState>;
- using TBase = THashedWrapperBase<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, false, false, true, true, TSelf>;
+ using TSelf = TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState, IComputationWideFlowNode>;
+ using TBase = THashedWrapperBaseFromFlow<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, false, false, true, true, TSelf>;
TBlockMergeManyFinalizeHashedWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
@@ -1718,6 +1995,25 @@ public:
{}
};
+template <typename TKey, typename TFixedAggState>
+class TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState, IComputationNode>
+ : public THashedWrapperBaseFromStream<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, false, false, true, true, TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState, IComputationNode>> {
+public:
+ using TSelf = TBlockMergeManyFinalizeHashedWrapper<TKey, TFixedAggState, IComputationNode>;
+ using TBase = THashedWrapperBaseFromStream<TKey, IBlockAggregatorFinalizeKeys, TFixedAggState, false, false, true, true, TSelf>;
+
+ TBlockMergeManyFinalizeHashedWrapper(TComputationMutables& mutables,
+ IComputationNode* stream,
+ size_t width,
+ const std::vector<TKeyParams>& keys,
+ size_t maxBlockLen,
+ ui32 keyLength,
+ std::vector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams,
+ ui32 streamIndex, std::vector<std::vector<ui32>>&& streams)
+ : TBase(mutables, stream, {}, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams))
+ {}
+};
+
template <typename TAggregator>
std::unique_ptr<IPreparedBlockAggregator<TAggregator>> PrepareBlockAggregator(const IBlockAggregatorFactory& factory,
TTupleType* tupleType,
@@ -1824,117 +2120,117 @@ ui32 FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, std::optional<
return totalStateSize;
}
-template <bool UseSet, bool UseFilter, typename TKey>
+template <bool UseSet, bool UseFilter, typename TKey, typename TInputNode>
IComputationNode* MakeBlockCombineHashedWrapper(
ui32 keyLength,
ui32 totalStateSize,
TComputationMutables& mutables,
- IComputationWideFlowNode* flow,
+ TInputNode* streamOrFlow,
std::optional<ui32> filterColumn,
size_t width,
const std::vector<TKeyParams>& keys,
size_t maxBlockLen,
std::vector<TAggParams<IBlockAggregatorCombineKeys>>&& aggsParams) {
if (totalStateSize <= sizeof(TState8)) {
- return new TBlockCombineHashedWrapper<TKey, TState8, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams));
+ return new TBlockCombineHashedWrapper<TKey, TState8, UseSet, UseFilter, TInputNode>(mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams));
}
if (totalStateSize <= sizeof(TState16)) {
- return new TBlockCombineHashedWrapper<TKey, TState16, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams));
+ return new TBlockCombineHashedWrapper<TKey, TState16, UseSet, UseFilter, TInputNode>(mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams));
}
- return new TBlockCombineHashedWrapper<TKey, TStateArena, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams));
+ return new TBlockCombineHashedWrapper<TKey, TStateArena, UseSet, UseFilter, TInputNode>(mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, keyLength, std::move(aggsParams));
}
-template <bool UseSet, bool UseFilter>
+template <bool UseSet, bool UseFilter, typename TInputNode>
IComputationNode* MakeBlockCombineHashedWrapper(
TMaybe<ui32> totalKeysSize,
bool isFixed,
ui32 totalStateSize,
TComputationMutables& mutables,
- IComputationWideFlowNode* flow,
+ TInputNode* streamOrFlow,
std::optional<ui32> filterColumn,
size_t width,
const std::vector<TKeyParams>& keys,
size_t maxBlockLen,
std::vector<TAggParams<IBlockAggregatorCombineKeys>>&& aggsParams) {
if (totalKeysSize && *totalKeysSize <= sizeof(ui32)) {
- return MakeBlockCombineHashedWrapper<UseSet, UseFilter, ui32>(*totalKeysSize, totalStateSize, mutables, flow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams));
+ return MakeBlockCombineHashedWrapper<UseSet, UseFilter, ui32>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams));
}
if (totalKeysSize && *totalKeysSize <= sizeof(ui64)) {
- return MakeBlockCombineHashedWrapper<UseSet, UseFilter, ui64>(*totalKeysSize, totalStateSize, mutables, flow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams));
+ return MakeBlockCombineHashedWrapper<UseSet, UseFilter, ui64>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams));
}
if (totalKeysSize && *totalKeysSize <= sizeof(TKey16)) {
- return MakeBlockCombineHashedWrapper<UseSet, UseFilter, TKey16>(*totalKeysSize, totalStateSize, mutables, flow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams));
+ return MakeBlockCombineHashedWrapper<UseSet, UseFilter, TKey16>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams));
}
if (totalKeysSize && isFixed) {
- return MakeBlockCombineHashedWrapper<UseSet, UseFilter, TExternalFixedSizeKey>(*totalKeysSize, totalStateSize, mutables, flow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams));
+ return MakeBlockCombineHashedWrapper<UseSet, UseFilter, TExternalFixedSizeKey>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams));
}
- return MakeBlockCombineHashedWrapper<UseSet, UseFilter, TSSOKey>(Max<ui32>(), totalStateSize, mutables, flow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams));
+ return MakeBlockCombineHashedWrapper<UseSet, UseFilter, TSSOKey>(Max<ui32>(), totalStateSize, mutables, streamOrFlow, filterColumn, width, keys, maxBlockLen, std::move(aggsParams));
}
-template <typename TKey, bool UseSet>
+template <typename TKey, bool UseSet, typename TInputNode>
IComputationNode* MakeBlockMergeFinalizeHashedWrapper(
ui32 keyLength,
ui32 totalStateSize,
TComputationMutables& mutables,
- IComputationWideFlowNode* flow,
+ TInputNode* streamOrFlow,
size_t width,
const std::vector<TKeyParams>& keys,
size_t maxBlockLen,
std::vector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams) {
if (totalStateSize <= sizeof(TState8)) {
- return new TBlockMergeFinalizeHashedWrapper<TKey, TState8, UseSet>(mutables, flow, width, keys, maxBlockLen, keyLength, std::move(aggsParams));
+ return new TBlockMergeFinalizeHashedWrapper<TKey, TState8, UseSet, TInputNode>(mutables, streamOrFlow, width, keys, maxBlockLen, keyLength, std::move(aggsParams));
}
if (totalStateSize <= sizeof(TState16)) {
- return new TBlockMergeFinalizeHashedWrapper<TKey, TState16, UseSet>(mutables, flow, width, keys, maxBlockLen, keyLength, std::move(aggsParams));
+ return new TBlockMergeFinalizeHashedWrapper<TKey, TState16, UseSet, TInputNode>(mutables, streamOrFlow, width, keys, maxBlockLen, keyLength, std::move(aggsParams));
}
- return new TBlockMergeFinalizeHashedWrapper<TKey, TStateArena, UseSet>(mutables, flow, width, keys, maxBlockLen, keyLength, std::move(aggsParams));
+ return new TBlockMergeFinalizeHashedWrapper<TKey, TStateArena, UseSet, TInputNode>(mutables, streamOrFlow, width, keys, maxBlockLen, keyLength, std::move(aggsParams));
}
-template <bool UseSet>
+template <bool UseSet, typename TInputNode>
IComputationNode* MakeBlockMergeFinalizeHashedWrapper(
TMaybe<ui32> totalKeysSize,
bool isFixed,
ui32 totalStateSize,
TComputationMutables& mutables,
- IComputationWideFlowNode* flow,
+ TInputNode* streamOrFlow,
size_t width,
const std::vector<TKeyParams>& keys,
size_t maxBlockLen,
std::vector<TAggParams<IBlockAggregatorFinalizeKeys>>&& aggsParams) {
if (totalKeysSize && *totalKeysSize <= sizeof(ui32)) {
- return MakeBlockMergeFinalizeHashedWrapper<ui32, UseSet>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams));
+ return MakeBlockMergeFinalizeHashedWrapper<ui32, UseSet>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams));
}
if (totalKeysSize && *totalKeysSize <= sizeof(ui64)) {
- return MakeBlockMergeFinalizeHashedWrapper<ui64, UseSet>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams));
+ return MakeBlockMergeFinalizeHashedWrapper<ui64, UseSet>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams));
}
if (totalKeysSize && *totalKeysSize <= sizeof(TKey16)) {
- return MakeBlockMergeFinalizeHashedWrapper<TKey16, UseSet>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams));
+ return MakeBlockMergeFinalizeHashedWrapper<TKey16, UseSet>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams));
}
if (totalKeysSize && isFixed) {
- return MakeBlockMergeFinalizeHashedWrapper<TExternalFixedSizeKey, UseSet>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams));
+ return MakeBlockMergeFinalizeHashedWrapper<TExternalFixedSizeKey, UseSet>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams));
}
- return MakeBlockMergeFinalizeHashedWrapper<TSSOKey, UseSet>(Max<ui32>(), totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams));
+ return MakeBlockMergeFinalizeHashedWrapper<TSSOKey, UseSet>(Max<ui32>(), totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams));
}
-template <typename TKey>
+template <typename TKey, typename TInputNode>
IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper(
ui32 keyLength,
ui32 totalStateSize,
TComputationMutables& mutables,
- IComputationWideFlowNode* flow,
+ TInputNode* streamOrFlow,
size_t width,
const std::vector<TKeyParams>& keys,
size_t maxBlockLen,
@@ -1943,22 +2239,23 @@ IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper(
std::vector<std::vector<ui32>>&& streams) {
if (totalStateSize <= sizeof(TState8)) {
- return new TBlockMergeManyFinalizeHashedWrapper<TKey, TState8>(mutables, flow, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams));
+ return new TBlockMergeManyFinalizeHashedWrapper<TKey, TState8, TInputNode>(mutables, streamOrFlow, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams));
}
if (totalStateSize <= sizeof(TState16)) {
- return new TBlockMergeManyFinalizeHashedWrapper<TKey, TState16>(mutables, flow, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams));
+ return new TBlockMergeManyFinalizeHashedWrapper<TKey, TState16, TInputNode>(mutables, streamOrFlow, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams));
}
- return new TBlockMergeManyFinalizeHashedWrapper<TKey, TStateArena>(mutables, flow, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams));
+ return new TBlockMergeManyFinalizeHashedWrapper<TKey, TStateArena, TInputNode>(mutables, streamOrFlow, width, keys, maxBlockLen, keyLength, std::move(aggsParams), streamIndex, std::move(streams));
}
+template <typename TInputNode>
IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper(
TMaybe<ui32> totalKeysSize,
bool isFixed,
ui32 totalStateSize,
TComputationMutables& mutables,
- IComputationWideFlowNode* flow,
+ TInputNode* streamOrFlow,
size_t width,
const std::vector<TKeyParams>& keys,
size_t maxBlockLen,
@@ -1966,22 +2263,22 @@ IComputationNode* MakeBlockMergeManyFinalizeHashedWrapper(
ui32 streamIndex,
std::vector<std::vector<ui32>>&& streams) {
if (totalKeysSize && *totalKeysSize <= sizeof(ui32)) {
- return MakeBlockMergeManyFinalizeHashedWrapper<ui32>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
+ return MakeBlockMergeManyFinalizeHashedWrapper<ui32>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
}
if (totalKeysSize && *totalKeysSize <= sizeof(ui64)) {
- return MakeBlockMergeManyFinalizeHashedWrapper<ui64>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
+ return MakeBlockMergeManyFinalizeHashedWrapper<ui64>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
}
if (totalKeysSize && *totalKeysSize <= sizeof(TKey16)) {
- return MakeBlockMergeManyFinalizeHashedWrapper<TKey16>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
+ return MakeBlockMergeManyFinalizeHashedWrapper<TKey16>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
}
if (totalKeysSize && isFixed) {
- return MakeBlockMergeManyFinalizeHashedWrapper<TExternalFixedSizeKey>(*totalKeysSize, totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
+ return MakeBlockMergeManyFinalizeHashedWrapper<TExternalFixedSizeKey>(*totalKeysSize, totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
}
- return MakeBlockMergeManyFinalizeHashedWrapper<TSSOKey>(Max<ui32>(), totalStateSize, mutables, flow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
+ return MakeBlockMergeManyFinalizeHashedWrapper<TSSOKey>(Max<ui32>(), totalStateSize, mutables, streamOrFlow, width, keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
}
void PrepareKeys(const std::vector<TKeyParams>& keys, TMaybe<ui32>& totalKeysSize, bool& isFixed) {
@@ -2012,14 +2309,15 @@ void FillAggStreams(TRuntimeNode streamsNode, std::vector<std::vector<ui32>>& st
IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args");
- const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto wideComponents = GetWideComponents(flowType);
+
+ const bool isStream = callable.GetInput(0).GetStaticType()->IsStream();
+ MKQL_ENSURE(isStream == callable.GetType()->GetReturnType()->IsStream(), "input and output must be both either flow or stream");
+
+ const auto wideComponents = GetWideComponents(callable.GetInput(0).GetStaticType());
const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env);
- const auto returnFlowType = AS_TYPE(TFlowType, callable.GetType()->GetReturnType());
- const auto returnWideComponents = GetWideComponents(returnFlowType);
+ const auto returnWideComponents = GetWideComponents(callable.GetType()->GetReturnType());
- auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
- MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+ const auto wideFlowOrStream = LocateNode(ctx.NodeLocator, callable, 0);
auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1));
std::optional<ui32> filterColumn;
@@ -2030,19 +2328,28 @@ IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNod
auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(2));
std::vector<TAggParams<IBlockAggregatorCombineAll>> aggsParams;
FillAggParams<IBlockAggregatorCombineAll>(aggsVal, tupleType, filterColumn, aggsParams, ctx.Env, false, false, returnWideComponents, 0);
- return new TBlockCombineAllWrapper(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams));
+
+ if (isStream) {
+ const auto wideStream = wideFlowOrStream;
+ return new TBlockCombineAllWrapperFromStream(ctx.Mutables, wideStream, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams));
+ } else {
+ const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(wideFlowOrStream);
+ MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+ return new TBlockCombineAllWrapperFromFlow(ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), std::move(aggsParams));
+ }
}
IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
- const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto wideComponents = GetWideComponents(flowType);
+
+ const bool isStream = callable.GetInput(0).GetStaticType()->IsStream();
+ MKQL_ENSURE(isStream == callable.GetType()->GetReturnType()->IsStream(), "input and output must be both either flow or stream");
+
+ const auto wideComponents = GetWideComponents(callable.GetInput(0).GetStaticType());
const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env);
- const auto returnFlowType = AS_TYPE(TFlowType, callable.GetType()->GetReturnType());
- const auto returnWideComponents = GetWideComponents(returnFlowType);
+ const auto returnWideComponents = GetWideComponents(callable.GetType()->GetReturnType());
- auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
- MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+ const auto wideStreamOrFlow = LocateNode(ctx.NodeLocator, callable, 0);
auto filterColumnVal = AS_VALUE(TOptionalLiteral, callable.GetInput(1));
std::optional<ui32> filterColumn;
@@ -2066,31 +2373,51 @@ IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputation
PrepareKeys(keys, totalKeysSize, isFixed);
const size_t maxBlockLen = CalcMaxBlockLenForOutput(callable.GetType()->GetReturnType());
- if (filterColumn) {
- if (aggsParams.empty()) {
- return MakeBlockCombineHashedWrapper<true, true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ if (isStream) {
+ const auto wideStream = wideStreamOrFlow;
+ if (filterColumn) {
+ if (aggsParams.empty()) {
+ return MakeBlockCombineHashedWrapper<true, true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ } else {
+ return MakeBlockCombineHashedWrapper<false, true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ }
} else {
- return MakeBlockCombineHashedWrapper<false, true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ if (aggsParams.empty()) {
+ return MakeBlockCombineHashedWrapper<true, false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ } else {
+ return MakeBlockCombineHashedWrapper<false, false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ }
}
} else {
- if (aggsParams.empty()) {
- return MakeBlockCombineHashedWrapper<true, false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ const auto wideFlow = dynamic_cast<IComputationWideFlowNode *>(wideStreamOrFlow);
+ MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+ if (filterColumn) {
+ if (aggsParams.empty()) {
+ return MakeBlockCombineHashedWrapper<true, true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ } else {
+ return MakeBlockCombineHashedWrapper<false, true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ }
} else {
- return MakeBlockCombineHashedWrapper<false, false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ if (aggsParams.empty()) {
+ return MakeBlockCombineHashedWrapper<true, false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ } else {
+ return MakeBlockCombineHashedWrapper<false, false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ }
}
}
}
IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args");
- const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto wideComponents = GetWideComponents(flowType);
+
+ const bool isStream = callable.GetInput(0).GetStaticType()->IsStream();
+ MKQL_ENSURE(isStream == callable.GetType()->GetReturnType()->IsStream(), "input and output must be both either flow or stream");
+
+ const auto wideComponents = GetWideComponents(callable.GetInput(0).GetStaticType());
const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env);
- const auto returnFlowType = AS_TYPE(TFlowType, callable.GetType()->GetReturnType());
- const auto returnWideComponents = GetWideComponents(returnFlowType);
+ const auto returnWideComponents = GetWideComponents(callable.GetType()->GetReturnType());
- auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
- MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+ const auto wideStreamOrFlow = LocateNode(ctx.NodeLocator, callable, 0);
auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(1));
std::vector<TKeyParams> keys;
@@ -2108,23 +2435,35 @@ IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TCompu
PrepareKeys(keys, totalKeysSize, isFixed);
const size_t maxBlockLen = CalcMaxBlockLenForOutput(callable.GetType()->GetReturnType());
- if (aggsParams.empty()) {
- return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ if (isStream) {
+ const auto wideStream = wideStreamOrFlow;
+ if (aggsParams.empty()) {
+ return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ } else {
+ return MakeBlockMergeFinalizeHashedWrapper<false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ }
} else {
- return MakeBlockMergeFinalizeHashedWrapper<false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ const auto wideFlow = dynamic_cast<IComputationWideFlowNode *>(wideStreamOrFlow);
+ MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+ if (aggsParams.empty()) {
+ return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ } else {
+ return MakeBlockMergeFinalizeHashedWrapper<false>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(), keys, maxBlockLen, std::move(aggsParams));
+ }
}
}
IComputationNode* WrapBlockMergeManyFinalizeHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args");
- const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto wideComponents = GetWideComponents(flowType);
+
+ const bool isStream = callable.GetInput(0).GetStaticType()->IsStream();
+ MKQL_ENSURE(isStream == callable.GetType()->GetReturnType()->IsStream(), "input and output must be both either flow or stream");
+
+ const auto wideComponents = GetWideComponents(callable.GetInput(0).GetStaticType());
const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env);
- const auto returnFlowType = AS_TYPE(TFlowType, callable.GetType()->GetReturnType());
- const auto returnWideComponents = GetWideComponents(returnFlowType);
+ const auto returnWideComponents = GetWideComponents(callable.GetType()->GetReturnType());
- const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
- MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+ const auto wideStreamOrFlow = LocateNode(ctx.NodeLocator, callable, 0);
auto keysVal = AS_VALUE(TTupleLiteral, callable.GetInput(1));
std::vector<TKeyParams> keys;
@@ -2147,12 +2486,25 @@ IComputationNode* WrapBlockMergeManyFinalizeHashed(TCallable& callable, const TC
totalStateSize += streams.size();
const size_t maxBlockLen = CalcMaxBlockLenForOutput(callable.GetType()->GetReturnType());
- if (aggsParams.empty()) {
- return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(),
- keys, maxBlockLen, std::move(aggsParams));
+ if (isStream){
+ const auto wideStream = wideStreamOrFlow;
+ if (aggsParams.empty()) {
+ return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, tupleType->GetElementsCount(),
+ keys, maxBlockLen, std::move(aggsParams));
+ } else {
+ return MakeBlockMergeManyFinalizeHashedWrapper(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideStream, tupleType->GetElementsCount(),
+ keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
+ }
} else {
- return MakeBlockMergeManyFinalizeHashedWrapper(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(),
- keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
+ const auto wideFlow = dynamic_cast<IComputationWideFlowNode *>(wideStreamOrFlow);
+ MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+ if (aggsParams.empty()) {
+ return MakeBlockMergeFinalizeHashedWrapper<true>(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(),
+ keys, maxBlockLen, std::move(aggsParams));
+ } else {
+ return MakeBlockMergeManyFinalizeHashedWrapper(totalKeysSize, isFixed, totalStateSize, ctx.Mutables, wideFlow, tupleType->GetElementsCount(),
+ keys, maxBlockLen, std::move(aggsParams), streamIndex, std::move(streams));
+ }
}
}
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 3206c5e47a..00a7f18463 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -5730,14 +5730,15 @@ TRuntimeNode TProgramBuilder::BlockBitCast(TRuntimeNode value, TType* targetType
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, std::optional<ui32> filterColumn,
- const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
- if constexpr (RuntimeVersion < 31U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
+TRuntimeNode TProgramBuilder::BuildBlockCombineAll(const std::string_view& callableName, TRuntimeNode input, std::optional<ui32> filterColumn,
+ const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
+ const auto inputType = input.GetStaticType();
+ MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type");
+ MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type");
+
+ TCallableBuilder builder(Env, callableName, returnType);
+ builder.Add(input);
- TCallableBuilder builder(Env, __func__, returnType);
- builder.Add(flow);
if (!filterColumn) {
builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id));
} else {
@@ -5759,14 +5760,32 @@ TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, std::optional<u
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys,
+TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode stream, std::optional<ui32> filterColumn,
const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
if constexpr (RuntimeVersion < 31U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- TCallableBuilder builder(Env, __func__, returnType);
- builder.Add(flow);
+ MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type");
+ MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type");
+
+ if constexpr (RuntimeVersion < 52U) {
+ const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType());
+ return FromFlow(BuildBlockCombineAll(__func__, ToFlow(stream), filterColumn, aggs, flowReturnType));
+ } else {
+ return BuildBlockCombineAll(__func__, stream, filterColumn, aggs, returnType);
+ }
+}
+
+TRuntimeNode TProgramBuilder::BuildBlockCombineHashed(const std::string_view& callableName, TRuntimeNode input, std::optional<ui32> filterColumn,
+ const TArrayRef<ui32>& keys, const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
+ const auto inputType = input.GetStaticType();
+ MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type");
+ MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type");
+
+ TCallableBuilder builder(Env, callableName, returnType);
+ builder.Add(input);
+
if (!filterColumn) {
builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id));
} else {
@@ -5794,14 +5813,31 @@ TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, std::optiona
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode flow, const TArrayRef<ui32>& keys,
+TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode stream, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys,
const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
if constexpr (RuntimeVersion < 31U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- TCallableBuilder builder(Env, __func__, returnType);
- builder.Add(flow);
+ MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type");
+ MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type");
+
+ if constexpr (RuntimeVersion < 52U) {
+ const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType());
+ return FromFlow(BuildBlockCombineHashed(__func__, ToFlow(stream), filterColumn, keys, aggs, flowReturnType));
+ } else {
+ return BuildBlockCombineHashed(__func__, stream, filterColumn, keys, aggs, returnType);
+ }
+}
+
+TRuntimeNode TProgramBuilder::BuildBlockMergeFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
+ const auto inputType = input.GetStaticType();
+ MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type");
+ MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type");
+
+ TCallableBuilder builder(Env, callableName, returnType);
+ builder.Add(input);
TVector<TRuntimeNode> keyNodes;
for (const auto& key : keys) {
@@ -5824,14 +5860,31 @@ TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode flow, const
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockMergeManyFinalizeHashed(TRuntimeNode flow, const TArrayRef<ui32>& keys,
- const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType) {
+TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode stream, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
if constexpr (RuntimeVersion < 31U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- TCallableBuilder builder(Env, __func__, returnType);
- builder.Add(flow);
+ MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type");
+ MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type");
+
+ if constexpr (RuntimeVersion < 52U) {
+ const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType());
+ return FromFlow(BuildBlockMergeFinalizeHashed(__func__, ToFlow(stream), keys, aggs, flowReturnType));
+ } else {
+ return BuildBlockMergeFinalizeHashed(__func__, stream, keys, aggs, returnType);
+ }
+}
+
+TRuntimeNode TProgramBuilder::BuildBlockMergeManyFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType) {
+ const auto inputType = input.GetStaticType();
+ MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type");
+ MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type");
+
+ TCallableBuilder builder(Env, callableName, returnType);
+ builder.Add(input);
TVector<TRuntimeNode> keyNodes;
for (const auto& key : keys) {
@@ -5866,6 +5919,23 @@ TRuntimeNode TProgramBuilder::BlockMergeManyFinalizeHashed(TRuntimeNode flow, co
return TRuntimeNode(builder.Build(), false);
}
+TRuntimeNode TProgramBuilder::BlockMergeManyFinalizeHashed(TRuntimeNode stream, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType) {
+ if constexpr (RuntimeVersion < 31U) {
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type");
+ MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type");
+
+ if constexpr (RuntimeVersion < 52U) {
+ const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType());
+ return FromFlow(BuildBlockMergeManyFinalizeHashed(__func__, ToFlow(stream), keys, aggs, streamIndex, streams, flowReturnType));
+ } else {
+ return BuildBlockMergeManyFinalizeHashed(__func__, stream, keys, aggs, streamIndex, streams, returnType);
+ }
+}
+
TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& args, const TArrayLambda& handler) {
if constexpr (RuntimeVersion < 39U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 58fdc4c272..74cf9a80ea 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -759,6 +759,15 @@ protected:
private:
TRuntimeNode BuildWideFilter(const std::string_view& callableName, TRuntimeNode flow, const TNarrowLambda& handler);
+ TRuntimeNode BuildBlockCombineAll(const std::string_view& callableName, TRuntimeNode input, std::optional<ui32> filterColumn,
+ const TArrayRef<const TAggInfo>& aggs, TType* returnType);
+ TRuntimeNode BuildBlockCombineHashed(const std::string_view& callableName, TRuntimeNode input, std::optional<ui32> filterColumn,
+ const TArrayRef<ui32>& keys, const TArrayRef<const TAggInfo>& aggs, TType* returnType);
+ TRuntimeNode BuildBlockMergeFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, TType* returnType);
+ TRuntimeNode BuildBlockMergeManyFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType);
+
TRuntimeNode DictItems(TRuntimeNode dict, EDictItems mode);
TRuntimeNode If(TRuntimeNode condition, TRuntimeNode thenBranch, TRuntimeNode elseBranch, TType* resultType);
diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h
index 4d416ac708..bfd26216ab 100644
--- a/ydb/library/yql/minikql/mkql_runtime_version.h
+++ b/ydb/library/yql/minikql/mkql_runtime_version.h
@@ -24,7 +24,7 @@ namespace NMiniKQL {
// 1. Bump this version every time incompatible runtime nodes are introduced.
// 2. Make sure you provide runtime node generation for previous runtime versions.
#ifndef MKQL_RUNTIME_VERSION
-#define MKQL_RUNTIME_VERSION 51U
+#define MKQL_RUNTIME_VERSION 52U
#endif
// History:
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part0/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part0/canondata/result.json
index 895fd69dc1..2a936aacb1 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part0/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part0/canondata/result.json
@@ -486,9 +486,9 @@
],
"test.test[blocks-combine_all_pg_filter--Peephole]": [
{
- "checksum": "b82fb484ea512752ac0bd055f2881bad",
- "size": 5463,
- "uri": "https://{canondata_backend}/1689644/c571b164c0329219eb668c8d9e33807153d99a05/resource.tar.gz#test.test_blocks-combine_all_pg_filter--Peephole_/opt.yql"
+ "checksum": "f9644e5d1aa2916e452370ff93594ba1",
+ "size": 5584,
+ "uri": "https://{canondata_backend}/1942100/eca836514e0d22543696dadda9c6b5fd0411ec2c/resource.tar.gz#test.test_blocks-combine_all_pg_filter--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_pg_filter--Plan]": [
@@ -598,9 +598,9 @@
],
"test.test[blocks-minmax_strings--Peephole]": [
{
- "checksum": "e24251cf43d6e021c673a59a5d975701",
- "size": 7778,
- "uri": "https://{canondata_backend}/1775319/332ffa137abc3e824ae9020e8b37875c568e01e9/resource.tar.gz#test.test_blocks-minmax_strings--Peephole_/opt.yql"
+ "checksum": "585085c48335fca5229be389e233cf6b",
+ "size": 8001,
+ "uri": "https://{canondata_backend}/1942100/eca836514e0d22543696dadda9c6b5fd0411ec2c/resource.tar.gz#test.test_blocks-minmax_strings--Peephole_/opt.yql"
}
],
"test.test[blocks-minmax_strings--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part1/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part1/canondata/result.json
index a91f6c216a..abc4bfb0b5 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part1/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part1/canondata/result.json
@@ -643,9 +643,9 @@
],
"test.test[blocks-combine_hashed_min--Peephole]": [
{
- "checksum": "6a4ee50207cc13b68ec0b901b5c90bbc",
- "size": 3482,
- "uri": "https://{canondata_backend}/1889210/5d1223c56711b5ccfa2ce8980fabbe18f3a04d58/resource.tar.gz#test.test_blocks-combine_hashed_min--Peephole_/opt.yql"
+ "checksum": "fe48deb06089372a1be7b0f21a39fafa",
+ "size": 3530,
+ "uri": "https://{canondata_backend}/1942100/460fd0564dd37c75913eb7e3e5f96a12d70b483a/resource.tar.gz#test.test_blocks-combine_hashed_min--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_hashed_min--Plan]": [
@@ -671,9 +671,9 @@
],
"test.test[blocks-combine_hashed_minmax_double--Peephole]": [
{
- "checksum": "4eed869d74a2e6927965254eaabfaef9",
- "size": 2858,
- "uri": "https://{canondata_backend}/1889210/5d1223c56711b5ccfa2ce8980fabbe18f3a04d58/resource.tar.gz#test.test_blocks-combine_hashed_minmax_double--Peephole_/opt.yql"
+ "checksum": "ed48b0c0f01f1031a11943749017abb4",
+ "size": 2862,
+ "uri": "https://{canondata_backend}/1942100/460fd0564dd37c75913eb7e3e5f96a12d70b483a/resource.tar.gz#test.test_blocks-combine_hashed_minmax_double--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_hashed_minmax_double--Plan]": [
@@ -699,9 +699,9 @@
],
"test.test[blocks-combine_hashed_set--Peephole]": [
{
- "checksum": "c8527dced2050b347b8abb2b106e392e",
- "size": 2248,
- "uri": "https://{canondata_backend}/1817427/bbd43b1ed3e4b4e4804e9367a38fc8672c2b46db/resource.tar.gz#test.test_blocks-combine_hashed_set--Peephole_/opt.yql"
+ "checksum": "65b13975fe3b35dcf2a50f4c796a12c9",
+ "size": 2268,
+ "uri": "https://{canondata_backend}/1942100/460fd0564dd37c75913eb7e3e5f96a12d70b483a/resource.tar.gz#test.test_blocks-combine_hashed_set--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_hashed_set--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part10/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part10/canondata/result.json
index 6523279b72..b22d08a839 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part10/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part10/canondata/result.json
@@ -460,9 +460,9 @@
],
"test.test[blocks-combine_all_avg_filter--Peephole]": [
{
- "checksum": "67e87d544bf1db7426409705ac43f5d2",
- "size": 2221,
- "uri": "https://{canondata_backend}/1689644/e953ec7ef3d2122dd1da61bd4dc7f631010ae6c6/resource.tar.gz#test.test_blocks-combine_all_avg_filter--Peephole_/opt.yql"
+ "checksum": "839af3ed70bf69f8c9ac1d08a622b053",
+ "size": 2264,
+ "uri": "https://{canondata_backend}/1936273/60c8060084c7fca13bcbdeeafd7bb7009b935816/resource.tar.gz#test.test_blocks-combine_all_avg_filter--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_avg_filter--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json
index 934286b6c7..eded04e75f 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part11/canondata/result.json
@@ -417,9 +417,9 @@
],
"test.test[blocks-combine_all_decimal--Peephole]": [
{
- "checksum": "f4f26b88e3c27f7c7b7e9a1dbdd2297d",
- "size": 5371,
- "uri": "https://{canondata_backend}/1689644/6b51d5d198e5b6a200e05073bbbd27568362da19/resource.tar.gz#test.test_blocks-combine_all_decimal--Peephole_/opt.yql"
+ "checksum": "f1af6f4b597cfc3f02016363538384b6",
+ "size": 5391,
+ "uri": "https://{canondata_backend}/1937001/d7dd873e29f16ef285cde87454b8789a4270b9bf/resource.tar.gz#test.test_blocks-combine_all_decimal--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_decimal--Plan]": [
@@ -445,9 +445,9 @@
],
"test.test[blocks-combine_all_some_filter--Peephole]": [
{
- "checksum": "04b9f3cbee17a2e2af320d0764578a2e",
- "size": 2020,
- "uri": "https://{canondata_backend}/1689644/6b51d5d198e5b6a200e05073bbbd27568362da19/resource.tar.gz#test.test_blocks-combine_all_some_filter--Peephole_/opt.yql"
+ "checksum": "12353b122e8698f5979ab7ac48edbf28",
+ "size": 2063,
+ "uri": "https://{canondata_backend}/1937001/d7dd873e29f16ef285cde87454b8789a4270b9bf/resource.tar.gz#test.test_blocks-combine_all_some_filter--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_some_filter--Plan]": [
@@ -473,9 +473,9 @@
],
"test.test[blocks-combine_all_sum--Peephole]": [
{
- "checksum": "2c97860ba346328f8a92d745c7ca45fa",
- "size": 4165,
- "uri": "https://{canondata_backend}/1689644/6b51d5d198e5b6a200e05073bbbd27568362da19/resource.tar.gz#test.test_blocks-combine_all_sum--Peephole_/opt.yql"
+ "checksum": "1332006a572a466ce7cc65cb9f79d3d4",
+ "size": 4185,
+ "uri": "https://{canondata_backend}/1937001/d7dd873e29f16ef285cde87454b8789a4270b9bf/resource.tar.gz#test.test_blocks-combine_all_sum--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_sum--Plan]": [
@@ -501,9 +501,9 @@
],
"test.test[blocks-distinct_opt_state_keys--Peephole]": [
{
- "checksum": "19872b5003fd570975efecbb34cb27a7",
- "size": 8532,
- "uri": "https://{canondata_backend}/1900335/48c1045110f93be7271adb6caf109f9f9e32f3bc/resource.tar.gz#test.test_blocks-distinct_opt_state_keys--Peephole_/opt.yql"
+ "checksum": "2019c89c68ce2f33c11bafe6a65b4f43",
+ "size": 8686,
+ "uri": "https://{canondata_backend}/1937001/d7dd873e29f16ef285cde87454b8789a4270b9bf/resource.tar.gz#test.test_blocks-distinct_opt_state_keys--Peephole_/opt.yql"
}
],
"test.test[blocks-distinct_opt_state_keys--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part12/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part12/canondata/result.json
index c121ee495e..7db6be5a4f 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part12/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part12/canondata/result.json
@@ -472,9 +472,9 @@
],
"test.test[blocks-combine_all_max--Peephole]": [
{
- "checksum": "311b380d0cf9a03f17b5be896eb75d52",
- "size": 6265,
- "uri": "https://{canondata_backend}/1936273/2e12f403b84f544db0dddee878135eb165520afe/resource.tar.gz#test.test_blocks-combine_all_max--Peephole_/opt.yql"
+ "checksum": "578d8413e4f0938c5c51ddbd50c7ffde",
+ "size": 6267,
+ "uri": "https://{canondata_backend}/1942100/1b0eecc460579a62c09a8328b78b1c3d0737be71/resource.tar.gz#test.test_blocks-combine_all_max--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_max--Plan]": [
@@ -500,9 +500,9 @@
],
"test.test[blocks-combine_all_minmax_nested--Peephole]": [
{
- "checksum": "a0294d39683e7ad6fa80cfeb34bbf0a0",
- "size": 3407,
- "uri": "https://{canondata_backend}/1936273/2e12f403b84f544db0dddee878135eb165520afe/resource.tar.gz#test.test_blocks-combine_all_minmax_nested--Peephole_/opt.yql"
+ "checksum": "1d3e6826662dae4130959c4b1e4d197c",
+ "size": 3427,
+ "uri": "https://{canondata_backend}/1942100/1b0eecc460579a62c09a8328b78b1c3d0737be71/resource.tar.gz#test.test_blocks-combine_all_minmax_nested--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_minmax_nested--Plan]": [
@@ -528,9 +528,9 @@
],
"test.test[blocks-combine_hashed_sum--Peephole]": [
{
- "checksum": "b9e75e3fb2350f09dbbcd3a17fa55617",
- "size": 3542,
- "uri": "https://{canondata_backend}/1942671/7651701708982db1bc7660dcd6a1d063fc197122/resource.tar.gz#test.test_blocks-combine_hashed_sum--Peephole_/opt.yql"
+ "checksum": "ece135ac007bb7b19234bbf85e0aee45",
+ "size": 3590,
+ "uri": "https://{canondata_backend}/1920236/0b8929411f995ee81d269f8519a2b62822b5b0f7/resource.tar.gz#test.test_blocks-combine_hashed_sum--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_hashed_sum--Plan]": [
@@ -724,9 +724,9 @@
],
"test.test[blocks-lazy_nonstrict_with_scalar_ctx--Peephole]": [
{
- "checksum": "5f260ad4f10c1e79d9bec7361e08d388",
- "size": 3760,
- "uri": "https://{canondata_backend}/1936273/2e12f403b84f544db0dddee878135eb165520afe/resource.tar.gz#test.test_blocks-lazy_nonstrict_with_scalar_ctx--Peephole_/opt.yql"
+ "checksum": "526263536048e247185473b3e7e22a71",
+ "size": 3708,
+ "uri": "https://{canondata_backend}/1942100/1b0eecc460579a62c09a8328b78b1c3d0737be71/resource.tar.gz#test.test_blocks-lazy_nonstrict_with_scalar_ctx--Peephole_/opt.yql"
}
],
"test.test[blocks-lazy_nonstrict_with_scalar_ctx--Plan]": [
@@ -752,9 +752,9 @@
],
"test.test[blocks-minmax_strings_filter--Peephole]": [
{
- "checksum": "96ad208ff8de572b236d23a250c26c0e",
- "size": 3925,
- "uri": "https://{canondata_backend}/1942671/7651701708982db1bc7660dcd6a1d063fc197122/resource.tar.gz#test.test_blocks-minmax_strings_filter--Peephole_/opt.yql"
+ "checksum": "f0048f669753144b52457204bff26d18",
+ "size": 4056,
+ "uri": "https://{canondata_backend}/1942100/1b0eecc460579a62c09a8328b78b1c3d0737be71/resource.tar.gz#test.test_blocks-minmax_strings_filter--Peephole_/opt.yql"
}
],
"test.test[blocks-minmax_strings_filter--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part13/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part13/canondata/result.json
index 2ab2c85970..85d9a8dcb8 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part13/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part13/canondata/result.json
@@ -484,9 +484,9 @@
],
"test.test[blocks-combine_hashed_pg--Peephole]": [
{
- "checksum": "44f4557f8602e6367b7cff827ac89234",
- "size": 6379,
- "uri": "https://{canondata_backend}/1880306/c85931dc2ddfc9372ac5287e031731a861495fb0/resource.tar.gz#test.test_blocks-combine_hashed_pg--Peephole_/opt.yql"
+ "checksum": "bdb9351eb3f869d1e2e3bd47329de921",
+ "size": 6507,
+ "uri": "https://{canondata_backend}/1775319/1d2e8f1b903194b7f97d4c3a5928f81379354efa/resource.tar.gz#test.test_blocks-combine_hashed_pg--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_hashed_pg--Plan]": [
@@ -512,9 +512,9 @@
],
"test.test[blocks-distinct_opt_state_all--Peephole]": [
{
- "checksum": "615baf31dbbe6dd549fee114e9f17fe5",
- "size": 8713,
- "uri": "https://{canondata_backend}/995452/76963fbe19e9e3c21b622412a3ae8fe5473f46ef/resource.tar.gz#test.test_blocks-distinct_opt_state_all--Peephole_/opt.yql"
+ "checksum": "1333bede710777316d4570510fd4ea7f",
+ "size": 8841,
+ "uri": "https://{canondata_backend}/1936273/1342bc207252f0d51d2a5d59cf94c96ae9330665/resource.tar.gz#test.test_blocks-distinct_opt_state_all--Peephole_/opt.yql"
}
],
"test.test[blocks-distinct_opt_state_all--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part14/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part14/canondata/result.json
index 9a45ed2ced..943e2a3074 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part14/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part14/canondata/result.json
@@ -574,9 +574,9 @@
],
"test.test[blocks-combine_all_max_filter_opt--Peephole]": [
{
- "checksum": "ce72dc5aaf5dbc78a6afb13ac9ae87da",
- "size": 2119,
- "uri": "https://{canondata_backend}/1936273/9a6c178c3ec8bbc5f8d9a423f1c2a4b808cc630a/resource.tar.gz#test.test_blocks-combine_all_max_filter_opt--Peephole_/opt.yql"
+ "checksum": "eba47c259d4e240f3d15ddd779edc6e5",
+ "size": 2162,
+ "uri": "https://{canondata_backend}/1936273/780afd5c76cefb9a0deca536730d6e710dfd47af/resource.tar.gz#test.test_blocks-combine_all_max_filter_opt--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_max_filter_opt--Plan]": [
@@ -602,9 +602,9 @@
],
"test.test[blocks-combine_all_min--Peephole]": [
{
- "checksum": "57cce58a2b91b32ee543632601fbc42d",
- "size": 6265,
- "uri": "https://{canondata_backend}/1936273/9a6c178c3ec8bbc5f8d9a423f1c2a4b808cc630a/resource.tar.gz#test.test_blocks-combine_all_min--Peephole_/opt.yql"
+ "checksum": "d2ccd2cda420b126acf214dd58008924",
+ "size": 6267,
+ "uri": "https://{canondata_backend}/1936273/780afd5c76cefb9a0deca536730d6e710dfd47af/resource.tar.gz#test.test_blocks-combine_all_min--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_min--Plan]": [
@@ -630,9 +630,9 @@
],
"test.test[blocks-combine_hashed_count--Peephole]": [
{
- "checksum": "97ac986b17396f7162260355d37e2f2b",
- "size": 3630,
- "uri": "https://{canondata_backend}/1775319/dffc024d6db08a14065eefff7616512e9ec14ce3/resource.tar.gz#test.test_blocks-combine_hashed_count--Peephole_/opt.yql"
+ "checksum": "5fd4f1661e889ec3e4f8cce021d028e3",
+ "size": 3678,
+ "uri": "https://{canondata_backend}/1881367/4c7aa0ee09ad13a7c4c8522afaebb0a34e8a70bf/resource.tar.gz#test.test_blocks-combine_hashed_count--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_hashed_count--Plan]": [
@@ -770,9 +770,9 @@
],
"test.test[blocks-minmax_tuple--Peephole]": [
{
- "checksum": "88907c5a0dc7d235be650136e083477c",
- "size": 5500,
- "uri": "https://{canondata_backend}/1936273/9a6c178c3ec8bbc5f8d9a423f1c2a4b808cc630a/resource.tar.gz#test.test_blocks-minmax_tuple--Peephole_/opt.yql"
+ "checksum": "34fdb2237dc2a52901a3279048cb493d",
+ "size": 5540,
+ "uri": "https://{canondata_backend}/1936273/780afd5c76cefb9a0deca536730d6e710dfd47af/resource.tar.gz#test.test_blocks-minmax_tuple--Peephole_/opt.yql"
}
],
"test.test[blocks-minmax_tuple--Plan]": [
@@ -854,9 +854,9 @@
],
"test.test[blocks-struct_type--Peephole]": [
{
- "checksum": "1e4eb1415a14aaa58a5201e1dff2698b",
- "size": 3095,
- "uri": "https://{canondata_backend}/1775319/dffc024d6db08a14065eefff7616512e9ec14ce3/resource.tar.gz#test.test_blocks-struct_type--Peephole_/opt.yql"
+ "checksum": "f89c5ad6de169db5bceb2280b2b45c19",
+ "size": 3099,
+ "uri": "https://{canondata_backend}/1881367/4c7aa0ee09ad13a7c4c8522afaebb0a34e8a70bf/resource.tar.gz#test.test_blocks-struct_type--Peephole_/opt.yql"
}
],
"test.test[blocks-struct_type--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part16/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part16/canondata/result.json
index 4179ae4702..e87e32930d 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part16/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part16/canondata/result.json
@@ -495,9 +495,9 @@
],
"test.test[blocks-combine_all_pg--Peephole]": [
{
- "checksum": "51b056d9dadc4c2d59d4d5e57572682a",
- "size": 7688,
- "uri": "https://{canondata_backend}/1689644/b05960229650fc70467add9c3d7638a8f3cdf3e1/resource.tar.gz#test.test_blocks-combine_all_pg--Peephole_/opt.yql"
+ "checksum": "4ddca44c25daaec6456186fbb157f488",
+ "size": 7776,
+ "uri": "https://{canondata_backend}/1916746/e62dc8205f86272010640f8fa6ec2e825d624df1/resource.tar.gz#test.test_blocks-combine_all_pg--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_pg--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part17/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part17/canondata/result.json
index 2ac5013ae3..fdabf4e5f3 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part17/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part17/canondata/result.json
@@ -543,9 +543,9 @@
],
"test.test[blocks-combine_all_count_filter_opt--Peephole]": [
{
- "checksum": "d05f14700984d99846058bdf6739bad9",
- "size": 2233,
- "uri": "https://{canondata_backend}/1903885/ffc476321aa22def0fffeeb1a5d7ff900dfce9d6/resource.tar.gz#test.test_blocks-combine_all_count_filter_opt--Peephole_/opt.yql"
+ "checksum": "623683acd53d94b06b7a19bdb11b4ecf",
+ "size": 2276,
+ "uri": "https://{canondata_backend}/1937001/3b94c4bb2028a21752a5dee6baf1a850ce51d480/resource.tar.gz#test.test_blocks-combine_all_count_filter_opt--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_count_filter_opt--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json
index 519cb58bbc..52a25855b1 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part18/canondata/result.json
@@ -456,9 +456,9 @@
],
"test.test[blocks-combine_all_min_filter--Peephole]": [
{
- "checksum": "1c63e924b4b71fb5db01c98dc02d38d8",
- "size": 2052,
- "uri": "https://{canondata_backend}/1903885/631311924f7bd550e8916c87c0bf4ae694df8ae6/resource.tar.gz#test.test_blocks-combine_all_min_filter--Peephole_/opt.yql"
+ "checksum": "4a8fc10717081d3a01a44dbc27597c1f",
+ "size": 2095,
+ "uri": "https://{canondata_backend}/1775319/8b72d711520d5cd84e67e6541ace66cd7067ac53/resource.tar.gz#test.test_blocks-combine_all_min_filter--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_min_filter--Plan]": [
@@ -512,9 +512,9 @@
],
"test.test[blocks-date_group_by--Peephole]": [
{
- "checksum": "54e328a37fe9c9cc95f84acce454d9af",
- "size": 4667,
- "uri": "https://{canondata_backend}/1942100/977b836f807d2b39508e2ee0606842a99261a1da/resource.tar.gz#test.test_blocks-date_group_by--Peephole_/opt.yql"
+ "checksum": "c44c3b19c19cb7bec1fc9a11abdf89b8",
+ "size": 4673,
+ "uri": "https://{canondata_backend}/1775319/8b72d711520d5cd84e67e6541ace66cd7067ac53/resource.tar.gz#test.test_blocks-date_group_by--Peephole_/opt.yql"
}
],
"test.test[blocks-date_group_by--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json
index aecc50c866..4358fdb451 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json
@@ -584,9 +584,9 @@
],
"test.test[blocks-combine_hashed_max--Peephole]": [
{
- "checksum": "072de32f237c0a659fcb217825d6fe8d",
- "size": 3482,
- "uri": "https://{canondata_backend}/1942173/d7b51a8943054cfea98b1e42450768694da5000b/resource.tar.gz#test.test_blocks-combine_hashed_max--Peephole_/opt.yql"
+ "checksum": "1c4cc1601f1f61b764e391b851324999",
+ "size": 3530,
+ "uri": "https://{canondata_backend}/1946324/820e3db848dce72eac40665b9ca64552c869136b/resource.tar.gz#test.test_blocks-combine_hashed_max--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_hashed_max--Plan]": [
@@ -612,9 +612,9 @@
],
"test.test[blocks-combine_hashed_sum_many_keys--Peephole]": [
{
- "checksum": "dce58d7114cdf691cd277a82b722aaef",
- "size": 11971,
- "uri": "https://{canondata_backend}/1775319/049530c2ab88f8664aade59a42710a0f0aec002c/resource.tar.gz#test.test_blocks-combine_hashed_sum_many_keys--Peephole_/opt.yql"
+ "checksum": "616ca4f49125448895b333f0c77aa4bb",
+ "size": 12071,
+ "uri": "https://{canondata_backend}/1946324/820e3db848dce72eac40665b9ca64552c869136b/resource.tar.gz#test.test_blocks-combine_hashed_sum_many_keys--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_hashed_sum_many_keys--Plan]": [
@@ -640,9 +640,9 @@
],
"test.test[blocks-distinct_mixed_keys--Peephole]": [
{
- "checksum": "16b9a0bc731ecbefb8d4274854782711",
- "size": 7436,
- "uri": "https://{canondata_backend}/1775319/049530c2ab88f8664aade59a42710a0f0aec002c/resource.tar.gz#test.test_blocks-distinct_mixed_keys--Peephole_/opt.yql"
+ "checksum": "76517f43bab95ee0125ec1b4ccf3a55b",
+ "size": 7664,
+ "uri": "https://{canondata_backend}/1937001/e715663cfc04c70f73b3049843d47c204d5f6c08/resource.tar.gz#test.test_blocks-distinct_mixed_keys--Peephole_/opt.yql"
}
],
"test.test[blocks-distinct_mixed_keys--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part2/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part2/canondata/result.json
index 127ec082c8..0a108275f3 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part2/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part2/canondata/result.json
@@ -554,9 +554,9 @@
],
"test.test[blocks-combine_all_minmax_double--Peephole]": [
{
- "checksum": "b2c19f7c87d23fc16be804df86d23185",
- "size": 2017,
- "uri": "https://{canondata_backend}/995452/d8c0a0cd1e7f0cabf7ce37e867037d3ed9ceb5b4/resource.tar.gz#test.test_blocks-combine_all_minmax_double--Peephole_/opt.yql"
+ "checksum": "b2b4a9eeb219a440e6ab3de3861533c0",
+ "size": 2037,
+ "uri": "https://{canondata_backend}/1936273/085dd9582c7a504381fe179e61eafc7aa1a2ac09/resource.tar.gz#test.test_blocks-combine_all_minmax_double--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_minmax_double--Plan]": [
@@ -722,9 +722,9 @@
],
"test.test[blocks-string_as_agg_key--Peephole]": [
{
- "checksum": "5923fbb79cabc45e72d3e1b1ee3a9ecf",
- "size": 4217,
- "uri": "https://{canondata_backend}/1936947/cf80e3ac9f8ab72a1a9ec43a693fa2afe6eab064/resource.tar.gz#test.test_blocks-string_as_agg_key--Peephole_/opt.yql"
+ "checksum": "e54b6a68d1a685a982fc3093a411d4c2",
+ "size": 4257,
+ "uri": "https://{canondata_backend}/1942525/f1dbd68be895d5bf52c9eba32b1cb5bb032d2c48/resource.tar.gz#test.test_blocks-string_as_agg_key--Peephole_/opt.yql"
}
],
"test.test[blocks-string_as_agg_key--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part3/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part3/canondata/result.json
index 9f92b7a883..94e4b44c6f 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part3/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part3/canondata/result.json
@@ -385,9 +385,9 @@
],
"test.test[blocks-combine_all_sum_filter_opt--Peephole]": [
{
- "checksum": "ff2c95963b13fa52699ffa7c693a6a58",
- "size": 2119,
- "uri": "https://{canondata_backend}/1689644/ed8a7467fa50846051bff00d767534d539df7443/resource.tar.gz#test.test_blocks-combine_all_sum_filter_opt--Peephole_/opt.yql"
+ "checksum": "12c44da956ba25f6ecdcb9cb6a59c970",
+ "size": 2162,
+ "uri": "https://{canondata_backend}/1925821/5430ca5ee93f79b17db6a6f6b6c531354175f340/resource.tar.gz#test.test_blocks-combine_all_sum_filter_opt--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_sum_filter_opt--Plan]": [
@@ -413,9 +413,9 @@
],
"test.test[blocks-distinct_pure_all--Peephole]": [
{
- "checksum": "054591522c53f5abcbc928609cdaf8f1",
- "size": 3185,
- "uri": "https://{canondata_backend}/1689644/ed8a7467fa50846051bff00d767534d539df7443/resource.tar.gz#test.test_blocks-distinct_pure_all--Peephole_/opt.yql"
+ "checksum": "6ab2263b47ec33e1dac75559b453cbc2",
+ "size": 3287,
+ "uri": "https://{canondata_backend}/1925821/5430ca5ee93f79b17db6a6f6b6c531354175f340/resource.tar.gz#test.test_blocks-distinct_pure_all--Peephole_/opt.yql"
}
],
"test.test[blocks-distinct_pure_all--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part4/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part4/canondata/result.json
index 51452231a2..7b9e8dfcb5 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part4/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part4/canondata/result.json
@@ -546,9 +546,9 @@
],
"test.test[blocks-combine_all_min_filter_opt--Peephole]": [
{
- "checksum": "ff2c95963b13fa52699ffa7c693a6a58",
- "size": 2119,
- "uri": "https://{canondata_backend}/1936273/8bc4af96bbb1500304fd1be8e2559bcbead7eb36/resource.tar.gz#test.test_blocks-combine_all_min_filter_opt--Peephole_/opt.yql"
+ "checksum": "12c44da956ba25f6ecdcb9cb6a59c970",
+ "size": 2162,
+ "uri": "https://{canondata_backend}/1937001/4c624b84b0fd73da233705cdf7f661499ce8ebcf/resource.tar.gz#test.test_blocks-combine_all_min_filter_opt--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_min_filter_opt--Plan]": [
@@ -574,9 +574,9 @@
],
"test.test[blocks-decimal_avg--Peephole]": [
{
- "checksum": "cdc8a4d00988408c92640297fa47c45a",
- "size": 3137,
- "uri": "https://{canondata_backend}/1931696/1cb947d289c4d65b45c3fba919aa59c8a7d02c7a/resource.tar.gz#test.test_blocks-decimal_avg--Peephole_/opt.yql"
+ "checksum": "3d5fcce74de740687d3dc82295049e37",
+ "size": 3157,
+ "uri": "https://{canondata_backend}/1937001/4c624b84b0fd73da233705cdf7f661499ce8ebcf/resource.tar.gz#test.test_blocks-decimal_avg--Peephole_/opt.yql"
}
],
"test.test[blocks-decimal_avg--Plan]": [
@@ -602,9 +602,9 @@
],
"test.test[blocks-json_document_type--Peephole]": [
{
- "checksum": "a7cd19fa1416b435369188128797921b",
- "size": 2595,
- "uri": "https://{canondata_backend}/1917492/e3ceae93b691869b63d950444badb29e6ace4842/resource.tar.gz#test.test_blocks-json_document_type--Peephole_/opt.yql"
+ "checksum": "1536d7c0c5b539ba78f7d43b8ebb5833",
+ "size": 2599,
+ "uri": "https://{canondata_backend}/1946324/6bfca3fdf8ec9efde60e3282ae98ccf726043684/resource.tar.gz#test.test_blocks-json_document_type--Peephole_/opt.yql"
}
],
"test.test[blocks-json_document_type--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json
index 582774c7e1..19bd1f6150 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part5/canondata/result.json
@@ -512,9 +512,9 @@
],
"test.test[blocks-combine_all_some--Peephole]": [
{
- "checksum": "988d82092e311c0cc2288f6c721aa11c",
- "size": 6223,
- "uri": "https://{canondata_backend}/1936273/d36ce8069af100fc1a5be882969a5505c35574b2/resource.tar.gz#test.test_blocks-combine_all_some--Peephole_/opt.yql"
+ "checksum": "ec0346c566d620b4bc0d3de5b81e5a9a",
+ "size": 6225,
+ "uri": "https://{canondata_backend}/1937001/ef9f5ea919d8150ce90cc4b0dbee0c1cba26fb65/resource.tar.gz#test.test_blocks-combine_all_some--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_some--Plan]": [
@@ -540,9 +540,9 @@
],
"test.test[blocks-combine_hashed_avg--Peephole]": [
{
- "checksum": "6cb4c5227d709714d5993d2ed2c77b6b",
- "size": 4568,
- "uri": "https://{canondata_backend}/1777230/4c11233cb21f28d476a7607d88951afa44d7ddc0/resource.tar.gz#test.test_blocks-combine_hashed_avg--Peephole_/opt.yql"
+ "checksum": "17a606506695fd0869a72faebd42531d",
+ "size": 4614,
+ "uri": "https://{canondata_backend}/1917492/b000db17817f99c1d3a0274a3622303fc2214093/resource.tar.gz#test.test_blocks-combine_hashed_avg--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_hashed_avg--Plan]": [
@@ -568,9 +568,9 @@
],
"test.test[blocks-distinct_mixed_all--Peephole]": [
{
- "checksum": "3848f2a2650fe368e09a4991bdd3fc0d",
- "size": 4241,
- "uri": "https://{canondata_backend}/1936273/d36ce8069af100fc1a5be882969a5505c35574b2/resource.tar.gz#test.test_blocks-distinct_mixed_all--Peephole_/opt.yql"
+ "checksum": "0cee70179fa613f92ac69175e9ca3b83",
+ "size": 4317,
+ "uri": "https://{canondata_backend}/1937001/ef9f5ea919d8150ce90cc4b0dbee0c1cba26fb65/resource.tar.gz#test.test_blocks-distinct_mixed_all--Peephole_/opt.yql"
}
],
"test.test[blocks-distinct_mixed_all--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json
index 0b84ff1429..6181c3b5a8 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json
@@ -519,9 +519,9 @@
],
"test.test[blocks-combine_all_avg--Peephole]": [
{
- "checksum": "72c688d8ba2b59a621b7d928ad361432",
- "size": 6542,
- "uri": "https://{canondata_backend}/1689644/9c85c8a8fbfb9b467aefa0e6adc30066e9b66407/resource.tar.gz#test.test_blocks-combine_all_avg--Peephole_/opt.yql"
+ "checksum": "74bd46cd3b4e6e0003fd1617382d91c2",
+ "size": 6562,
+ "uri": "https://{canondata_backend}/1916746/16b0b89d25b015809e26ecf42c6ebb271b7e329d/resource.tar.gz#test.test_blocks-combine_all_avg--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_avg--Plan]": [
@@ -547,9 +547,9 @@
],
"test.test[blocks-combine_hashed_count_filter--Peephole]": [
{
- "checksum": "dbad3ef6538bee756fe6988b10c2d026",
- "size": 2691,
- "uri": "https://{canondata_backend}/1920236/9b3d7b78be8d9e7dc5f97424762bc80230b95228/resource.tar.gz#test.test_blocks-combine_hashed_count_filter--Peephole_/opt.yql"
+ "checksum": "72546bd6ba3596963df0163bdb47b7ef",
+ "size": 2718,
+ "uri": "https://{canondata_backend}/1689644/4afe37611f20ddb78db6921c77de0f60bc674896/resource.tar.gz#test.test_blocks-combine_hashed_count_filter--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_hashed_count_filter--Plan]": [
@@ -631,9 +631,9 @@
],
"test.test[blocks-finalize_hashed_keys--Peephole]": [
{
- "checksum": "1d999c5294dc83f1a97d3da88c4bcd6d",
- "size": 3967,
- "uri": "https://{canondata_backend}/1920236/9b3d7b78be8d9e7dc5f97424762bc80230b95228/resource.tar.gz#test.test_blocks-finalize_hashed_keys--Peephole_/opt.yql"
+ "checksum": "38969a82d581c3ce44fa5c016e693467",
+ "size": 3975,
+ "uri": "https://{canondata_backend}/1916746/16b0b89d25b015809e26ecf42c6ebb271b7e329d/resource.tar.gz#test.test_blocks-finalize_hashed_keys--Peephole_/opt.yql"
}
],
"test.test[blocks-finalize_hashed_keys--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part7/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part7/canondata/result.json
index d11731f9e3..d6f740086c 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part7/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part7/canondata/result.json
@@ -436,9 +436,9 @@
],
"test.test[blocks-combine_all_count_filter--Peephole]": [
{
- "checksum": "1bd1637694025d218083e5311e2e737e",
- "size": 2200,
- "uri": "https://{canondata_backend}/1903885/342ba96f93dd1681ca1277caeb9ad885799797d8/resource.tar.gz#test.test_blocks-combine_all_count_filter--Peephole_/opt.yql"
+ "checksum": "7e6527be84ea04af5daa4690e3affc77",
+ "size": 2243,
+ "uri": "https://{canondata_backend}/1936273/198988f944c8f3fd97179a6f8fffeac11f8d1537/resource.tar.gz#test.test_blocks-combine_all_count_filter--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_count_filter--Plan]": [
@@ -464,9 +464,9 @@
],
"test.test[blocks-combine_hashed_minmax_nested--Peephole]": [
{
- "checksum": "6a31c3120cf2eb25ca9660df0025dc6d",
- "size": 4248,
- "uri": "https://{canondata_backend}/1946324/776cdf4fe38b7e55a357e57769c922aaa9c78eb8/resource.tar.gz#test.test_blocks-combine_hashed_minmax_nested--Peephole_/opt.yql"
+ "checksum": "c51e7072a46b6629eaca5986198f14b7",
+ "size": 4252,
+ "uri": "https://{canondata_backend}/1871182/1602ffd1989777b6937512e2491f3b6431e75351/resource.tar.gz#test.test_blocks-combine_hashed_minmax_nested--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_hashed_minmax_nested--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part8/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part8/canondata/result.json
index 591eccf27f..6914a530af 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part8/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part8/canondata/result.json
@@ -697,9 +697,9 @@
],
"test.test[blocks-combine_all_avg_filter_opt--Peephole]": [
{
- "checksum": "457eaa347bc78b8ae704dcfab24cd2c5",
- "size": 2450,
- "uri": "https://{canondata_backend}/1689644/939d001ead810a20e7af16f81f7b19e996b5ab10/resource.tar.gz#test.test_blocks-combine_all_avg_filter_opt--Peephole_/opt.yql"
+ "checksum": "a86349d68cb76289dcccb93060bf1534",
+ "size": 2493,
+ "uri": "https://{canondata_backend}/1936273/ef8b8680bb231b437c30900120a6917ae15cccaa/resource.tar.gz#test.test_blocks-combine_all_avg_filter_opt--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_avg_filter_opt--Plan]": [
@@ -725,9 +725,9 @@
],
"test.test[blocks-combine_all_max_filter--Peephole]": [
{
- "checksum": "1a245481237e4f62ec4b0a9478b46649",
- "size": 2052,
- "uri": "https://{canondata_backend}/1689644/939d001ead810a20e7af16f81f7b19e996b5ab10/resource.tar.gz#test.test_blocks-combine_all_max_filter--Peephole_/opt.yql"
+ "checksum": "38a1cf0163b1ee0e882d3e2b4510bf6b",
+ "size": 2095,
+ "uri": "https://{canondata_backend}/1936273/ef8b8680bb231b437c30900120a6917ae15cccaa/resource.tar.gz#test.test_blocks-combine_all_max_filter--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_max_filter--Plan]": [
@@ -753,9 +753,9 @@
],
"test.test[blocks-combine_hashed_some--Peephole]": [
{
- "checksum": "6c767ae3fdac7034ca43b157bd8894a7",
- "size": 3458,
- "uri": "https://{canondata_backend}/1775319/5727f49e842aea7c325a2340890ed9e5d39107df/resource.tar.gz#test.test_blocks-combine_hashed_some--Peephole_/opt.yql"
+ "checksum": "a6278c3a5e5565cca2826f911a303aab",
+ "size": 3506,
+ "uri": "https://{canondata_backend}/1871102/fcf3cadf4b55e03aeab83cc56f753fec4c829577/resource.tar.gz#test.test_blocks-combine_hashed_some--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_hashed_some--Plan]": [
diff --git a/ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json
index 2405a137be..e45f299c03 100644
--- a/ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json
+++ b/ydb/library/yql/tests/sql/yt_native_file/part9/canondata/result.json
@@ -429,9 +429,9 @@
],
"test.test[blocks-combine_all_count--Peephole]": [
{
- "checksum": "d065efb0530ef025a6a377c7e583607d",
- "size": 3141,
- "uri": "https://{canondata_backend}/1936273/078595a3559c5c438ed530f67006a92558087996/resource.tar.gz#test.test_blocks-combine_all_count--Peephole_/opt.yql"
+ "checksum": "a05d2b4951425b78494384e3966a40a7",
+ "size": 3205,
+ "uri": "https://{canondata_backend}/1775319/7897edc8408eb00cc6ed290a2f889d4524ac2280/resource.tar.gz#test.test_blocks-combine_all_count--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_count--Plan]": [
@@ -457,9 +457,9 @@
],
"test.test[blocks-combine_all_sum_filter--Peephole]": [
{
- "checksum": "a08ca9ef70405bb6a580d476ca3972f2",
- "size": 2052,
- "uri": "https://{canondata_backend}/1936273/078595a3559c5c438ed530f67006a92558087996/resource.tar.gz#test.test_blocks-combine_all_sum_filter--Peephole_/opt.yql"
+ "checksum": "bd161548e7817d6c81c75cf0318f8e42",
+ "size": 2095,
+ "uri": "https://{canondata_backend}/1775319/7897edc8408eb00cc6ed290a2f889d4524ac2280/resource.tar.gz#test.test_blocks-combine_all_sum_filter--Peephole_/opt.yql"
}
],
"test.test[blocks-combine_all_sum_filter--Plan]": [
@@ -513,9 +513,9 @@
],
"test.test[blocks-distinct_pure_keys--Peephole]": [
{
- "checksum": "c59c6d34d5ee19a75a2f78eeeac2c5ff",
- "size": 6736,
- "uri": "https://{canondata_backend}/1936947/dd735d32257373e0e81a1a0c690b04eb821b17d3/resource.tar.gz#test.test_blocks-distinct_pure_keys--Peephole_/opt.yql"
+ "checksum": "5a124ce37da8b898226286c987ad5df1",
+ "size": 6946,
+ "uri": "https://{canondata_backend}/1775319/7897edc8408eb00cc6ed290a2f889d4524ac2280/resource.tar.gz#test.test_blocks-distinct_pure_keys--Peephole_/opt.yql"
}
],
"test.test[blocks-distinct_pure_keys--Plan]": [
@@ -541,9 +541,9 @@
],
"test.test[blocks-group_by_complex_key--Peephole]": [
{
- "checksum": "51c549ee925a9e1b57fb53b9d8af78bf",
- "size": 3993,
- "uri": "https://{canondata_backend}/1937367/bf3daea8b53db195e51f49431b822900029c2a5d/resource.tar.gz#test.test_blocks-group_by_complex_key--Peephole_/opt.yql"
+ "checksum": "ed8ba52767c23cc82a2e65723d2d632d",
+ "size": 3997,
+ "uri": "https://{canondata_backend}/1871102/d4092e58bfb3465cc6e1385e1e0fb9ddf349664c/resource.tar.gz#test.test_blocks-group_by_complex_key--Peephole_/opt.yql"
}
],
"test.test[blocks-group_by_complex_key--Plan]": [