aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-06-01 08:14:50 +0300
committernsofya <nsofya@yandex-team.com>2023-06-01 08:14:50 +0300
commit19137e1f7f120475065b3d180f55fd28694b0f9c (patch)
tree2738c1c4022dd524d096ff109b0adec58fd8e257
parent2e82f547491f44bfa843325f9ce7c5e23b335878 (diff)
downloadydb-19137e1f7f120475065b3d180f55fd28694b0f9c.tar.gz
Use kernels registry
Use kernels registry Prepare program functions
-rw-r--r--ydb/core/formats/arrow/program.cpp248
-rw-r--r--ydb/core/formats/arrow/program.h92
-rw-r--r--ydb/core/protos/ssa.proto10
-rw-r--r--ydb/core/tx/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp23
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h3
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/description.h2
-rw-r--r--ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt4
-rw-r--r--ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt4
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp31
-rw-r--r--ydb/core/tx/columnshard/engines/ut_program.cpp145
-rw-r--r--ydb/core/tx/program/CMakeLists.darwin-x86_64.txt27
-rw-r--r--ydb/core/tx/program/CMakeLists.linux-aarch64.txt28
-rw-r--r--ydb/core/tx/program/CMakeLists.linux-x86_64.txt28
-rw-r--r--ydb/core/tx/program/CMakeLists.txt17
-rw-r--r--ydb/core/tx/program/CMakeLists.windows-x86_64.txt27
-rw-r--r--ydb/core/tx/program/program.cpp (renamed from ydb/core/tx/columnshard/engines/reader/program.cpp)151
-rw-r--r--ydb/core/tx/program/program.h (renamed from ydb/core/tx/columnshard/engines/reader/program.h)4
-rw-r--r--ydb/core/tx/program/registry.cpp25
-rw-r--r--ydb/core/tx/program/registry.h26
33 files changed, 706 insertions, 213 deletions
diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp
index 6c1cfe875d4..528766a30a3 100644
--- a/ydb/core/formats/arrow/program.cpp
+++ b/ydb/core/formats/arrow/program.cpp
@@ -44,6 +44,109 @@ struct GroupByOptions : public arrow::compute::ScalarAggregateOptions {
namespace NKikimr::NSsa {
+template <class TAssignObject>
+class TInternalFunction : public IStepFunction<TAssignObject> {
+ using TBase = IStepFunction<TAssignObject>;
+public:
+ using TBase::TBase;
+ arrow::Result<arrow::Datum> Call(const TAssignObject& assign, const TDatumBatch& batch) const override {
+ auto arguments = TBase::BuildArgs(batch, assign.GetArguments());
+ if (!arguments) {
+ return arrow::Status::Invalid("Error parsing args.");
+ }
+ auto funcNames = GetRegistryFunctionNames(assign.GetOperation());
+
+ arrow::Result<arrow::Datum> result = arrow::Status::UnknownError<std::string>("unknown function");
+ for (const auto& funcName : funcNames) {
+ if (TBase::Ctx && TBase::Ctx->func_registry()->GetFunction(funcName).ok()) {
+ result = arrow::compute::CallFunction(funcName, *arguments, assign.GetOptions(), TBase::Ctx);
+ } else {
+ result = arrow::compute::CallFunction(funcName, *arguments, assign.GetOptions());
+ }
+ if (result.ok()) {
+ return PrepareResult(std::move(*result), assign);
+ }
+ }
+ return result;
+ }
+private:
+ virtual std::vector<std::string> GetRegistryFunctionNames(const typename TAssignObject::TOperationType& opId) const = 0;
+ virtual arrow::Result<arrow::Datum> PrepareResult(arrow::Datum&& datum, const TAssignObject& assign) const {
+ Y_UNUSED(assign);
+ return std::move(datum);
+ }
+};
+
+class TConstFunction : public IStepFunction<TAssign> {
+ using TBase = IStepFunction<TAssign>;
+public:
+ using TBase::TBase;
+ arrow::Result<arrow::Datum> Call(const TAssign& assign, const TDatumBatch& batch) const override {
+ Y_UNUSED(batch);
+ return assign.GetConstant();
+ }
+};
+
+class TAggregateFunction : public TInternalFunction<TAggregateAssign> {
+ using TBase = TInternalFunction<TAggregateAssign>;
+private:
+ using TBase::TBase;
+ std::vector<std::string> GetRegistryFunctionNames(const EAggregate& opId) const override {
+ return { GetFunctionName(opId), GetHouseFunctionName(opId)};
+ }
+ arrow::Result<arrow::Datum> PrepareResult(arrow::Datum&& datum, const TAggregateAssign& assign) const override {
+ if (!datum.is_scalar()) {
+ return arrow::Status::Invalid("Aggregate result is not a scalar.");
+ }
+
+ if (datum.scalar()->type->id() == arrow::Type::STRUCT) {
+ auto op = assign.GetOperation();
+ if (op == EAggregate::Min) {
+ const auto& minMax = datum.scalar_as<arrow::StructScalar>();
+ return minMax.value[0];
+ } else if (op == EAggregate::Max) {
+ const auto& minMax = datum.scalar_as<arrow::StructScalar>();
+ return minMax.value[1];
+ } else {
+ return arrow::Status::Invalid("Unexpected struct result for aggregate function.");
+ }
+ }
+ if (!datum.type()) {
+ return arrow::Status::Invalid("Aggregate result has no type.");
+ }
+ return std::move(datum);
+ }
+};
+
+class TSimpleFunction : public TInternalFunction<TAssign> {
+ using TBase = TInternalFunction<TAssign>;
+private:
+ using TBase::TBase;
+ virtual std::vector<std::string> GetRegistryFunctionNames(const EOperation& opId) const override {
+ return { GetFunctionName(opId) };
+ }
+};
+
+template <class TAssignObject>
+class TKernelFunction : public IStepFunction<TAssignObject> {
+ using TBase = IStepFunction<TAssignObject>;
+ const TFunctionPtr Function;
+
+public:
+ TKernelFunction(const TFunctionPtr kernelsFunction, arrow::compute::ExecContext* ctx)
+ : TBase(ctx)
+ , Function(kernelsFunction)
+ {}
+
+ arrow::Result<arrow::Datum> Call(const TAssignObject& assign, const TDatumBatch& batch) const override {
+ auto arguments = TBase::BuildArgs(batch, assign.GetArguments());
+ if (!arguments) {
+ return arrow::Status::Invalid("Error parsing args.");
+ }
+ return Function->Execute(*arguments, assign.GetOptions(), TBase::Ctx);
+ }
+};
+
const char * GetFunctionName(EOperation op) {
switch (op) {
case EOperation::CastBoolean:
@@ -338,64 +441,6 @@ CH::AggFunctionId GetHouseFunction(EAggregate op) {
return CH::AggFunctionId::AGG_UNSPECIFIED;
}
-
-template <bool houseFunction, typename TOpId, typename TOptions>
-arrow::Result<arrow::Datum> CallFunctionById(
- TOpId funcId, const std::vector<std::string>& args,
- const TOptions* funcOpts,
- const TProgramStep::TDatumBatch& batch,
- arrow::compute::ExecContext* ctx)
-{
- std::vector<arrow::Datum> arguments;
- arguments.reserve(args.size());
-
- for (auto& colName : args) {
- auto column = batch.GetColumnByName(colName);
- if (!column.ok()) {
- return column.status();
- }
- arguments.push_back(*column);
- }
- std::string funcName;
- if constexpr (houseFunction) {
- funcName = GetHouseFunctionName(funcId);
- } else {
- funcName = GetFunctionName(funcId);
- }
-
- if (ctx && ctx->func_registry()->GetFunction(funcName).ok()) {
- return arrow::compute::CallFunction(funcName, arguments, funcOpts, ctx);
- }
- return arrow::compute::CallFunction(funcName, arguments, funcOpts);
-}
-
-arrow::Result<arrow::Datum> CallFunctionByAssign(
- const TAssign& assign,
- const TProgramStep::TDatumBatch& batch,
- arrow::compute::ExecContext* ctx)
-{
- return CallFunctionById<false>(assign.GetOperation(), assign.GetArguments(), assign.GetFunctionOptions(),
- batch, ctx);
-}
-
-arrow::Result<arrow::Datum> CallFunctionByAssign(
- const TAggregateAssign& assign,
- const TProgramStep::TDatumBatch& batch,
- arrow::compute::ExecContext* ctx)
-{
- return CallFunctionById<false>(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(),
- batch, ctx);
-}
-
-arrow::Result<arrow::Datum> CallHouseFunctionByAssign(
- const TAggregateAssign& assign,
- TProgramStep::TDatumBatch& batch,
- arrow::compute::ExecContext* ctx)
-{
- return CallFunctionById<true>(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(),
- batch, ctx);
-}
-
CH::GroupByOptions::Assign GetGroupByAssign(const TAggregateAssign& assign) {
CH::GroupByOptions::Assign descr;
descr.function = GetHouseFunction(assign.GetOperation());
@@ -411,10 +456,7 @@ CH::GroupByOptions::Assign GetGroupByAssign(const TAggregateAssign& assign) {
}
-arrow::Status TProgramStep::TDatumBatch::AddColumn(
- const std::string& name,
- arrow::Datum&& column)
-{
+arrow::Status TDatumBatch::AddColumn(const std::string& name, arrow::Datum&& column) {
if (Schema->GetFieldIndex(name) != -1) {
return arrow::Status::Invalid("Trying to add duplicate column '" + name + "'");
}
@@ -432,7 +474,7 @@ arrow::Status TProgramStep::TDatumBatch::AddColumn(
return arrow::Status::OK();
}
-arrow::Result<arrow::Datum> TProgramStep::TDatumBatch::GetColumnByName(const std::string& name) const {
+arrow::Result<arrow::Datum> TDatumBatch::GetColumnByName(const std::string& name) const {
auto i = Schema->GetFieldIndex(name);
if (i < 0) {
return arrow::Status::Invalid("Not found column '" + name + "' or duplicate");
@@ -440,7 +482,7 @@ arrow::Result<arrow::Datum> TProgramStep::TDatumBatch::GetColumnByName(const std
return Datums[i];
}
-std::shared_ptr<arrow::RecordBatch> TProgramStep::TDatumBatch::ToRecordBatch() const {
+std::shared_ptr<arrow::RecordBatch> TDatumBatch::ToRecordBatch() const {
std::vector<std::shared_ptr<arrow::Array>> columns;
columns.reserve(Datums.size());
for (auto col : Datums) {
@@ -457,8 +499,7 @@ std::shared_ptr<arrow::RecordBatch> TProgramStep::TDatumBatch::ToRecordBatch() c
return arrow::RecordBatch::Make(Schema, Rows, columns);
}
-std::shared_ptr<TProgramStep::TDatumBatch>
-TProgramStep::TDatumBatch::FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& batch) {
+std::shared_ptr<TDatumBatch> TDatumBatch::FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& batch) {
std::vector<arrow::Datum> datums;
datums.reserve(batch->num_columns());
for (int64_t i = 0; i < batch->num_columns(); ++i) {
@@ -472,11 +513,26 @@ TProgramStep::TDatumBatch::FromRecordBatch(std::shared_ptr<arrow::RecordBatch>&
});
}
+IStepFunction<TAssign>::TPtr TAssign::GetFunction(arrow::compute::ExecContext* ctx) const {
+ if (KernelFunction) {
+ return std::make_shared<TKernelFunction<TAssign>>(KernelFunction, ctx);
+ }
+ if (IsConstant()) {
+ return std::make_shared<TConstFunction>(ctx);
+ }
+ return std::make_shared<TSimpleFunction>(ctx);
+}
-arrow::Status TProgramStep::ApplyAssignes(
- TProgramStep::TDatumBatch& batch,
- arrow::compute::ExecContext* ctx) const
-{
+
+IStepFunction<TAggregateAssign>::TPtr TAggregateAssign::GetFunction(arrow::compute::ExecContext* ctx) const {
+ if (KernelFunction) {
+ return std::make_shared<TKernelFunction<TAggregateAssign>>(KernelFunction, ctx);
+ }
+ return std::make_shared<TAggregateFunction>(ctx);
+}
+
+
+arrow::Status TProgramStep::ApplyAssignes(TDatumBatch& batch, arrow::compute::ExecContext* ctx) const {
if (Assignes.empty()) {
return arrow::Status::OK();
}
@@ -486,29 +542,20 @@ arrow::Status TProgramStep::ApplyAssignes(
return arrow::Status::Invalid("Assign to existing column '" + assign.GetName() + "'.");
}
- arrow::Datum column;
- if (assign.IsConstant()) {
- column = assign.GetConstant();
- } else {
- auto funcResult = CallFunctionByAssign(assign, batch, ctx);
- if (!funcResult.ok()) {
- return funcResult.status();
- }
- column = *funcResult;
+ auto funcResult = assign.GetFunction(ctx)->Call(assign, batch);
+ if (!funcResult.ok()) {
+ return funcResult.status();
}
+ arrow::Datum column = *funcResult;
auto status = batch.AddColumn(assign.GetName(), std::move(column));
if (!status.ok()) {
return status;
}
}
- //return batch->Validate();
return arrow::Status::OK();
}
-arrow::Status TProgramStep::ApplyAggregates(
- TDatumBatch& batch,
- arrow::compute::ExecContext* ctx) const
-{
+arrow::Status TProgramStep::ApplyAggregates(TDatumBatch& batch, arrow::compute::ExecContext* ctx) const {
if (GroupBy.empty()) {
return arrow::Status::OK();
}
@@ -522,40 +569,13 @@ arrow::Status TProgramStep::ApplyAggregates(
if (GroupByKeys.empty()) {
for (auto& assign : GroupBy) {
- auto funcResult = CallFunctionByAssign(assign, batch, ctx);
+ auto funcResult = assign.GetFunction(ctx)->Call(assign, batch);
if (!funcResult.ok()) {
- auto houseResult = CallHouseFunctionByAssign(assign, batch, ctx);
- if (!houseResult.ok()) {
- return funcResult.status();
- }
- funcResult = houseResult;
+ return funcResult.status();
}
-
res.Datums.push_back(*funcResult);
- auto& column = res.Datums.back();
- if (!column.is_scalar()) {
- return arrow::Status::Invalid("Aggregate result is not a scalar.");
- }
-
- if (column.scalar()->type->id() == arrow::Type::STRUCT) {
- auto op = assign.GetOperation();
- if (op == EAggregate::Min) {
- const auto& minMax = column.scalar_as<arrow::StructScalar>();
- column = minMax.value[0];
- } else if (op == EAggregate::Max) {
- const auto& minMax = column.scalar_as<arrow::StructScalar>();
- column = minMax.value[1];
- } else {
- return arrow::Status::Invalid("Unexpected struct result for aggregate function.");
- }
- }
-
- if (!column.type()) {
- return arrow::Status::Invalid("Aggregate result has no type.");
- }
- fields.emplace_back(std::make_shared<arrow::Field>(assign.GetName(), column.type()));
+ fields.emplace_back(std::make_shared<arrow::Field>(assign.GetName(), res.Datums.back().type()));
}
-
res.Rows = 1;
} else {
CH::GroupByOptions funcOpts;
diff --git a/ydb/core/formats/arrow/program.h b/ydb/core/formats/arrow/program.h
index f01e35930dc..ed5c46d2cc2 100644
--- a/ydb/core/formats/arrow/program.h
+++ b/ydb/core/formats/arrow/program.h
@@ -29,6 +29,7 @@ namespace NKikimr::NSsa {
using EOperation = NArrow::EOperation;
using EAggregate = NArrow::EAggregate;
+using TFunctionPtr = std::shared_ptr<arrow::compute::ScalarFunction>;
const char * GetFunctionName(EOperation op);
const char * GetFunctionName(EAggregate op);
@@ -36,8 +37,52 @@ const char * GetHouseFunctionName(EAggregate op);
inline const char * GetHouseGroupByName() { return "ch.group_by"; }
EOperation ValidateOperation(EOperation op, ui32 argsSize);
+struct TDatumBatch {
+ std::shared_ptr<arrow::Schema> Schema;
+ std::vector<arrow::Datum> Datums;
+ int64_t Rows{};
+
+ arrow::Status AddColumn(const std::string& name, arrow::Datum&& column);
+ arrow::Result<arrow::Datum> GetColumnByName(const std::string& name) const;
+ std::shared_ptr<arrow::RecordBatch> ToRecordBatch() const;
+ static std::shared_ptr<TDatumBatch> FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& batch);
+};
+
+template <class TAssignObject>
+class IStepFunction {
+ using TSelf = IStepFunction<TAssignObject>;
+protected:
+ arrow::compute::ExecContext* Ctx;
+public:
+ using TPtr = std::shared_ptr<TSelf>;
+
+ IStepFunction(arrow::compute::ExecContext* ctx)
+ : Ctx(ctx)
+ {}
+
+ virtual ~IStepFunction() {}
+
+ virtual arrow::Result<arrow::Datum> Call(const TAssignObject& assign, const TDatumBatch& batch) const = 0;
+
+protected:
+ std::optional<std::vector<arrow::Datum>> BuildArgs(const TDatumBatch& batch, const std::vector<std::string>& args) const {
+ std::vector<arrow::Datum> arguments;
+ arguments.reserve(args.size());
+ for (auto& colName : args) {
+ auto column = batch.GetColumnByName(colName);
+ if (!column.ok()) {
+ return {};
+ }
+ arguments.push_back(*column);
+ }
+ return std::move(arguments);
+ }
+};
+
class TAssign {
public:
+ using TOperationType = EOperation;
+
TAssign(const std::string& name, EOperation op, std::vector<std::string>&& args)
: Name(name)
, Operation(ValidateOperation(op, args.size()))
@@ -115,24 +160,38 @@ public:
, FuncOpts(nullptr)
{}
+ TAssign(const std::string& name,
+ TFunctionPtr kernelFunction,
+ std::vector<std::string>&& args,
+ std::shared_ptr<arrow::compute::FunctionOptions> funcOpts)
+ : Name(name)
+ , Arguments(std::move(args))
+ , FuncOpts(funcOpts)
+ , KernelFunction(kernelFunction)
+ {}
+
bool IsConstant() const { return Operation == EOperation::Constant; }
- bool IsOk() const { return Operation != EOperation::Unspecified; }
+ bool IsOk() const { return Operation != EOperation::Unspecified || !!KernelFunction; }
EOperation GetOperation() const { return Operation; }
const std::vector<std::string>& GetArguments() const { return Arguments; }
std::shared_ptr<arrow::Scalar> GetConstant() const { return Constant; }
const std::string& GetName() const { return Name; }
- const arrow::compute::FunctionOptions* GetFunctionOptions() const { return FuncOpts.get(); }
+ const arrow::compute::FunctionOptions* GetOptions() const { return FuncOpts.get(); }
+ IStepFunction<TAssign>::TPtr GetFunction(arrow::compute::ExecContext* ctx) const;
private:
std::string Name;
EOperation Operation{EOperation::Unspecified};
std::vector<std::string> Arguments;
std::shared_ptr<arrow::Scalar> Constant;
std::shared_ptr<arrow::compute::FunctionOptions> FuncOpts;
+ TFunctionPtr KernelFunction;
};
class TAggregateAssign {
public:
+ using TOperationType = EAggregate;
+
TAggregateAssign(const std::string& name, EAggregate op = EAggregate::Unspecified)
: Name(name)
, Operation(op)
@@ -152,20 +211,32 @@ public:
}
}
- bool IsOk() const { return Operation != EAggregate::Unspecified; }
+ TAggregateAssign(const std::string& name,
+ TFunctionPtr kernelFunction,
+ std::vector<std::string>&& args)
+ : Name(name)
+ , Arguments(std::move(args))
+ , KernelFunction(kernelFunction)
+ {}
+
+ bool IsOk() const { return Operation != EAggregate::Unspecified || !!KernelFunction; }
EAggregate GetOperation() const { return Operation; }
const std::vector<std::string>& GetArguments() const { return Arguments; }
std::vector<std::string>& MutableArguments() { return Arguments; }
const std::string& GetName() const { return Name; }
- const arrow::compute::ScalarAggregateOptions& GetAggregateOptions() const { return ScalarOpts; }
+ const arrow::compute::ScalarAggregateOptions* GetOptions() const { return &ScalarOpts; }
+
+ IStepFunction<TAggregateAssign>::TPtr GetFunction(arrow::compute::ExecContext* ctx) const;
private:
std::string Name;
EAggregate Operation{EAggregate::Unspecified};
std::vector<std::string> Arguments;
arrow::compute::ScalarAggregateOptions ScalarOpts; // TODO: make correct options
+ TFunctionPtr KernelFunction;
};
+
/// Group of commands that finishes with projection. Steps add locality for columns definition.
///
/// In step we have non-decreasing count of columns (line to line) till projection. So columns are either source
@@ -182,16 +253,7 @@ struct TProgramStep {
std::vector<std::string> GroupByKeys; // TODO: it's possible to use them without GROUP BY for DISTINCT
std::vector<std::string> Projection; // Step's result columns (remove others)
- struct TDatumBatch {
- std::shared_ptr<arrow::Schema> Schema;
- std::vector<arrow::Datum> Datums;
- int64_t Rows{};
-
- arrow::Status AddColumn(const std::string& name, arrow::Datum&& column);
- arrow::Result<arrow::Datum> GetColumnByName(const std::string& name) const;
- std::shared_ptr<arrow::RecordBatch> ToRecordBatch() const;
- static std::shared_ptr<TProgramStep::TDatumBatch> FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& batch);
- };
+ using TDatumBatch = TDatumBatch;
std::set<std::string> GetColumnsInUsage() const;
@@ -207,7 +269,7 @@ struct TProgramStep {
arrow::Status ApplyProjection(std::shared_ptr<arrow::RecordBatch>& batch) const;
arrow::Status ApplyProjection(TDatumBatch& batch) const;
- arrow::Status MakeCombinedFilter(TDatumBatch& batch, NArrow::TColumnFilter& result) const;
+ arrow::Status MakeCombinedFilter(TDatumBatch& batch, NArrow::TColumnFilter& result) const;
};
struct TProgram {
diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto
index eaa8891c41d..61d45469830 100644
--- a/ydb/core/protos/ssa.proto
+++ b/ydb/core/protos/ssa.proto
@@ -40,6 +40,11 @@ message TProgram {
optional string Name = 1;
}
+ enum EFunctionType {
+ SIMPLE_ARROW = 1;
+ YQL_KERNEL = 2;
+ }
+
message TAssignment {
enum EFunction {
FUNC_UNSPECIFIED = 0;
@@ -85,6 +90,8 @@ message TProgram {
message TFunction {
optional uint32 Id = 1; // EFunction
repeated TColumn Arguments = 2;
+ optional EFunctionType FunctionType = 3 [ default = SIMPLE_ARROW ];
+ optional uint32 KernelIdx = 4;
}
message TExternalFunction {
@@ -127,6 +134,8 @@ message TProgram {
optional uint32 Id = 1; // EAggregateFunction
repeated TColumn Arguments = 2;
optional string Variant = 3; // i.e. POP/SAMP for AGG_VAR, AGG_COVAR, AGG_STDDEV
+ optional EFunctionType FunctionType = 4 [ default = SIMPLE_ARROW ];
+ optional uint32 KernelIdx = 5;
// TODO: Parameters, i.e. N for topK(N)(arg)
}
@@ -171,4 +180,5 @@ message TOlapProgram {
// RecordBatch deserialization require arrow::Schema, thus store it here
optional bytes ParametersSchema = 2;
optional bytes Parameters = 3;
+ optional bytes Kernels = 4;
}
diff --git a/ydb/core/tx/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/CMakeLists.darwin-x86_64.txt
index 3589716aac5..03fdff7248e 100644
--- a/ydb/core/tx/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/CMakeLists.darwin-x86_64.txt
@@ -13,6 +13,7 @@ add_subdirectory(coordinator)
add_subdirectory(datashard)
add_subdirectory(long_tx_service)
add_subdirectory(mediator)
+add_subdirectory(program)
add_subdirectory(replication)
add_subdirectory(scheme_board)
add_subdirectory(scheme_cache)
diff --git a/ydb/core/tx/CMakeLists.linux-aarch64.txt b/ydb/core/tx/CMakeLists.linux-aarch64.txt
index d0bf8cc5773..3ed83b3f8ed 100644
--- a/ydb/core/tx/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/CMakeLists.linux-aarch64.txt
@@ -13,6 +13,7 @@ add_subdirectory(coordinator)
add_subdirectory(datashard)
add_subdirectory(long_tx_service)
add_subdirectory(mediator)
+add_subdirectory(program)
add_subdirectory(replication)
add_subdirectory(scheme_board)
add_subdirectory(scheme_cache)
diff --git a/ydb/core/tx/CMakeLists.linux-x86_64.txt b/ydb/core/tx/CMakeLists.linux-x86_64.txt
index d0bf8cc5773..3ed83b3f8ed 100644
--- a/ydb/core/tx/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/CMakeLists.linux-x86_64.txt
@@ -13,6 +13,7 @@ add_subdirectory(coordinator)
add_subdirectory(datashard)
add_subdirectory(long_tx_service)
add_subdirectory(mediator)
+add_subdirectory(program)
add_subdirectory(replication)
add_subdirectory(scheme_board)
add_subdirectory(scheme_cache)
diff --git a/ydb/core/tx/CMakeLists.windows-x86_64.txt b/ydb/core/tx/CMakeLists.windows-x86_64.txt
index 3589716aac5..03fdff7248e 100644
--- a/ydb/core/tx/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/CMakeLists.windows-x86_64.txt
@@ -13,6 +13,7 @@ add_subdirectory(coordinator)
add_subdirectory(datashard)
add_subdirectory(long_tx_service)
add_subdirectory(mediator)
+add_subdirectory(program)
add_subdirectory(replication)
add_subdirectory(scheme_board)
add_subdirectory(scheme_cache)
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index 9965189fe32..a80fba2971d 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -341,3 +341,26 @@ NMetadata::NFetcher::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpec
}
}
+
+namespace NKikimr::NColumnShard {
+ NOlap::TIndexInfo BuildTableInfo(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema,
+ const std::vector<std::pair<TString, NScheme::TTypeInfo>>& key) {
+ NOlap::TIndexInfo indexInfo = NOlap::TIndexInfo::BuildDefault();
+
+ for (ui32 i = 0; i < ydbSchema.size(); ++i) {
+ ui32 id = i + 1;
+ auto& name = ydbSchema[i].first;
+ auto& type = ydbSchema[i].second;
+
+ indexInfo.Columns[id] = NTable::TColumn(name, id, type, "");
+ indexInfo.ColumnNames[name] = id;
+ }
+
+ for (const auto& [keyName, keyType] : key) {
+ indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]);
+ }
+
+ indexInfo.SetAllKeys();
+ return indexInfo;
+ }
+}
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index d257f9e2c40..49e114ff2c1 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -472,4 +472,7 @@ namespace NKikimr::NColumnShard {
return arrow::RecordBatch::Make(Schema, RowsCount, columns);
}
};
+
+ NOlap::TIndexInfo BuildTableInfo(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema,
+ const std::vector<std::pair<TString, NScheme::TTypeInfo>>& key);
}
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
index 8a29dbda949..b868fce25e7 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
@@ -35,6 +35,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
columnshard-engines-predicate
columnshard-engines-storage
formats-arrow-compression
+ core-tx-program
udf-service-exception_policy
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
index 43a5fa4870c..d73745d1174 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
@@ -36,6 +36,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
columnshard-engines-predicate
columnshard-engines-storage
formats-arrow-compression
+ core-tx-program
udf-service-exception_policy
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
index 43a5fa4870c..d73745d1174 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
@@ -36,6 +36,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
columnshard-engines-predicate
columnshard-engines-storage
formats-arrow-compression
+ core-tx-program
udf-service-exception_policy
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
index 8a29dbda949..b868fce25e7 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
@@ -35,6 +35,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
columnshard-engines-predicate
columnshard-engines-storage
formats-arrow-compression
+ core-tx-program
udf-service-exception_policy
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
index 099c560c72e..19f1228ad62 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
@@ -25,6 +25,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC
ydb-core-protos
core-formats-arrow
columnshard-engines-predicate
+ core-tx-program
engines-reader-order_control
tools-enum_parser-enum_serialization_runtime
)
@@ -40,7 +41,6 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
index 4c3c3a73226..78e360e108d 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
@@ -26,6 +26,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC
ydb-core-protos
core-formats-arrow
columnshard-engines-predicate
+ core-tx-program
engines-reader-order_control
tools-enum_parser-enum_serialization_runtime
)
@@ -41,7 +42,6 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
index 4c3c3a73226..78e360e108d 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC
ydb-core-protos
core-formats-arrow
columnshard-engines-predicate
+ core-tx-program
engines-reader-order_control
tools-enum_parser-enum_serialization_runtime
)
@@ -41,7 +42,6 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
index 099c560c72e..19f1228ad62 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
@@ -25,6 +25,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC
ydb-core-protos
core-formats-arrow
columnshard-engines-predicate
+ core-tx-program
engines-reader-order_control
tools-enum_parser-enum_serialization_runtime
)
@@ -40,7 +41,6 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/description.h b/ydb/core/tx/columnshard/engines/reader/description.h
index b581055388f..75b6aa4f32f 100644
--- a/ydb/core/tx/columnshard/engines/reader/description.h
+++ b/ydb/core/tx/columnshard/engines/reader/description.h
@@ -1,5 +1,5 @@
#pragma once
-#include "program.h"
+#include <ydb/core/tx/program/program.h>
#include <ydb/core/tx/columnshard/engines/predicate/filter.h>
namespace NKikimr::NOlap {
diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt
index 33224314acf..d59807c727a 100644
--- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt
@@ -26,6 +26,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC
ydb-core-tablet_flat
tx-columnshard-counters
yql-sql-pg_dummy
+ core-arrow_kernels-request
+ core-testlib-default
)
target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE
-Wl,-platform_version,macos,11.0,11.0
@@ -37,6 +39,8 @@ target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE
target_sources(ydb-core-tx-columnshard-engines-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_program.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt
index 571d214b767..50f51e0ad27 100644
--- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt
@@ -26,6 +26,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC
ydb-core-tablet_flat
tx-columnshard-counters
yql-sql-pg_dummy
+ core-arrow_kernels-request
+ core-testlib-default
)
target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE
-ldl
@@ -40,6 +42,8 @@ target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE
target_sources(ydb-core-tx-columnshard-engines-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_program.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt
index 54cd8327409..47a14cdffdd 100644
--- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt
@@ -27,6 +27,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC
ydb-core-tablet_flat
tx-columnshard-counters
yql-sql-pg_dummy
+ core-arrow_kernels-request
+ core-testlib-default
)
target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE
-ldl
@@ -41,6 +43,8 @@ target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE
target_sources(ydb-core-tx-columnshard-engines-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_program.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt
index 091ad519d80..c2cce5aea17 100644
--- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt
@@ -26,10 +26,14 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC
ydb-core-tablet_flat
tx-columnshard-counters
yql-sql-pg_dummy
+ core-arrow_kernels-request
+ core-testlib-default
)
target_sources(ydb-core-tx-columnshard-engines-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_program.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 71109ad4d4c..a3fdbdd9332 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -3,6 +3,8 @@
#include "predicate/predicate.h"
#include "index_logic_logs.h"
+#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
+
namespace NKikimr {
@@ -170,27 +172,6 @@ static const std::vector<std::pair<TString, TTypeInfo>> testKey = {
{"uid", TTypeInfo(NTypeIds::Utf8) }
};
-TIndexInfo TestTableInfo(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema = testColumns,
- const std::vector<std::pair<TString, TTypeInfo>>& key = testKey) {
- TIndexInfo indexInfo = TIndexInfo::BuildDefault();
-
- for (ui32 i = 0; i < ydbSchema.size(); ++i) {
- ui32 id = i + 1;
- auto& name = ydbSchema[i].first;
- auto& type = ydbSchema[i].second;
-
- indexInfo.Columns[id] = NTable::TColumn(name, id, type, "");
- indexInfo.ColumnNames[name] = id;
- }
-
- for (const auto& [keyName, keyType] : key) {
- indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]);
- }
-
- indexInfo.SetAllKeys();
- return indexInfo;
-}
-
template <typename TKeyDataType>
class TBuilder {
public:
@@ -357,7 +338,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
void WriteLoadRead(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema,
const std::vector<std::pair<TString, TTypeInfo>>& key) {
TTestDbWrapper db;
- TIndexInfo tableInfo = TestTableInfo(ydbSchema, key);
+ TIndexInfo tableInfo = NColumnShard::BuildTableInfo(ydbSchema, key);
std::vector<ui64> paths = {1, 2};
@@ -453,7 +434,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
void ReadWithPredicates(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema,
const std::vector<std::pair<TString, TTypeInfo>>& key) {
TTestDbWrapper db;
- TIndexInfo tableInfo = TestTableInfo(ydbSchema, key);
+ TIndexInfo tableInfo = NColumnShard::BuildTableInfo(ydbSchema, key);
TSnapshot indexSnapshot(1, 1);
TColumnEngineForLogs engine(0, TestLimits());
@@ -552,7 +533,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
Y_UNIT_TEST(IndexWriteOverload) {
TTestDbWrapper db;
- TIndexInfo tableInfo = TestTableInfo();
+ TIndexInfo tableInfo = NColumnShard::BuildTableInfo(testColumns, testKey);;
ui64 pathId = 1;
ui32 step = 1000;
@@ -634,7 +615,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
Y_UNIT_TEST(IndexTtl) {
TTestDbWrapper db;
- TIndexInfo tableInfo = TestTableInfo();
+ TIndexInfo tableInfo = NColumnShard::BuildTableInfo(testColumns, testKey);;
ui64 pathId = 1;
ui32 step = 1000;
diff --git a/ydb/core/tx/columnshard/engines/ut_program.cpp b/ydb/core/tx/columnshard/engines/ut_program.cpp
new file mode 100644
index 00000000000..a347c2b1782
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/ut_program.cpp
@@ -0,0 +1,145 @@
+#include "index_info.h"
+#include "index_logic_logs.h"
+
+#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
+#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
+#include <ydb/core/tx/program/program.h>
+
+#include <ydb/library/yql/core/arrow_kernels/request/request.h>
+#include <ydb/library/yql/core/arrow_kernels/registry/registry.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NKikimr::NOlap;
+using namespace NKikimr::NColumnShard;
+using namespace NKikimr;
+namespace NTypeIds = NScheme::NTypeIds;
+using TTypeId = NScheme::TTypeId;
+using TTypeInfo = NScheme::TTypeInfo;
+
+namespace {
+ static const std::vector<std::pair<TString, TTypeInfo>> testColumns = {
+ {"timestamp", TTypeInfo(NTypeIds::Timestamp) },
+ {"uid", TTypeInfo(NTypeIds::Utf8) },
+ {"sum", TTypeInfo(NTypeIds::Int32) },
+ {"vat", TTypeInfo(NTypeIds::Int32) },
+ };
+
+ static const std::vector<std::pair<TString, TTypeInfo>> testKey = {
+ {"timestamp", TTypeInfo(NTypeIds::Timestamp) },
+ {"uid", TTypeInfo(NTypeIds::Utf8) }
+ };
+}
+
+Y_UNIT_TEST_SUITE(TestProgram) {
+
+ TString BuildRegistry() {
+ auto functionRegistry = CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry());
+ auto nodeFactory = NMiniKQL::GetBuiltinFactory();
+ NYql::TKernelRequestBuilder b(*functionRegistry);
+
+ NYql::TExprContext ctx;
+ auto blockInt32Type = ctx.template MakeType<NYql::TBlockExprType>(ctx.template MakeType<NYql::TDataExprType>(NYql::EDataSlot::Int32));
+ auto index1 = b.AddBinaryOp(NYql::TKernelRequestBuilder::EBinaryOp::Add, blockInt32Type, blockInt32Type, blockInt32Type);
+ Y_UNUSED(index1);
+ return b.Serialize();
+ }
+
+ TString SerializeProgram(const NKikimrSSA::TProgram& programProto) {
+ NKikimrSSA::TOlapProgram olapProgramProto;
+ olapProgramProto.SetKernels(BuildRegistry());
+
+ {
+ TString str;
+ Y_PROTOBUF_SUPPRESS_NODISCARD programProto.SerializeToString(&str);
+ olapProgramProto.SetProgram(str);
+ }
+ TString programSerialized;
+ Y_PROTOBUF_SUPPRESS_NODISCARD olapProgramProto.SerializeToString(&programSerialized);
+ return programSerialized;
+ }
+
+ Y_UNIT_TEST(YqlKernel) {
+ TIndexInfo indexInfo = BuildTableInfo(testColumns, testKey);
+ TIndexColumnResolver columnResolver(indexInfo);
+
+ NKikimrSSA::TProgram programProto;
+ {
+ auto* command = programProto.AddCommand();
+ auto* functionProto = command->MutableAssign()->MutableFunction();
+ functionProto->SetFunctionType(NKikimrSSA::TProgram::EFunctionType::TProgram_EFunctionType_YQL_KERNEL);
+ functionProto->SetKernelIdx(0);
+ functionProto->AddArguments()->SetName("sum");
+ functionProto->AddArguments()->SetName("vat");
+ functionProto->SetId(NKikimrSSA::TProgram::TAssignment::EFunction::TProgram_TAssignment_EFunction_FUNC_STR_LENGTH);
+ }
+ {
+ auto* command = programProto.AddCommand();
+ auto* prjectionProto = command->MutableProjection();
+ auto* column = prjectionProto->AddColumns();
+ column->SetName("0");
+ }
+ const auto programSerialized = SerializeProgram(programProto);
+
+ TProgramContainer program;
+ TString errors;
+ UNIT_ASSERT_C(program.Init(columnResolver, NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS, programSerialized, errors), errors);
+
+ TTableUpdatesBuilder updates(NArrow::MakeArrowSchema({{"sum", TTypeInfo(NTypeIds::Int32) }, {"vat", TTypeInfo(NTypeIds::Int32) }}));
+ updates.AddRow().Add(1).Add(1);
+ updates.AddRow().Add(100).Add(0);
+
+ auto batch = updates.BuildArrow();
+ UNIT_ASSERT(program.ApplyProgram(batch).ok());
+
+ TTableUpdatesBuilder result(NArrow::MakeArrowSchema( { std::make_pair("0", TTypeInfo(NTypeIds::Int32)) }));
+ result.AddRow().Add(2);
+ result.AddRow().Add(100);
+
+ auto expected = result.BuildArrow();
+ UNIT_ASSERT_VALUES_EQUAL(batch->ToString(), expected->ToString());
+ }
+
+ Y_UNIT_TEST(SimpleFunction) {
+ TIndexInfo indexInfo = BuildTableInfo(testColumns, testKey);;
+ TIndexColumnResolver columnResolver(indexInfo);
+
+ NKikimrSSA::TProgram programProto;
+ {
+ auto* command = programProto.AddCommand();
+ auto* functionProto = command->MutableAssign()->MutableFunction();
+ auto* funcArg = functionProto->AddArguments();
+ funcArg->SetName("uid");
+ functionProto->SetId(NKikimrSSA::TProgram::TAssignment::EFunction::TProgram_TAssignment_EFunction_FUNC_STR_LENGTH);
+ }
+ {
+ auto* command = programProto.AddCommand();
+ auto* prjectionProto = command->MutableProjection();
+ auto* column = prjectionProto->AddColumns();
+ column->SetName("0");
+ }
+ const auto programSerialized = SerializeProgram(programProto);
+
+ TProgramContainer program;
+ TString errors;
+ UNIT_ASSERT_C(program.Init(columnResolver, NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS, programSerialized, errors), errors);
+
+ TTableUpdatesBuilder updates(NArrow::MakeArrowSchema( { std::make_pair("uid", TTypeInfo(NTypeIds::Utf8)) }));
+ updates.AddRow().Add("aaa");
+ updates.AddRow().Add("b");
+ updates.AddRow().Add("");
+
+ auto batch = updates.BuildArrow();
+ UNIT_ASSERT(program.ApplyProgram(batch).ok());
+
+ TTableUpdatesBuilder result(NArrow::MakeArrowSchema( { std::make_pair("0", TTypeInfo(NTypeIds::Uint64)) }));
+ result.AddRow().Add<uint64_t>(3);
+ result.AddRow().Add<uint64_t>(1);
+ result.AddRow().Add<uint64_t>(0);
+
+ auto expected = result.BuildArrow();
+ UNIT_ASSERT_VALUES_EQUAL(batch->ToString(), expected->ToString());
+ }
+}
diff --git a/ydb/core/tx/program/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/program/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..12bf05544f2
--- /dev/null
+++ b/ydb/core/tx/program/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,27 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-tx-program)
+target_compile_options(core-tx-program PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-tx-program PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ core-formats-arrow
+ ydb-core-protos
+ ydb-core-tablet_flat
+ yql-minikql-invoke_builtins
+ yql-minikql-comp_nodes
+ core-arrow_kernels-registry
+)
+target_sources(core-tx-program PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/registry.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/program.cpp
+)
diff --git a/ydb/core/tx/program/CMakeLists.linux-aarch64.txt b/ydb/core/tx/program/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..ce23a48278c
--- /dev/null
+++ b/ydb/core/tx/program/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-tx-program)
+target_compile_options(core-tx-program PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-tx-program PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ core-formats-arrow
+ ydb-core-protos
+ ydb-core-tablet_flat
+ yql-minikql-invoke_builtins
+ yql-minikql-comp_nodes
+ core-arrow_kernels-registry
+)
+target_sources(core-tx-program PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/registry.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/program.cpp
+)
diff --git a/ydb/core/tx/program/CMakeLists.linux-x86_64.txt b/ydb/core/tx/program/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..ce23a48278c
--- /dev/null
+++ b/ydb/core/tx/program/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-tx-program)
+target_compile_options(core-tx-program PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-tx-program PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ core-formats-arrow
+ ydb-core-protos
+ ydb-core-tablet_flat
+ yql-minikql-invoke_builtins
+ yql-minikql-comp_nodes
+ core-arrow_kernels-registry
+)
+target_sources(core-tx-program PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/registry.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/program.cpp
+)
diff --git a/ydb/core/tx/program/CMakeLists.txt b/ydb/core/tx/program/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/tx/program/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/program/CMakeLists.windows-x86_64.txt b/ydb/core/tx/program/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..12bf05544f2
--- /dev/null
+++ b/ydb/core/tx/program/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,27 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-tx-program)
+target_compile_options(core-tx-program PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-tx-program PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ core-formats-arrow
+ ydb-core-protos
+ ydb-core-tablet_flat
+ yql-minikql-invoke_builtins
+ yql-minikql-comp_nodes
+ core-arrow_kernels-registry
+)
+target_sources(core-tx-program PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/registry.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/program.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/program.cpp b/ydb/core/tx/program/program.cpp
index 1d693164cf0..d87ecbd01aa 100644
--- a/ydb/core/tx/columnshard/engines/reader/program.cpp
+++ b/ydb/core/tx/program/program.cpp
@@ -15,15 +15,23 @@ using EAggregate = NArrow::EAggregate;
using TAssign = NSsa::TAssign;
using TAggregateAssign = NSsa::TAggregateAssign;
-struct TContext {
+class TProgramBuilder {
const IColumnResolver& ColumnResolver;
- mutable THashMap<ui32, TString> Sources;
+ const TKernelsRegistry& KernelsRegistry;
mutable THashMap<TString, std::shared_ptr<arrow::Scalar>> Constants;
-
- explicit TContext(const IColumnResolver& columnResolver)
+ TString Error;
+public:
+ mutable THashMap<ui32, TString> Sources;
+
+ explicit TProgramBuilder(const IColumnResolver& columnResolver, const TKernelsRegistry& kernelsRegistry)
: ColumnResolver(columnResolver)
+ , KernelsRegistry(kernelsRegistry)
{}
+ const TString& GetErrorMessage() const {
+ return Error;
+ }
+private:
std::string GetName(const NKikimrSSA::TProgram::TColumn& column) const {
ui32 columnId = column.GetId();
TString name = ColumnResolver.GetColumnName(columnId, false);
@@ -44,15 +52,28 @@ struct TContext {
}
return std::string(name.data(), name.size());
}
+ TAssign MakeFunction(const std::string& name,
+ const NKikimrSSA::TProgram::TAssignment::TFunction& func);
+ NSsa::TAssign MakeConstant(const std::string& name, const NKikimrSSA::TProgram::TConstant& constant);
+ NSsa::TAggregateAssign MakeAggregate(const std::string& name, const NKikimrSSA::TProgram::TAggregateAssignment::TAggregateFunction& func);
+ NSsa::TAssign MaterializeParameter(const std::string& name, const NKikimrSSA::TProgram::TParameter& parameter, const std::shared_ptr<arrow::RecordBatch>& parameterValues);
+
+public:
+ bool ExtractAssign(NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TAssignment& assign,
+ const std::shared_ptr<arrow::RecordBatch>& parameterValues);
+ bool ExtractFilter(NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TFilter& filter);
+ bool ExtractProjection(NSsa::TProgramStep& step,
+ const NKikimrSSA::TProgram::TProjection& projection);
+ bool ExtractGroupBy(NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TGroupBy& groupBy);
};
-TAssign MakeFunction(const TContext& info, const std::string& name,
- const NKikimrSSA::TProgram::TAssignment::TFunction& func) {
+TAssign TProgramBuilder::MakeFunction(const std::string& name,
+ const NKikimrSSA::TProgram::TAssignment::TFunction& func) {
using TId = NKikimrSSA::TProgram::TAssignment;
std::vector<std::string> arguments;
for (auto& col : func.GetArguments()) {
- arguments.push_back(info.GetName(col));
+ arguments.push_back(GetName(col));
}
auto mkCastOptions = [](std::shared_ptr<arrow::DataType> dataType) {
@@ -63,10 +84,10 @@ TAssign MakeFunction(const TContext& info, const std::string& name,
};
auto mkLikeOptions = [&](bool ignoreCase) {
- if (arguments.size() != 2 || !info.Constants.contains(arguments[1])) {
+ if (arguments.size() != 2 || !Constants.contains(arguments[1])) {
return std::shared_ptr<arrow::compute::MatchSubstringOptions>();
}
- auto patternScalar = info.Constants[arguments[1]];
+ auto patternScalar = Constants[arguments[1]];
if (!arrow::is_base_binary_like(patternScalar->type->id())) {
return std::shared_ptr<arrow::compute::MatchSubstringOptions>();
}
@@ -75,6 +96,15 @@ TAssign MakeFunction(const TContext& info, const std::string& name,
return std::make_shared<arrow::compute::MatchSubstringOptions>(pattern->ToString(), ignoreCase);
};
+ if (func.GetFunctionType() == NKikimrSSA::TProgram::EFunctionType::TProgram_EFunctionType_YQL_KERNEL) {
+ auto kernelFunction = KernelsRegistry.GetFunction(func.GetKernelIdx());
+ if (!kernelFunction) {
+ Error = TStringBuilder() << "Unknown kernel for " << name << ";kernel_idx=" << func.GetKernelIdx();
+ return TAssign(name, EOperation::Unspecified, std::move(arguments));
+ }
+ return TAssign(name, kernelFunction, std::move(arguments), nullptr);
+ }
+
switch (func.GetId()) {
case TId::FUNC_CMP_EQUAL:
return TAssign(name, EOperation::Equal, std::move(arguments));
@@ -152,37 +182,37 @@ TAssign MakeFunction(const TContext& info, const std::string& name,
return TAssign(name, EOperation::Divide, std::move(arguments));
case TId::FUNC_CAST_TO_INT8:
return TAssign(name, EOperation::CastInt8, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::Int8Type>()));
+ mkCastOptions(std::make_shared<arrow::Int8Type>()));
case TId::FUNC_CAST_TO_INT16:
return TAssign(name, EOperation::CastInt16, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::Int16Type>()));
+ mkCastOptions(std::make_shared<arrow::Int16Type>()));
case TId::FUNC_CAST_TO_INT32:
return TAssign(name, EOperation::CastInt32, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::Int32Type>()));
+ mkCastOptions(std::make_shared<arrow::Int32Type>()));
case TId::FUNC_CAST_TO_INT64:
return TAssign(name, EOperation::CastInt64, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::Int64Type>()));
+ mkCastOptions(std::make_shared<arrow::Int64Type>()));
case TId::FUNC_CAST_TO_UINT8:
return TAssign(name, EOperation::CastUInt8, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::UInt8Type>()));
+ mkCastOptions(std::make_shared<arrow::UInt8Type>()));
case TId::FUNC_CAST_TO_UINT16:
return TAssign(name, EOperation::CastUInt16, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::UInt16Type>()));
+ mkCastOptions(std::make_shared<arrow::UInt16Type>()));
case TId::FUNC_CAST_TO_UINT32:
return TAssign(name, EOperation::CastUInt32, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::UInt32Type>()));
+ mkCastOptions(std::make_shared<arrow::UInt32Type>()));
case TId::FUNC_CAST_TO_UINT64:
return TAssign(name, EOperation::CastUInt64, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::UInt64Type>()));
+ mkCastOptions(std::make_shared<arrow::UInt64Type>()));
case TId::FUNC_CAST_TO_FLOAT:
return TAssign(name, EOperation::CastFloat, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::FloatType>()));
+ mkCastOptions(std::make_shared<arrow::FloatType>()));
case TId::FUNC_CAST_TO_DOUBLE:
return TAssign(name, EOperation::CastDouble, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::DoubleType>()));
+ mkCastOptions(std::make_shared<arrow::DoubleType>()));
case TId::FUNC_CAST_TO_TIMESTAMP:
return TAssign(name, EOperation::CastTimestamp, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO)));
+ mkCastOptions(std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO)));
case TId::FUNC_CAST_TO_BINARY:
case TId::FUNC_CAST_TO_FIXED_SIZE_BINARY:
case TId::FUNC_UNSPECIFIED:
@@ -191,7 +221,7 @@ TAssign MakeFunction(const TContext& info, const std::string& name,
return TAssign(name, EOperation::Unspecified, std::move(arguments));
}
-NSsa::TAssign MakeConstant(const std::string& name, const NKikimrSSA::TProgram::TConstant& constant) {
+NSsa::TAssign TProgramBuilder::MakeConstant(const std::string& name, const NKikimrSSA::TProgram::TConstant& constant) {
using TId = NKikimrSSA::TProgram::TConstant;
switch (constant.GetValueCase()) {
@@ -225,13 +255,21 @@ NSsa::TAssign MakeConstant(const std::string& name, const NKikimrSSA::TProgram::
return TAssign(name, EOperation::Unspecified, {});
}
-NSsa::TAggregateAssign MakeAggregate(const TContext& info, const std::string& name,
- const NKikimrSSA::TProgram::TAggregateAssignment::TAggregateFunction& func)
-{
+NSsa::TAggregateAssign TProgramBuilder::MakeAggregate(const std::string& name, const NKikimrSSA::TProgram::TAggregateAssignment::TAggregateFunction& func) {
using TId = NKikimrSSA::TProgram::TAggregateAssignment;
+ if (func.GetFunctionType() == NKikimrSSA::TProgram::EFunctionType::TProgram_EFunctionType_YQL_KERNEL) {
+ std::string argument = GetName(func.GetArguments()[0]);
+ auto kernelFunction = KernelsRegistry.GetFunction(func.GetKernelIdx());
+ if (!kernelFunction) {
+ Error = TStringBuilder() << "Unknown kernel for " << func.GetId() << ";kernel_idx=" << func.GetKernelIdx();
+ return TAggregateAssign(name);
+ }
+ return TAggregateAssign(name, kernelFunction, { argument });
+ }
+
if (func.ArgumentsSize() == 1) {
- std::string argument = info.GetName(func.GetArguments()[0]);
+ std::string argument = GetName(func.GetArguments()[0]);
switch (func.GetId()) {
case TId::AGG_SOME:
@@ -258,9 +296,7 @@ NSsa::TAggregateAssign MakeAggregate(const TContext& info, const std::string& na
return TAggregateAssign(name); // !ok()
}
-NSsa::TAssign MaterializeParameter(const std::string& name, const NKikimrSSA::TProgram::TParameter& parameter,
- const std::shared_ptr<arrow::RecordBatch>& parameterValues)
-{
+NSsa::TAssign TProgramBuilder::MaterializeParameter(const std::string& name, const NKikimrSSA::TProgram::TParameter& parameter, const std::shared_ptr<arrow::RecordBatch>& parameterValues) {
auto parameterName = parameter.GetName();
auto column = parameterValues->GetColumnByName(parameterName);
#if 0
@@ -280,17 +316,17 @@ NSsa::TAssign MaterializeParameter(const std::string& name, const NKikimrSSA::TP
return TAssign(name, *column->GetScalar(0));
}
-bool ExtractAssign(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TAssignment& assign,
- const std::shared_ptr<arrow::RecordBatch>& parameterValues)
-{
+bool TProgramBuilder::ExtractAssign(NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TAssignment& assign,
+ const std::shared_ptr<arrow::RecordBatch>& parameterValues) {
+
using TId = NKikimrSSA::TProgram::TAssignment;
- std::string columnName = info.GetName(assign.GetColumn());
+ std::string columnName = GetName(assign.GetColumn());
switch (assign.GetExpressionCase()) {
case TId::kFunction:
{
- auto func = MakeFunction(info, columnName, assign.GetFunction());
+ auto func = MakeFunction(columnName, assign.GetFunction());
if (!func.IsOk()) {
return false;
}
@@ -303,7 +339,7 @@ bool ExtractAssign(const TContext& info, NSsa::TProgramStep& step, const NKikimr
if (!cnst.IsConstant()) {
return false;
}
- info.Constants[columnName] = cnst.GetConstant();
+ Constants[columnName] = cnst.GetConstant();
step.Assignes.emplace_back(std::move(cnst));
break;
}
@@ -324,27 +360,27 @@ bool ExtractAssign(const TContext& info, NSsa::TProgramStep& step, const NKikimr
return true;
}
-bool ExtractFilter(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TFilter& filter) {
+bool TProgramBuilder::ExtractFilter(NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TFilter& filter) {
auto& column = filter.GetPredicate();
if (!column.HasId() && !column.HasName()) {
return false;
}
// NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name.
- step.Filters.push_back(info.GetName(column));
+ step.Filters.push_back(GetName(column));
return true;
}
-bool ExtractProjection(const TContext& info, NSsa::TProgramStep& step,
- const NKikimrSSA::TProgram::TProjection& projection) {
+bool TProgramBuilder::ExtractProjection(NSsa::TProgramStep& step,
+ const NKikimrSSA::TProgram::TProjection& projection) {
step.Projection.reserve(projection.ColumnsSize());
for (auto& col : projection.GetColumns()) {
// NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name.
- step.Projection.push_back(info.GetName(col));
+ step.Projection.push_back(GetName(col));
}
return true;
}
-bool ExtractGroupBy(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TGroupBy& groupBy) {
+bool TProgramBuilder::ExtractGroupBy(NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TGroupBy& groupBy) {
if (!groupBy.AggregatesSize()) {
return false;
}
@@ -353,21 +389,20 @@ bool ExtractGroupBy(const TContext& info, NSsa::TProgramStep& step, const NKikim
step.GroupByKeys.reserve(groupBy.KeyColumnsSize());
for (auto& agg : groupBy.GetAggregates()) {
auto& resColumn = agg.GetColumn();
- TString columnName = info.GenerateName(resColumn);
+ TString columnName = GenerateName(resColumn);
- auto func = MakeAggregate(info, columnName, agg.GetFunction());
+ auto func = MakeAggregate(columnName, agg.GetFunction());
if (!func.IsOk()) {
return false;
}
step.GroupBy.push_back(std::move(func));
}
for (auto& key : groupBy.GetKeyColumns()) {
- step.GroupByKeys.push_back(info.GetName(key));
+ step.GroupByKeys.push_back(GetName(key));
}
return true;
}
-
}
const THashMap<ui32, TString>& TProgramContainer::GetSourceColumns() const {
@@ -444,41 +479,51 @@ bool TProgramContainer::Init(const IColumnResolver& columnResolver, NKikimrSchem
ProgramParameters = NArrow::DeserializeBatch(olapProgramProto.GetParameters(), schema);
}
+ if (olapProgramProto.HasKernels()) {
+ KernelsRegistry.Parse(olapProgramProto.GetKernels());
+ }
+
NOlap::TProgramContainer ssaProgram;
- if (!ParseProgram(columnResolver, programProto)) {
- error = TStringBuilder() << "Wrong olap program";
+ if (!ParseProgram(columnResolver, programProto, error)) {
+ if (!error) {
+ error = TStringBuilder() << "Wrong olap program";
+ }
return false;
}
return true;
}
-bool TProgramContainer::ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program) {
+bool TProgramContainer::ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program, TString& error) {
using TId = NKikimrSSA::TProgram::TCommand;
auto ssaProgram = std::make_shared<NSsa::TProgram>();
- TContext info(columnResolver);
+ TProgramBuilder programBuilder(columnResolver, KernelsRegistry);
auto step = std::make_shared<NSsa::TProgramStep>();
for (auto& cmd : program.GetCommand()) {
switch (cmd.GetLineCase()) {
case TId::kAssign:
- if (!ExtractAssign(info, *step, cmd.GetAssign(), ProgramParameters)) {
+ if (!programBuilder.ExtractAssign(*step, cmd.GetAssign(), ProgramParameters)) {
+ error = programBuilder.GetErrorMessage();
return false;
}
break;
case TId::kFilter:
- if (!ExtractFilter(info, *step, cmd.GetFilter())) {
+ if (!programBuilder.ExtractFilter(*step, cmd.GetFilter())) {
+ error = programBuilder.GetErrorMessage();
return false;
}
break;
case TId::kProjection:
- if (!ExtractProjection(info, *step, cmd.GetProjection())) {
+ if (!programBuilder.ExtractProjection(*step, cmd.GetProjection())) {
+ error = programBuilder.GetErrorMessage();
return false;
}
ssaProgram->Steps.push_back(step);
step = std::make_shared<NSsa::TProgramStep>();
break;
case TId::kGroupBy:
- if (!ExtractGroupBy(info, *step, cmd.GetGroupBy())) {
+ if (!programBuilder.ExtractGroupBy(*step, cmd.GetGroupBy())) {
+ error = programBuilder.GetErrorMessage();
return false;
}
ssaProgram->Steps.push_back(step);
@@ -494,7 +539,7 @@ bool TProgramContainer::ParseProgram(const IColumnResolver& columnResolver, cons
ssaProgram->Steps.push_back(step);
}
- ssaProgram->SourceColumns = std::move(info.Sources);
+ ssaProgram->SourceColumns = std::move(programBuilder.Sources);
// Query 'SELECT count(*) FROM table' needs a column
if (ssaProgram->SourceColumns.empty()) {
diff --git a/ydb/core/tx/columnshard/engines/reader/program.h b/ydb/core/tx/program/program.h
index 494018def88..d3d562be5a1 100644
--- a/ydb/core/tx/columnshard/engines/reader/program.h
+++ b/ydb/core/tx/program/program.h
@@ -1,5 +1,6 @@
#pragma once
+#include "registry.h"
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/protos/ssa.pb.h>
#include <ydb/core/formats/arrow/program.h>
@@ -20,6 +21,7 @@ class TProgramContainer {
private:
std::shared_ptr<NSsa::TProgram> Program;
std::shared_ptr<arrow::RecordBatch> ProgramParameters; // TODO
+ TKernelsRegistry KernelsRegistry;
public:
bool Init(const IColumnResolver& columnResolver, NKikimrSchemeOp::EOlapProgramType programType, TString serializedProgram, TString& error);
@@ -38,7 +40,7 @@ public:
bool HasEarlyFilterOnly() const;
private:
- bool ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program);
+ bool ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program, TString& error);
};
}
diff --git a/ydb/core/tx/program/registry.cpp b/ydb/core/tx/program/registry.cpp
new file mode 100644
index 00000000000..d7e31ff8c48
--- /dev/null
+++ b/ydb/core/tx/program/registry.cpp
@@ -0,0 +1,25 @@
+#include "registry.h"
+
+#include <ydb/library/yql/core/arrow_kernels/registry/registry.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+
+namespace NKikimr::NOlap {
+
+bool TKernelsRegistry::Parse(const TString& serialized) {
+ auto functionRegistry = NMiniKQL::CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry());
+ auto nodeFactory = NMiniKQL::GetBuiltinFactory();
+ auto kernels = NYql::LoadKernels(serialized, *functionRegistry, nodeFactory);
+ Kernels.swap(kernels);
+ for (const auto& kernel : Kernels) {
+ arrow::compute::Arity arity(kernel->signature->in_types().size(), kernel->signature->is_varargs());
+ auto func = std::make_shared<arrow::compute::ScalarFunction>("local_function", arity, nullptr);
+ auto res = func->AddKernel(*kernel);
+ if (!res.ok()) {
+ return false;
+ }
+ Functions.push_back(func);
+ }
+ return true;
+}
+}
diff --git a/ydb/core/tx/program/registry.h b/ydb/core/tx/program/registry.h
new file mode 100644
index 00000000000..ba24f343555
--- /dev/null
+++ b/ydb/core/tx/program/registry.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <ydb/core/formats/arrow/program.h>
+
+namespace NKikimr::NOlap {
+
+class TKernelsRegistry {
+public:
+ using TKernels = std::vector<std::shared_ptr<const arrow::compute::ScalarKernel>>;
+
+private:
+ TKernels Kernels;
+ std::vector<NSsa::TFunctionPtr> Functions;
+
+public:
+ bool Parse(const TString& serialized);
+
+ NSsa::TFunctionPtr GetFunction(const size_t index) const {
+ if (index <= Functions.size()) {
+ return Functions[index];
+ }
+ return nullptr;
+ }
+};
+
+}