aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-03-24 15:29:35 +0300
committervvvv <vvvv@ydb.tech>2023-03-24 15:29:35 +0300
commitf8d946ba7430c3864f0a97d42fa295d19ed40a3e (patch)
tree3b9cafcffd656d3d7f6211fb11a93a4e2cf3f353
parent5d3a16dbb82b337e09db43c42aa12445e1f678c4 (diff)
downloadydb-f8d946ba7430c3864f0a97d42fa295d19ed40a3e.tar.gz
Initial implementation of block SOME
impl impl
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.cpp6
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp48
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp285
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.h10
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ya.make2
10 files changed, 332 insertions, 25 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp
index 3d13c329eb..6051ce2cbb 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp
@@ -5379,7 +5379,7 @@ namespace {
ui32 expectedArgs;
if (name == "count_all") {
expectedArgs = overState ? 2 : 1;
- } else if (name == "count" || name == "sum" || name == "avg" || name == "min" || name == "max") {
+ } else if (name == "count" || name == "sum" || name == "avg" || name == "min" || name == "max" || name == "some") {
expectedArgs = 2;
} else {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
@@ -5455,6 +5455,10 @@ namespace {
}
input->SetTypeAnn(retType);
+ } else if (name == "some") {
+ auto itemType = input->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ const TTypeAnnotationNode* retType = itemType;
+ input->SetTypeAnn(retType);
} else {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
TStringBuilder() << "Unsupported agg name: " << name));
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin-x86_64.txt
index 1e05662e53..89cb473b1f 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin-x86_64.txt
@@ -41,6 +41,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt
index 078a9ba89d..bce5423dca 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt
@@ -42,6 +42,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-x86_64.txt
index 078a9ba89d..bce5423dca 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-x86_64.txt
@@ -42,6 +42,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.windows-x86_64.txt
index 1e05662e53..89cb473b1f 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.windows-x86_64.txt
@@ -41,6 +41,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
index 9785de7085..e4126c0f72 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
@@ -2,6 +2,7 @@
#include "mkql_block_agg_count.h"
#include "mkql_block_agg_sum.h"
#include "mkql_block_agg_minmax.h"
+#include "mkql_block_agg_some.h"
namespace NKikimr {
namespace NMiniKQL {
@@ -17,6 +18,7 @@ struct TAggregatorFactories {
Factories["avg"] = MakeBlockAvgFactory();
Factories["min"] = MakeBlockMinFactory();
Factories["max"] = MakeBlockMaxFactory();
+ Factories["some"] = MakeBlockSomeFactory();
}
};
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
index 676b51fa0a..00524d0b37 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
@@ -106,13 +106,13 @@ constexpr TIn InitialStateValue() {
template <typename TIn, bool IsMin>
struct TState<true, TIn, IsMin> {
- TIn Value_ = InitialStateValue<TIn, IsMin>();
- ui8 IsValid_ = 0;
+ TIn Value = InitialStateValue<TIn, IsMin>();
+ ui8 IsValid = 0;
};
template <typename TIn, bool IsMin>
struct TState<false, TIn, IsMin> {
- TIn Value_ = InitialStateValue<TIn, IsMin>();
+ TIn Value = InitialStateValue<TIn, IsMin>();
};
using TGenericState = NUdf::TUnboxedValuePod;
@@ -131,12 +131,12 @@ public:
void Add(const void* state) final {
auto typedState = static_cast<const TStateType*>(state);
if constexpr (IsNullable) {
- if (!typedState->IsValid_) {
+ if (!typedState->IsValid) {
Builder_.Add(TBlockItem());
return;
}
}
- Builder_.Add(TBlockItem(typedState->Value_));
+ Builder_.Add(TBlockItem(typedState->Value));
}
NUdf::TUnboxedValue Build() final {
@@ -630,11 +630,11 @@ public:
Y_ENSURE(datum.is_scalar());
if constexpr (IsNullable) {
if (datum.scalar()->is_valid) {
- typedState->Value_ = datum.scalar_as<TInScalar>().value;
- typedState->IsValid_ = 1;
+ typedState->Value = datum.scalar_as<TInScalar>().value;
+ typedState->IsValid = 1;
}
} else {
- typedState->Value_ = datum.scalar_as<TInScalar>().value;
+ typedState->Value = datum.scalar_as<TInScalar>().value;
}
} else {
const auto& array = datum.array();
@@ -647,9 +647,9 @@ public:
}
if (!filtered) {
- TIn value = typedState->Value_;
+ TIn value = typedState->Value;
if constexpr (IsNullable) {
- typedState->IsValid_ = 1;
+ typedState->IsValid = 1;
}
if (IsNullable && nullCount != 0) {
@@ -665,14 +665,14 @@ public:
}
}
- typedState->Value_ = value;
+ typedState->Value = value;
} else {
const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
const auto& filterArray = filterDatum.array();
MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
const ui8* filterBitmap = filterArray->template GetValues<uint8_t>(1);
- TIn value = typedState->Value_;
+ TIn value = typedState->Value;
ui64 validCount = 0;
if (IsNullable && nullCount != 0) {
auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
@@ -691,9 +691,9 @@ public:
}
if constexpr (IsNullable) {
- typedState->IsValid_ |= validCount ? 1 : 0;
+ typedState->IsValid |= validCount ? 1 : 0;
}
- typedState->Value_ = value;
+ typedState->Value = value;
}
}
}
@@ -701,12 +701,12 @@ public:
NUdf::TUnboxedValue FinishOne(const void* state) final {
auto typedState = static_cast<const TStateType*>(state);
if constexpr (IsNullable) {
- if (!typedState->IsValid_) {
+ if (!typedState->IsValid) {
return NUdf::TUnboxedValuePod();
}
}
- return NUdf::TUnboxedValuePod(typedState->Value_);
+ return NUdf::TUnboxedValuePod(typedState->Value);
}
private:
@@ -720,28 +720,28 @@ static void PushValueToState(TState<IsNullable, TIn, IsMin>* typedState, const a
Y_ENSURE(datum.is_scalar());
if constexpr (IsNullable) {
if (datum.scalar()->is_valid) {
- typedState->Value_ = datum.scalar_as<TInScalar>().value;
- typedState->IsValid_ = 1;
+ typedState->Value = datum.scalar_as<TInScalar>().value;
+ typedState->IsValid = 1;
}
} else {
- typedState->Value_ = datum.scalar_as<TInScalar>().value;
+ typedState->Value = datum.scalar_as<TInScalar>().value;
}
} else {
const auto &array = datum.array();
auto ptr = array->GetValues<TIn>(1);
if constexpr (IsNullable) {
if (array->GetNullCount() == 0) {
- typedState->IsValid_ = 1;
- typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]);
+ typedState->IsValid = 1;
+ typedState->Value = UpdateMinMax<IsMin>(typedState->Value, ptr[row]);
} else {
auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
ui64 fullIndex = row + array->offset;
ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
- typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, SelectArg(notNull, ptr[row], typedState->Value_));
- typedState->IsValid_ |= notNull;
+ typedState->Value = UpdateMinMax<IsMin>(typedState->Value, SelectArg(notNull, ptr[row], typedState->Value));
+ typedState->IsValid |= notNull;
}
} else {
- typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]);
+ typedState->Value = UpdateMinMax<IsMin>(typedState->Value, ptr[row]);
}
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
new file mode 100644
index 0000000000..8f9aec1f78
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
@@ -0,0 +1,285 @@
+#include "mkql_block_agg_some.h"
+
+#include <ydb/library/yql/minikql/mkql_node_builder.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+
+#include <ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+using TGenericState = NUdf::TUnboxedValuePod;
+
+void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row, IBlockReader& reader,
+ IBlockItemConverter& converter, TComputationContext& ctx)
+{
+ if (datum.is_scalar()) {
+ if (datum.scalar()->is_valid) {
+ auto item = reader.GetScalarItem(*datum.scalar());
+ *typedState = converter.MakeValue(item, ctx.HolderFactory);
+ }
+ } else {
+ const auto& array = datum.array();
+ TBlockItem curr = reader.GetItem(*array, row);
+ if (curr) {
+ *typedState = converter.MakeValue(curr, ctx.HolderFactory);
+ }
+ }
+}
+
+class TGenericColumnBuilder : public IAggColumnBuilder {
+public:
+ TGenericColumnBuilder(ui64 size, TType* columnType, TComputationContext& ctx)
+ : Builder_(MakeArrayBuilder(TTypeInfoHelper(), columnType, ctx.ArrowMemoryPool, size))
+ , Ctx_(ctx)
+ {
+ }
+
+ void Add(const void* state) final {
+ Builder_->Add(*static_cast<const TGenericState*>(state));
+ }
+
+ NUdf::TUnboxedValue Build() final {
+ return Ctx_.HolderFactory.CreateArrowBlock(Builder_->Build(true));
+ }
+
+private:
+ const std::unique_ptr<IArrayBuilder> Builder_;
+ TComputationContext& Ctx_;
+};
+
+template<typename TTag>
+class TSomeBlockGenericAggregator;
+
+template<>
+class TSomeBlockGenericAggregator<TCombineAllTag> : public TCombineAllTag::TBase {
+public:
+ using TBase = TCombineAllTag::TBase;
+
+ TSomeBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBase(sizeof(TGenericState), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
+ , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type))
+ {
+ }
+
+ void InitState(void* state) final {
+ new(state) TGenericState();
+ }
+
+ void DestroyState(void* state) noexcept final {
+ auto typedState = static_cast<TGenericState*>(state);
+ typedState->DeleteUnreferenced();
+ *typedState = TGenericState();
+ }
+
+ void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
+ TGenericState& typedState = *static_cast<TGenericState*>(state);
+ if (typedState) {
+ return;
+ }
+
+ Y_UNUSED(batchLength);
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+
+ if (datum.is_scalar()) {
+ if (datum.scalar()->is_valid) {
+ auto item = Reader_->GetScalarItem(*datum.scalar());
+ typedState = Converter_->MakeValue(item, Ctx_.HolderFactory);
+ }
+ } else {
+ const auto& array = datum.array();
+ auto len = array->length;
+
+ const ui8* filterBitmap = nullptr;
+ if (filtered) {
+ const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
+ const auto& filterArray = filterDatum.array();
+ MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
+ filterBitmap = filterArray->template GetValues<uint8_t>(1);
+ }
+
+ for (size_t i = 0; i < len; ++i) {
+ TBlockItem curr = Reader_->GetItem(*array, i);
+ if (curr && (!filterBitmap || filterBitmap[i])) {
+ typedState = Converter_->MakeValue(curr, Ctx_.HolderFactory);
+ break;
+ }
+ }
+ }
+ }
+
+ NUdf::TUnboxedValue FinishOne(const void *state) final {
+ auto typedState = *static_cast<const TGenericState *>(state);
+ return typedState;
+ }
+
+private:
+ const ui32 ArgColumn_;
+ const std::unique_ptr<IBlockReader> Reader_;
+ const std::unique_ptr<IBlockItemConverter> Converter_;
+};
+
+template<>
+class TSomeBlockGenericAggregator<TCombineKeysTag> : public TCombineKeysTag::TBase {
+public:
+ using TBase = TCombineKeysTag::TBase;
+
+ TSomeBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBase(sizeof(TGenericState), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , Type_(type)
+ , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
+ , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type))
+ {
+ }
+
+ void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TGenericState();
+ UpdateKey(state, columns, row);
+ }
+
+ void DestroyState(void* state) noexcept final {
+ auto typedState = static_cast<TGenericState*>(state);
+ typedState->DeleteUnreferenced();
+ *typedState = TGenericState();
+ }
+
+ void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TGenericState*>(state);
+ if (*typedState) {
+ return;
+ }
+
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ PushValueToState(typedState, datum, row, *Reader_, *Converter_, Ctx_);
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
+ return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
+ }
+
+private:
+ const ui32 ArgColumn_;
+ TType* const Type_;
+ const std::unique_ptr<IBlockReader> Reader_;
+ const std::unique_ptr<IBlockItemConverter> Converter_;
+};
+
+template<>
+class TSomeBlockGenericAggregator<TFinalizeKeysTag> : public TFinalizeKeysTag::TBase {
+public:
+ using TBase = TFinalizeKeysTag::TBase;
+
+ TSomeBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+ : TBase(sizeof(TGenericState), filterColumn, ctx)
+ , ArgColumn_(argColumn)
+ , Type_(type)
+ , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
+ , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type))
+ {
+ }
+
+ void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ new(state) TGenericState();
+ UpdateState(state, columns, row);
+ }
+
+ void DestroyState(void* state) noexcept final {
+ auto typedState = static_cast<TGenericState*>(state);
+ typedState->DeleteUnreferenced();
+ *typedState = TGenericState();
+ }
+
+ void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+ auto typedState = static_cast<TGenericState*>(state);
+ if (*typedState) {
+ return;
+ }
+
+ const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+ PushValueToState(typedState, datum, row, *Reader_, *Converter_, Ctx_);
+ }
+
+ std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
+ return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
+ }
+
+private:
+ const ui32 ArgColumn_;
+ TType* const Type_;
+ const std::unique_ptr<IBlockReader> Reader_;
+ const std::unique_ptr<IBlockItemConverter> Converter_;
+};
+
+template <typename TTag>
+class TPreparedSomeBlockGenericAggregator : public TTag::TPreparedAggregator {
+public:
+ using TBase = typename TTag::TPreparedAggregator;
+
+ TPreparedSomeBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn)
+ : TBase(sizeof(TGenericState))
+ , Type_(type)
+ , FilterColumn_(filterColumn)
+ , ArgColumn_(argColumn)
+ {}
+
+ std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
+ return std::make_unique<TSomeBlockGenericAggregator<TTag>>(Type_, FilterColumn_, ArgColumn_, ctx);
+ }
+
+private:
+ TType* const Type_;
+ const std::optional<ui32> FilterColumn_;
+ const ui32 ArgColumn_;
+};
+
+template <typename TTag>
+std::unique_ptr<typename TTag::TPreparedAggregator> PrepareSome(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn) {
+ auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn));
+ const bool isScalar = blockType->GetShape() == TBlockType::EShape::Scalar;
+ auto argType = blockType->GetItemType();
+
+ return std::make_unique<TPreparedSomeBlockGenericAggregator<TTag>>(argType, filterColumn, argColumn);
+}
+
+class TBlockSomeFactory : public IBlockAggregatorFactory {
+ std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineAll>> PrepareCombineAll(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const override {
+ Y_UNUSED(env);
+ return PrepareSome<TCombineAllTag>(tupleType, filterColumn, argsColumns[0]);
+ }
+
+ std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineKeys>> PrepareCombineKeys(
+ TTupleType* tupleType,
+ std::optional<ui32> filterColumn,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const override {
+ Y_UNUSED(env);
+ return PrepareSome<TCombineKeysTag>(tupleType, filterColumn, argsColumns[0]);
+ }
+
+ std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorFinalizeKeys>> PrepareFinalizeKeys(
+ TTupleType* tupleType,
+ const std::vector<ui32>& argsColumns,
+ const TTypeEnvironment& env) const override {
+ Y_UNUSED(env);
+ return PrepareSome<TFinalizeKeysTag>(tupleType, std::optional<ui32>(), argsColumns[0]);
+ }
+};
+
+}
+
+std::unique_ptr<IBlockAggregatorFactory> MakeBlockSomeFactory() {
+ return std::make_unique<TBlockSomeFactory>();
+}
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.h
new file mode 100644
index 0000000000..afa504cca5
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.h
@@ -0,0 +1,10 @@
+#pragma once
+#include "mkql_block_agg_factory.h"
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+std::unique_ptr<IBlockAggregatorFactory> MakeBlockSomeFactory();
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/ya.make b/ydb/library/yql/minikql/comp_nodes/ya.make
index 34648333d5..e3ac58d9a3 100644
--- a/ydb/library/yql/minikql/comp_nodes/ya.make
+++ b/ydb/library/yql/minikql/comp_nodes/ya.make
@@ -17,6 +17,8 @@ SRCS(
mkql_block_agg_factory.h
mkql_block_agg_minmax.cpp
mkql_block_agg_minmax.h
+ mkql_block_agg_some.cpp
+ mkql_block_agg_some.h
mkql_block_agg_sum.cpp
mkql_block_agg_sum.h
mkql_block_builder.cpp