diff options
author | nsofya <nsofya@yandex-team.com> | 2023-06-01 08:14:50 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-06-01 08:14:50 +0300 |
commit | 19137e1f7f120475065b3d180f55fd28694b0f9c (patch) | |
tree | 2738c1c4022dd524d096ff109b0adec58fd8e257 | |
parent | 2e82f547491f44bfa843325f9ce7c5e23b335878 (diff) | |
download | ydb-19137e1f7f120475065b3d180f55fd28694b0f9c.tar.gz |
Use kernels registry
Use kernels registry
Prepare program functions
33 files changed, 706 insertions, 213 deletions
diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp index 6c1cfe875d4..528766a30a3 100644 --- a/ydb/core/formats/arrow/program.cpp +++ b/ydb/core/formats/arrow/program.cpp @@ -44,6 +44,109 @@ struct GroupByOptions : public arrow::compute::ScalarAggregateOptions { namespace NKikimr::NSsa { +template <class TAssignObject> +class TInternalFunction : public IStepFunction<TAssignObject> { + using TBase = IStepFunction<TAssignObject>; +public: + using TBase::TBase; + arrow::Result<arrow::Datum> Call(const TAssignObject& assign, const TDatumBatch& batch) const override { + auto arguments = TBase::BuildArgs(batch, assign.GetArguments()); + if (!arguments) { + return arrow::Status::Invalid("Error parsing args."); + } + auto funcNames = GetRegistryFunctionNames(assign.GetOperation()); + + arrow::Result<arrow::Datum> result = arrow::Status::UnknownError<std::string>("unknown function"); + for (const auto& funcName : funcNames) { + if (TBase::Ctx && TBase::Ctx->func_registry()->GetFunction(funcName).ok()) { + result = arrow::compute::CallFunction(funcName, *arguments, assign.GetOptions(), TBase::Ctx); + } else { + result = arrow::compute::CallFunction(funcName, *arguments, assign.GetOptions()); + } + if (result.ok()) { + return PrepareResult(std::move(*result), assign); + } + } + return result; + } +private: + virtual std::vector<std::string> GetRegistryFunctionNames(const typename TAssignObject::TOperationType& opId) const = 0; + virtual arrow::Result<arrow::Datum> PrepareResult(arrow::Datum&& datum, const TAssignObject& assign) const { + Y_UNUSED(assign); + return std::move(datum); + } +}; + +class TConstFunction : public IStepFunction<TAssign> { + using TBase = IStepFunction<TAssign>; +public: + using TBase::TBase; + arrow::Result<arrow::Datum> Call(const TAssign& assign, const TDatumBatch& batch) const override { + Y_UNUSED(batch); + return assign.GetConstant(); + } +}; + +class TAggregateFunction : public TInternalFunction<TAggregateAssign> { + using TBase = TInternalFunction<TAggregateAssign>; +private: + using TBase::TBase; + std::vector<std::string> GetRegistryFunctionNames(const EAggregate& opId) const override { + return { GetFunctionName(opId), GetHouseFunctionName(opId)}; + } + arrow::Result<arrow::Datum> PrepareResult(arrow::Datum&& datum, const TAggregateAssign& assign) const override { + if (!datum.is_scalar()) { + return arrow::Status::Invalid("Aggregate result is not a scalar."); + } + + if (datum.scalar()->type->id() == arrow::Type::STRUCT) { + auto op = assign.GetOperation(); + if (op == EAggregate::Min) { + const auto& minMax = datum.scalar_as<arrow::StructScalar>(); + return minMax.value[0]; + } else if (op == EAggregate::Max) { + const auto& minMax = datum.scalar_as<arrow::StructScalar>(); + return minMax.value[1]; + } else { + return arrow::Status::Invalid("Unexpected struct result for aggregate function."); + } + } + if (!datum.type()) { + return arrow::Status::Invalid("Aggregate result has no type."); + } + return std::move(datum); + } +}; + +class TSimpleFunction : public TInternalFunction<TAssign> { + using TBase = TInternalFunction<TAssign>; +private: + using TBase::TBase; + virtual std::vector<std::string> GetRegistryFunctionNames(const EOperation& opId) const override { + return { GetFunctionName(opId) }; + } +}; + +template <class TAssignObject> +class TKernelFunction : public IStepFunction<TAssignObject> { + using TBase = IStepFunction<TAssignObject>; + const TFunctionPtr Function; + +public: + TKernelFunction(const TFunctionPtr kernelsFunction, arrow::compute::ExecContext* ctx) + : TBase(ctx) + , Function(kernelsFunction) + {} + + arrow::Result<arrow::Datum> Call(const TAssignObject& assign, const TDatumBatch& batch) const override { + auto arguments = TBase::BuildArgs(batch, assign.GetArguments()); + if (!arguments) { + return arrow::Status::Invalid("Error parsing args."); + } + return Function->Execute(*arguments, assign.GetOptions(), TBase::Ctx); + } +}; + const char * GetFunctionName(EOperation op) { switch (op) { case EOperation::CastBoolean: @@ -338,64 +441,6 @@ CH::AggFunctionId GetHouseFunction(EAggregate op) { return CH::AggFunctionId::AGG_UNSPECIFIED; } - -template <bool houseFunction, typename TOpId, typename TOptions> -arrow::Result<arrow::Datum> CallFunctionById( - TOpId funcId, const std::vector<std::string>& args, - const TOptions* funcOpts, - const TProgramStep::TDatumBatch& batch, - arrow::compute::ExecContext* ctx) -{ - std::vector<arrow::Datum> arguments; - arguments.reserve(args.size()); - - for (auto& colName : args) { - auto column = batch.GetColumnByName(colName); - if (!column.ok()) { - return column.status(); - } - arguments.push_back(*column); - } - std::string funcName; - if constexpr (houseFunction) { - funcName = GetHouseFunctionName(funcId); - } else { - funcName = GetFunctionName(funcId); - } - - if (ctx && ctx->func_registry()->GetFunction(funcName).ok()) { - return arrow::compute::CallFunction(funcName, arguments, funcOpts, ctx); - } - return arrow::compute::CallFunction(funcName, arguments, funcOpts); -} - -arrow::Result<arrow::Datum> CallFunctionByAssign( - const TAssign& assign, - const TProgramStep::TDatumBatch& batch, - arrow::compute::ExecContext* ctx) -{ - return CallFunctionById<false>(assign.GetOperation(), assign.GetArguments(), assign.GetFunctionOptions(), - batch, ctx); -} - -arrow::Result<arrow::Datum> CallFunctionByAssign( - const TAggregateAssign& assign, - const TProgramStep::TDatumBatch& batch, - arrow::compute::ExecContext* ctx) -{ - return CallFunctionById<false>(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(), - batch, ctx); -} - -arrow::Result<arrow::Datum> CallHouseFunctionByAssign( - const TAggregateAssign& assign, - TProgramStep::TDatumBatch& batch, - arrow::compute::ExecContext* ctx) -{ - return CallFunctionById<true>(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(), - batch, ctx); -} - CH::GroupByOptions::Assign GetGroupByAssign(const TAggregateAssign& assign) { CH::GroupByOptions::Assign descr; descr.function = GetHouseFunction(assign.GetOperation()); @@ -411,10 +456,7 @@ CH::GroupByOptions::Assign GetGroupByAssign(const TAggregateAssign& assign) { } -arrow::Status TProgramStep::TDatumBatch::AddColumn( - const std::string& name, - arrow::Datum&& column) -{ +arrow::Status TDatumBatch::AddColumn(const std::string& name, arrow::Datum&& column) { if (Schema->GetFieldIndex(name) != -1) { return arrow::Status::Invalid("Trying to add duplicate column '" + name + "'"); } @@ -432,7 +474,7 @@ arrow::Status TProgramStep::TDatumBatch::AddColumn( return arrow::Status::OK(); } -arrow::Result<arrow::Datum> TProgramStep::TDatumBatch::GetColumnByName(const std::string& name) const { +arrow::Result<arrow::Datum> TDatumBatch::GetColumnByName(const std::string& name) const { auto i = Schema->GetFieldIndex(name); if (i < 0) { return arrow::Status::Invalid("Not found column '" + name + "' or duplicate"); @@ -440,7 +482,7 @@ arrow::Result<arrow::Datum> TProgramStep::TDatumBatch::GetColumnByName(const std return Datums[i]; } -std::shared_ptr<arrow::RecordBatch> TProgramStep::TDatumBatch::ToRecordBatch() const { +std::shared_ptr<arrow::RecordBatch> TDatumBatch::ToRecordBatch() const { std::vector<std::shared_ptr<arrow::Array>> columns; columns.reserve(Datums.size()); for (auto col : Datums) { @@ -457,8 +499,7 @@ std::shared_ptr<arrow::RecordBatch> TProgramStep::TDatumBatch::ToRecordBatch() c return arrow::RecordBatch::Make(Schema, Rows, columns); } -std::shared_ptr<TProgramStep::TDatumBatch> -TProgramStep::TDatumBatch::FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& batch) { +std::shared_ptr<TDatumBatch> TDatumBatch::FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& batch) { std::vector<arrow::Datum> datums; datums.reserve(batch->num_columns()); for (int64_t i = 0; i < batch->num_columns(); ++i) { @@ -472,11 +513,26 @@ TProgramStep::TDatumBatch::FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& }); } +IStepFunction<TAssign>::TPtr TAssign::GetFunction(arrow::compute::ExecContext* ctx) const { + if (KernelFunction) { + return std::make_shared<TKernelFunction<TAssign>>(KernelFunction, ctx); + } + if (IsConstant()) { + return std::make_shared<TConstFunction>(ctx); + } + return std::make_shared<TSimpleFunction>(ctx); +} -arrow::Status TProgramStep::ApplyAssignes( - TProgramStep::TDatumBatch& batch, - arrow::compute::ExecContext* ctx) const -{ + +IStepFunction<TAggregateAssign>::TPtr TAggregateAssign::GetFunction(arrow::compute::ExecContext* ctx) const { + if (KernelFunction) { + return std::make_shared<TKernelFunction<TAggregateAssign>>(KernelFunction, ctx); + } + return std::make_shared<TAggregateFunction>(ctx); +} + + +arrow::Status TProgramStep::ApplyAssignes(TDatumBatch& batch, arrow::compute::ExecContext* ctx) const { if (Assignes.empty()) { return arrow::Status::OK(); } @@ -486,29 +542,20 @@ arrow::Status TProgramStep::ApplyAssignes( return arrow::Status::Invalid("Assign to existing column '" + assign.GetName() + "'."); } - arrow::Datum column; - if (assign.IsConstant()) { - column = assign.GetConstant(); - } else { - auto funcResult = CallFunctionByAssign(assign, batch, ctx); - if (!funcResult.ok()) { - return funcResult.status(); - } - column = *funcResult; + auto funcResult = assign.GetFunction(ctx)->Call(assign, batch); + if (!funcResult.ok()) { + return funcResult.status(); } + arrow::Datum column = *funcResult; auto status = batch.AddColumn(assign.GetName(), std::move(column)); if (!status.ok()) { return status; } } - //return batch->Validate(); return arrow::Status::OK(); } -arrow::Status TProgramStep::ApplyAggregates( - TDatumBatch& batch, - arrow::compute::ExecContext* ctx) const -{ +arrow::Status TProgramStep::ApplyAggregates(TDatumBatch& batch, arrow::compute::ExecContext* ctx) const { if (GroupBy.empty()) { return arrow::Status::OK(); } @@ -522,40 +569,13 @@ arrow::Status TProgramStep::ApplyAggregates( if (GroupByKeys.empty()) { for (auto& assign : GroupBy) { - auto funcResult = CallFunctionByAssign(assign, batch, ctx); + auto funcResult = assign.GetFunction(ctx)->Call(assign, batch); if (!funcResult.ok()) { - auto houseResult = CallHouseFunctionByAssign(assign, batch, ctx); - if (!houseResult.ok()) { - return funcResult.status(); - } - funcResult = houseResult; + return funcResult.status(); } - res.Datums.push_back(*funcResult); - auto& column = res.Datums.back(); - if (!column.is_scalar()) { - return arrow::Status::Invalid("Aggregate result is not a scalar."); - } - - if (column.scalar()->type->id() == arrow::Type::STRUCT) { - auto op = assign.GetOperation(); - if (op == EAggregate::Min) { - const auto& minMax = column.scalar_as<arrow::StructScalar>(); - column = minMax.value[0]; - } else if (op == EAggregate::Max) { - const auto& minMax = column.scalar_as<arrow::StructScalar>(); - column = minMax.value[1]; - } else { - return arrow::Status::Invalid("Unexpected struct result for aggregate function."); - } - } - - if (!column.type()) { - return arrow::Status::Invalid("Aggregate result has no type."); - } - fields.emplace_back(std::make_shared<arrow::Field>(assign.GetName(), column.type())); + fields.emplace_back(std::make_shared<arrow::Field>(assign.GetName(), res.Datums.back().type())); } - res.Rows = 1; } else { CH::GroupByOptions funcOpts; diff --git a/ydb/core/formats/arrow/program.h b/ydb/core/formats/arrow/program.h index f01e35930dc..ed5c46d2cc2 100644 --- a/ydb/core/formats/arrow/program.h +++ b/ydb/core/formats/arrow/program.h @@ -29,6 +29,7 @@ namespace NKikimr::NSsa { using EOperation = NArrow::EOperation; using EAggregate = NArrow::EAggregate; +using TFunctionPtr = std::shared_ptr<arrow::compute::ScalarFunction>; const char * GetFunctionName(EOperation op); const char * GetFunctionName(EAggregate op); @@ -36,8 +37,52 @@ const char * GetHouseFunctionName(EAggregate op); inline const char * GetHouseGroupByName() { return "ch.group_by"; } EOperation ValidateOperation(EOperation op, ui32 argsSize); +struct TDatumBatch { + std::shared_ptr<arrow::Schema> Schema; + std::vector<arrow::Datum> Datums; + int64_t Rows{}; + + arrow::Status AddColumn(const std::string& name, arrow::Datum&& column); + arrow::Result<arrow::Datum> GetColumnByName(const std::string& name) const; + std::shared_ptr<arrow::RecordBatch> ToRecordBatch() const; + static std::shared_ptr<TDatumBatch> FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& batch); +}; + +template <class TAssignObject> +class IStepFunction { + using TSelf = IStepFunction<TAssignObject>; +protected: + arrow::compute::ExecContext* Ctx; +public: + using TPtr = std::shared_ptr<TSelf>; + + IStepFunction(arrow::compute::ExecContext* ctx) + : Ctx(ctx) + {} + + virtual ~IStepFunction() {} + + virtual arrow::Result<arrow::Datum> Call(const TAssignObject& assign, const TDatumBatch& batch) const = 0; + +protected: + std::optional<std::vector<arrow::Datum>> BuildArgs(const TDatumBatch& batch, const std::vector<std::string>& args) const { + std::vector<arrow::Datum> arguments; + arguments.reserve(args.size()); + for (auto& colName : args) { + auto column = batch.GetColumnByName(colName); + if (!column.ok()) { + return {}; + } + arguments.push_back(*column); + } + return std::move(arguments); + } +}; + class TAssign { public: + using TOperationType = EOperation; + TAssign(const std::string& name, EOperation op, std::vector<std::string>&& args) : Name(name) , Operation(ValidateOperation(op, args.size())) @@ -115,24 +160,38 @@ public: , FuncOpts(nullptr) {} + TAssign(const std::string& name, + TFunctionPtr kernelFunction, + std::vector<std::string>&& args, + std::shared_ptr<arrow::compute::FunctionOptions> funcOpts) + : Name(name) + , Arguments(std::move(args)) + , FuncOpts(funcOpts) + , KernelFunction(kernelFunction) + {} + bool IsConstant() const { return Operation == EOperation::Constant; } - bool IsOk() const { return Operation != EOperation::Unspecified; } + bool IsOk() const { return Operation != EOperation::Unspecified || !!KernelFunction; } EOperation GetOperation() const { return Operation; } const std::vector<std::string>& GetArguments() const { return Arguments; } std::shared_ptr<arrow::Scalar> GetConstant() const { return Constant; } const std::string& GetName() const { return Name; } - const arrow::compute::FunctionOptions* GetFunctionOptions() const { return FuncOpts.get(); } + const arrow::compute::FunctionOptions* GetOptions() const { return FuncOpts.get(); } + IStepFunction<TAssign>::TPtr GetFunction(arrow::compute::ExecContext* ctx) const; private: std::string Name; EOperation Operation{EOperation::Unspecified}; std::vector<std::string> Arguments; std::shared_ptr<arrow::Scalar> Constant; std::shared_ptr<arrow::compute::FunctionOptions> FuncOpts; + TFunctionPtr KernelFunction; }; class TAggregateAssign { public: + using TOperationType = EAggregate; + TAggregateAssign(const std::string& name, EAggregate op = EAggregate::Unspecified) : Name(name) , Operation(op) @@ -152,20 +211,32 @@ public: } } - bool IsOk() const { return Operation != EAggregate::Unspecified; } + TAggregateAssign(const std::string& name, + TFunctionPtr kernelFunction, + std::vector<std::string>&& args) + : Name(name) + , Arguments(std::move(args)) + , KernelFunction(kernelFunction) + {} + + bool IsOk() const { return Operation != EAggregate::Unspecified || !!KernelFunction; } EAggregate GetOperation() const { return Operation; } const std::vector<std::string>& GetArguments() const { return Arguments; } std::vector<std::string>& MutableArguments() { return Arguments; } const std::string& GetName() const { return Name; } - const arrow::compute::ScalarAggregateOptions& GetAggregateOptions() const { return ScalarOpts; } + const arrow::compute::ScalarAggregateOptions* GetOptions() const { return &ScalarOpts; } + + IStepFunction<TAggregateAssign>::TPtr GetFunction(arrow::compute::ExecContext* ctx) const; private: std::string Name; EAggregate Operation{EAggregate::Unspecified}; std::vector<std::string> Arguments; arrow::compute::ScalarAggregateOptions ScalarOpts; // TODO: make correct options + TFunctionPtr KernelFunction; }; + /// Group of commands that finishes with projection. Steps add locality for columns definition. /// /// In step we have non-decreasing count of columns (line to line) till projection. So columns are either source @@ -182,16 +253,7 @@ struct TProgramStep { std::vector<std::string> GroupByKeys; // TODO: it's possible to use them without GROUP BY for DISTINCT std::vector<std::string> Projection; // Step's result columns (remove others) - struct TDatumBatch { - std::shared_ptr<arrow::Schema> Schema; - std::vector<arrow::Datum> Datums; - int64_t Rows{}; - - arrow::Status AddColumn(const std::string& name, arrow::Datum&& column); - arrow::Result<arrow::Datum> GetColumnByName(const std::string& name) const; - std::shared_ptr<arrow::RecordBatch> ToRecordBatch() const; - static std::shared_ptr<TProgramStep::TDatumBatch> FromRecordBatch(std::shared_ptr<arrow::RecordBatch>& batch); - }; + using TDatumBatch = TDatumBatch; std::set<std::string> GetColumnsInUsage() const; @@ -207,7 +269,7 @@ struct TProgramStep { arrow::Status ApplyProjection(std::shared_ptr<arrow::RecordBatch>& batch) const; arrow::Status ApplyProjection(TDatumBatch& batch) const; - arrow::Status MakeCombinedFilter(TDatumBatch& batch, NArrow::TColumnFilter& result) const; + arrow::Status MakeCombinedFilter(TDatumBatch& batch, NArrow::TColumnFilter& result) const; }; struct TProgram { diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto index eaa8891c41d..61d45469830 100644 --- a/ydb/core/protos/ssa.proto +++ b/ydb/core/protos/ssa.proto @@ -40,6 +40,11 @@ message TProgram { optional string Name = 1; } + enum EFunctionType { + SIMPLE_ARROW = 1; + YQL_KERNEL = 2; + } + message TAssignment { enum EFunction { FUNC_UNSPECIFIED = 0; @@ -85,6 +90,8 @@ message TProgram { message TFunction { optional uint32 Id = 1; // EFunction repeated TColumn Arguments = 2; + optional EFunctionType FunctionType = 3 [ default = SIMPLE_ARROW ]; + optional uint32 KernelIdx = 4; } message TExternalFunction { @@ -127,6 +134,8 @@ message TProgram { optional uint32 Id = 1; // EAggregateFunction repeated TColumn Arguments = 2; optional string Variant = 3; // i.e. POP/SAMP for AGG_VAR, AGG_COVAR, AGG_STDDEV + optional EFunctionType FunctionType = 4 [ default = SIMPLE_ARROW ]; + optional uint32 KernelIdx = 5; // TODO: Parameters, i.e. N for topK(N)(arg) } @@ -171,4 +180,5 @@ message TOlapProgram { // RecordBatch deserialization require arrow::Schema, thus store it here optional bytes ParametersSchema = 2; optional bytes Parameters = 3; + optional bytes Kernels = 4; } diff --git a/ydb/core/tx/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/CMakeLists.darwin-x86_64.txt index 3589716aac5..03fdff7248e 100644 --- a/ydb/core/tx/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/CMakeLists.darwin-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(coordinator) add_subdirectory(datashard) add_subdirectory(long_tx_service) add_subdirectory(mediator) +add_subdirectory(program) add_subdirectory(replication) add_subdirectory(scheme_board) add_subdirectory(scheme_cache) diff --git a/ydb/core/tx/CMakeLists.linux-aarch64.txt b/ydb/core/tx/CMakeLists.linux-aarch64.txt index d0bf8cc5773..3ed83b3f8ed 100644 --- a/ydb/core/tx/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/CMakeLists.linux-aarch64.txt @@ -13,6 +13,7 @@ add_subdirectory(coordinator) add_subdirectory(datashard) add_subdirectory(long_tx_service) add_subdirectory(mediator) +add_subdirectory(program) add_subdirectory(replication) add_subdirectory(scheme_board) add_subdirectory(scheme_cache) diff --git a/ydb/core/tx/CMakeLists.linux-x86_64.txt b/ydb/core/tx/CMakeLists.linux-x86_64.txt index d0bf8cc5773..3ed83b3f8ed 100644 --- a/ydb/core/tx/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/CMakeLists.linux-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(coordinator) add_subdirectory(datashard) add_subdirectory(long_tx_service) add_subdirectory(mediator) +add_subdirectory(program) add_subdirectory(replication) add_subdirectory(scheme_board) add_subdirectory(scheme_cache) diff --git a/ydb/core/tx/CMakeLists.windows-x86_64.txt b/ydb/core/tx/CMakeLists.windows-x86_64.txt index 3589716aac5..03fdff7248e 100644 --- a/ydb/core/tx/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/CMakeLists.windows-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(coordinator) add_subdirectory(datashard) add_subdirectory(long_tx_service) add_subdirectory(mediator) +add_subdirectory(program) add_subdirectory(replication) add_subdirectory(scheme_board) add_subdirectory(scheme_cache) diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index 9965189fe32..a80fba2971d 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -341,3 +341,26 @@ NMetadata::NFetcher::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpec } } + +namespace NKikimr::NColumnShard { + NOlap::TIndexInfo BuildTableInfo(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, + const std::vector<std::pair<TString, NScheme::TTypeInfo>>& key) { + NOlap::TIndexInfo indexInfo = NOlap::TIndexInfo::BuildDefault(); + + for (ui32 i = 0; i < ydbSchema.size(); ++i) { + ui32 id = i + 1; + auto& name = ydbSchema[i].first; + auto& type = ydbSchema[i].second; + + indexInfo.Columns[id] = NTable::TColumn(name, id, type, ""); + indexInfo.ColumnNames[name] = id; + } + + for (const auto& [keyName, keyType] : key) { + indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]); + } + + indexInfo.SetAllKeys(); + return indexInfo; + } +} diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index d257f9e2c40..49e114ff2c1 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -472,4 +472,7 @@ namespace NKikimr::NColumnShard { return arrow::RecordBatch::Make(Schema, RowsCount, columns); } }; + + NOlap::TIndexInfo BuildTableInfo(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, + const std::vector<std::pair<TString, NScheme::TTypeInfo>>& key); } diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt index 8a29dbda949..b868fce25e7 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt @@ -35,6 +35,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC columnshard-engines-predicate columnshard-engines-storage formats-arrow-compression + core-tx-program udf-service-exception_policy tools-enum_parser-enum_serialization_runtime ) diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt index 43a5fa4870c..d73745d1174 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt @@ -36,6 +36,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC columnshard-engines-predicate columnshard-engines-storage formats-arrow-compression + core-tx-program udf-service-exception_policy tools-enum_parser-enum_serialization_runtime ) diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt index 43a5fa4870c..d73745d1174 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt @@ -36,6 +36,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC columnshard-engines-predicate columnshard-engines-storage formats-arrow-compression + core-tx-program udf-service-exception_policy tools-enum_parser-enum_serialization_runtime ) diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt index 8a29dbda949..b868fce25e7 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt @@ -35,6 +35,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC columnshard-engines-predicate columnshard-engines-storage formats-arrow-compression + core-tx-program udf-service-exception_policy tools-enum_parser-enum_serialization_runtime ) diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt index 099c560c72e..19f1228ad62 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt @@ -25,6 +25,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC ydb-core-protos core-formats-arrow columnshard-engines-predicate + core-tx-program engines-reader-order_control tools-enum_parser-enum_serialization_runtime ) @@ -40,7 +41,6 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt index 4c3c3a73226..78e360e108d 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt @@ -26,6 +26,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC ydb-core-protos core-formats-arrow columnshard-engines-predicate + core-tx-program engines-reader-order_control tools-enum_parser-enum_serialization_runtime ) @@ -41,7 +42,6 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt index 4c3c3a73226..78e360e108d 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC ydb-core-protos core-formats-arrow columnshard-engines-predicate + core-tx-program engines-reader-order_control tools-enum_parser-enum_serialization_runtime ) @@ -41,7 +42,6 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt index 099c560c72e..19f1228ad62 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt @@ -25,6 +25,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC ydb-core-protos core-formats-arrow columnshard-engines-predicate + core-tx-program engines-reader-order_control tools-enum_parser-enum_serialization_runtime ) @@ -40,7 +41,6 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/description.h b/ydb/core/tx/columnshard/engines/reader/description.h index b581055388f..75b6aa4f32f 100644 --- a/ydb/core/tx/columnshard/engines/reader/description.h +++ b/ydb/core/tx/columnshard/engines/reader/description.h @@ -1,5 +1,5 @@ #pragma once -#include "program.h" +#include <ydb/core/tx/program/program.h> #include <ydb/core/tx/columnshard/engines/predicate/filter.h> namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt index 33224314acf..d59807c727a 100644 --- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt @@ -26,6 +26,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC ydb-core-tablet_flat tx-columnshard-counters yql-sql-pg_dummy + core-arrow_kernels-request + core-testlib-default ) target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE -Wl,-platform_version,macos,11.0,11.0 @@ -37,6 +39,8 @@ target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE target_sources(ydb-core-tx-columnshard-engines-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_program.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ) set_property( TARGET diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt index 571d214b767..50f51e0ad27 100644 --- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt @@ -26,6 +26,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC ydb-core-tablet_flat tx-columnshard-counters yql-sql-pg_dummy + core-arrow_kernels-request + core-testlib-default ) target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE -ldl @@ -40,6 +42,8 @@ target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE target_sources(ydb-core-tx-columnshard-engines-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_program.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ) set_property( TARGET diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt index 54cd8327409..47a14cdffdd 100644 --- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt @@ -27,6 +27,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC ydb-core-tablet_flat tx-columnshard-counters yql-sql-pg_dummy + core-arrow_kernels-request + core-testlib-default ) target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE -ldl @@ -41,6 +43,8 @@ target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE target_sources(ydb-core-tx-columnshard-engines-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_program.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ) set_property( TARGET diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt index 091ad519d80..c2cce5aea17 100644 --- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt @@ -26,10 +26,14 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC ydb-core-tablet_flat tx-columnshard-counters yql-sql-pg_dummy + core-arrow_kernels-request + core-testlib-default ) target_sources(ydb-core-tx-columnshard-engines-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_insert_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/ut_program.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ) set_property( TARGET diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 71109ad4d4c..a3fdbdd9332 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -3,6 +3,8 @@ #include "predicate/predicate.h" #include "index_logic_logs.h" +#include <ydb/core/tx/columnshard/columnshard_ut_common.h> + namespace NKikimr { @@ -170,27 +172,6 @@ static const std::vector<std::pair<TString, TTypeInfo>> testKey = { {"uid", TTypeInfo(NTypeIds::Utf8) } }; -TIndexInfo TestTableInfo(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema = testColumns, - const std::vector<std::pair<TString, TTypeInfo>>& key = testKey) { - TIndexInfo indexInfo = TIndexInfo::BuildDefault(); - - for (ui32 i = 0; i < ydbSchema.size(); ++i) { - ui32 id = i + 1; - auto& name = ydbSchema[i].first; - auto& type = ydbSchema[i].second; - - indexInfo.Columns[id] = NTable::TColumn(name, id, type, ""); - indexInfo.ColumnNames[name] = id; - } - - for (const auto& [keyName, keyType] : key) { - indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]); - } - - indexInfo.SetAllKeys(); - return indexInfo; -} - template <typename TKeyDataType> class TBuilder { public: @@ -357,7 +338,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { void WriteLoadRead(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, const std::vector<std::pair<TString, TTypeInfo>>& key) { TTestDbWrapper db; - TIndexInfo tableInfo = TestTableInfo(ydbSchema, key); + TIndexInfo tableInfo = NColumnShard::BuildTableInfo(ydbSchema, key); std::vector<ui64> paths = {1, 2}; @@ -453,7 +434,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { void ReadWithPredicates(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, const std::vector<std::pair<TString, TTypeInfo>>& key) { TTestDbWrapper db; - TIndexInfo tableInfo = TestTableInfo(ydbSchema, key); + TIndexInfo tableInfo = NColumnShard::BuildTableInfo(ydbSchema, key); TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine(0, TestLimits()); @@ -552,7 +533,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { Y_UNIT_TEST(IndexWriteOverload) { TTestDbWrapper db; - TIndexInfo tableInfo = TestTableInfo(); + TIndexInfo tableInfo = NColumnShard::BuildTableInfo(testColumns, testKey);; ui64 pathId = 1; ui32 step = 1000; @@ -634,7 +615,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { Y_UNIT_TEST(IndexTtl) { TTestDbWrapper db; - TIndexInfo tableInfo = TestTableInfo(); + TIndexInfo tableInfo = NColumnShard::BuildTableInfo(testColumns, testKey);; ui64 pathId = 1; ui32 step = 1000; diff --git a/ydb/core/tx/columnshard/engines/ut_program.cpp b/ydb/core/tx/columnshard/engines/ut_program.cpp new file mode 100644 index 00000000000..a347c2b1782 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/ut_program.cpp @@ -0,0 +1,145 @@ +#include "index_info.h" +#include "index_logic_logs.h" + +#include <ydb/core/tx/columnshard/columnshard__index_scan.h> +#include <ydb/core/tx/columnshard/columnshard_ut_common.h> +#include <ydb/core/tx/program/program.h> + +#include <ydb/library/yql/core/arrow_kernels/request/request.h> +#include <ydb/library/yql/core/arrow_kernels/registry/registry.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NKikimr::NOlap; +using namespace NKikimr::NColumnShard; +using namespace NKikimr; +namespace NTypeIds = NScheme::NTypeIds; +using TTypeId = NScheme::TTypeId; +using TTypeInfo = NScheme::TTypeInfo; + +namespace { + static const std::vector<std::pair<TString, TTypeInfo>> testColumns = { + {"timestamp", TTypeInfo(NTypeIds::Timestamp) }, + {"uid", TTypeInfo(NTypeIds::Utf8) }, + {"sum", TTypeInfo(NTypeIds::Int32) }, + {"vat", TTypeInfo(NTypeIds::Int32) }, + }; + + static const std::vector<std::pair<TString, TTypeInfo>> testKey = { + {"timestamp", TTypeInfo(NTypeIds::Timestamp) }, + {"uid", TTypeInfo(NTypeIds::Utf8) } + }; +} + +Y_UNIT_TEST_SUITE(TestProgram) { + + TString BuildRegistry() { + auto functionRegistry = CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry()); + auto nodeFactory = NMiniKQL::GetBuiltinFactory(); + NYql::TKernelRequestBuilder b(*functionRegistry); + + NYql::TExprContext ctx; + auto blockInt32Type = ctx.template MakeType<NYql::TBlockExprType>(ctx.template MakeType<NYql::TDataExprType>(NYql::EDataSlot::Int32)); + auto index1 = b.AddBinaryOp(NYql::TKernelRequestBuilder::EBinaryOp::Add, blockInt32Type, blockInt32Type, blockInt32Type); + Y_UNUSED(index1); + return b.Serialize(); + } + + TString SerializeProgram(const NKikimrSSA::TProgram& programProto) { + NKikimrSSA::TOlapProgram olapProgramProto; + olapProgramProto.SetKernels(BuildRegistry()); + + { + TString str; + Y_PROTOBUF_SUPPRESS_NODISCARD programProto.SerializeToString(&str); + olapProgramProto.SetProgram(str); + } + TString programSerialized; + Y_PROTOBUF_SUPPRESS_NODISCARD olapProgramProto.SerializeToString(&programSerialized); + return programSerialized; + } + + Y_UNIT_TEST(YqlKernel) { + TIndexInfo indexInfo = BuildTableInfo(testColumns, testKey); + TIndexColumnResolver columnResolver(indexInfo); + + NKikimrSSA::TProgram programProto; + { + auto* command = programProto.AddCommand(); + auto* functionProto = command->MutableAssign()->MutableFunction(); + functionProto->SetFunctionType(NKikimrSSA::TProgram::EFunctionType::TProgram_EFunctionType_YQL_KERNEL); + functionProto->SetKernelIdx(0); + functionProto->AddArguments()->SetName("sum"); + functionProto->AddArguments()->SetName("vat"); + functionProto->SetId(NKikimrSSA::TProgram::TAssignment::EFunction::TProgram_TAssignment_EFunction_FUNC_STR_LENGTH); + } + { + auto* command = programProto.AddCommand(); + auto* prjectionProto = command->MutableProjection(); + auto* column = prjectionProto->AddColumns(); + column->SetName("0"); + } + const auto programSerialized = SerializeProgram(programProto); + + TProgramContainer program; + TString errors; + UNIT_ASSERT_C(program.Init(columnResolver, NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS, programSerialized, errors), errors); + + TTableUpdatesBuilder updates(NArrow::MakeArrowSchema({{"sum", TTypeInfo(NTypeIds::Int32) }, {"vat", TTypeInfo(NTypeIds::Int32) }})); + updates.AddRow().Add(1).Add(1); + updates.AddRow().Add(100).Add(0); + + auto batch = updates.BuildArrow(); + UNIT_ASSERT(program.ApplyProgram(batch).ok()); + + TTableUpdatesBuilder result(NArrow::MakeArrowSchema( { std::make_pair("0", TTypeInfo(NTypeIds::Int32)) })); + result.AddRow().Add(2); + result.AddRow().Add(100); + + auto expected = result.BuildArrow(); + UNIT_ASSERT_VALUES_EQUAL(batch->ToString(), expected->ToString()); + } + + Y_UNIT_TEST(SimpleFunction) { + TIndexInfo indexInfo = BuildTableInfo(testColumns, testKey);; + TIndexColumnResolver columnResolver(indexInfo); + + NKikimrSSA::TProgram programProto; + { + auto* command = programProto.AddCommand(); + auto* functionProto = command->MutableAssign()->MutableFunction(); + auto* funcArg = functionProto->AddArguments(); + funcArg->SetName("uid"); + functionProto->SetId(NKikimrSSA::TProgram::TAssignment::EFunction::TProgram_TAssignment_EFunction_FUNC_STR_LENGTH); + } + { + auto* command = programProto.AddCommand(); + auto* prjectionProto = command->MutableProjection(); + auto* column = prjectionProto->AddColumns(); + column->SetName("0"); + } + const auto programSerialized = SerializeProgram(programProto); + + TProgramContainer program; + TString errors; + UNIT_ASSERT_C(program.Init(columnResolver, NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS, programSerialized, errors), errors); + + TTableUpdatesBuilder updates(NArrow::MakeArrowSchema( { std::make_pair("uid", TTypeInfo(NTypeIds::Utf8)) })); + updates.AddRow().Add("aaa"); + updates.AddRow().Add("b"); + updates.AddRow().Add(""); + + auto batch = updates.BuildArrow(); + UNIT_ASSERT(program.ApplyProgram(batch).ok()); + + TTableUpdatesBuilder result(NArrow::MakeArrowSchema( { std::make_pair("0", TTypeInfo(NTypeIds::Uint64)) })); + result.AddRow().Add<uint64_t>(3); + result.AddRow().Add<uint64_t>(1); + result.AddRow().Add<uint64_t>(0); + + auto expected = result.BuildArrow(); + UNIT_ASSERT_VALUES_EQUAL(batch->ToString(), expected->ToString()); + } +} diff --git a/ydb/core/tx/program/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/program/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..12bf05544f2 --- /dev/null +++ b/ydb/core/tx/program/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,27 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-tx-program) +target_compile_options(core-tx-program PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-tx-program PUBLIC + contrib-libs-cxxsupp + yutil + core-formats-arrow + ydb-core-protos + ydb-core-tablet_flat + yql-minikql-invoke_builtins + yql-minikql-comp_nodes + core-arrow_kernels-registry +) +target_sources(core-tx-program PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/registry.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/program.cpp +) diff --git a/ydb/core/tx/program/CMakeLists.linux-aarch64.txt b/ydb/core/tx/program/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..ce23a48278c --- /dev/null +++ b/ydb/core/tx/program/CMakeLists.linux-aarch64.txt @@ -0,0 +1,28 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-tx-program) +target_compile_options(core-tx-program PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-tx-program PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + core-formats-arrow + ydb-core-protos + ydb-core-tablet_flat + yql-minikql-invoke_builtins + yql-minikql-comp_nodes + core-arrow_kernels-registry +) +target_sources(core-tx-program PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/registry.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/program.cpp +) diff --git a/ydb/core/tx/program/CMakeLists.linux-x86_64.txt b/ydb/core/tx/program/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..ce23a48278c --- /dev/null +++ b/ydb/core/tx/program/CMakeLists.linux-x86_64.txt @@ -0,0 +1,28 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-tx-program) +target_compile_options(core-tx-program PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-tx-program PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + core-formats-arrow + ydb-core-protos + ydb-core-tablet_flat + yql-minikql-invoke_builtins + yql-minikql-comp_nodes + core-arrow_kernels-registry +) +target_sources(core-tx-program PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/registry.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/program.cpp +) diff --git a/ydb/core/tx/program/CMakeLists.txt b/ydb/core/tx/program/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/tx/program/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/program/CMakeLists.windows-x86_64.txt b/ydb/core/tx/program/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..12bf05544f2 --- /dev/null +++ b/ydb/core/tx/program/CMakeLists.windows-x86_64.txt @@ -0,0 +1,27 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-tx-program) +target_compile_options(core-tx-program PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-tx-program PUBLIC + contrib-libs-cxxsupp + yutil + core-formats-arrow + ydb-core-protos + ydb-core-tablet_flat + yql-minikql-invoke_builtins + yql-minikql-comp_nodes + core-arrow_kernels-registry +) +target_sources(core-tx-program PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/registry.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/program/program.cpp +) diff --git a/ydb/core/tx/columnshard/engines/reader/program.cpp b/ydb/core/tx/program/program.cpp index 1d693164cf0..d87ecbd01aa 100644 --- a/ydb/core/tx/columnshard/engines/reader/program.cpp +++ b/ydb/core/tx/program/program.cpp @@ -15,15 +15,23 @@ using EAggregate = NArrow::EAggregate; using TAssign = NSsa::TAssign; using TAggregateAssign = NSsa::TAggregateAssign; -struct TContext { +class TProgramBuilder { const IColumnResolver& ColumnResolver; - mutable THashMap<ui32, TString> Sources; + const TKernelsRegistry& KernelsRegistry; mutable THashMap<TString, std::shared_ptr<arrow::Scalar>> Constants; - - explicit TContext(const IColumnResolver& columnResolver) + TString Error; +public: + mutable THashMap<ui32, TString> Sources; + + explicit TProgramBuilder(const IColumnResolver& columnResolver, const TKernelsRegistry& kernelsRegistry) : ColumnResolver(columnResolver) + , KernelsRegistry(kernelsRegistry) {} + const TString& GetErrorMessage() const { + return Error; + } +private: std::string GetName(const NKikimrSSA::TProgram::TColumn& column) const { ui32 columnId = column.GetId(); TString name = ColumnResolver.GetColumnName(columnId, false); @@ -44,15 +52,28 @@ struct TContext { } return std::string(name.data(), name.size()); } + TAssign MakeFunction(const std::string& name, + const NKikimrSSA::TProgram::TAssignment::TFunction& func); + NSsa::TAssign MakeConstant(const std::string& name, const NKikimrSSA::TProgram::TConstant& constant); + NSsa::TAggregateAssign MakeAggregate(const std::string& name, const NKikimrSSA::TProgram::TAggregateAssignment::TAggregateFunction& func); + NSsa::TAssign MaterializeParameter(const std::string& name, const NKikimrSSA::TProgram::TParameter& parameter, const std::shared_ptr<arrow::RecordBatch>& parameterValues); + +public: + bool ExtractAssign(NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TAssignment& assign, + const std::shared_ptr<arrow::RecordBatch>& parameterValues); + bool ExtractFilter(NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TFilter& filter); + bool ExtractProjection(NSsa::TProgramStep& step, + const NKikimrSSA::TProgram::TProjection& projection); + bool ExtractGroupBy(NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TGroupBy& groupBy); }; -TAssign MakeFunction(const TContext& info, const std::string& name, - const NKikimrSSA::TProgram::TAssignment::TFunction& func) { +TAssign TProgramBuilder::MakeFunction(const std::string& name, + const NKikimrSSA::TProgram::TAssignment::TFunction& func) { using TId = NKikimrSSA::TProgram::TAssignment; std::vector<std::string> arguments; for (auto& col : func.GetArguments()) { - arguments.push_back(info.GetName(col)); + arguments.push_back(GetName(col)); } auto mkCastOptions = [](std::shared_ptr<arrow::DataType> dataType) { @@ -63,10 +84,10 @@ TAssign MakeFunction(const TContext& info, const std::string& name, }; auto mkLikeOptions = [&](bool ignoreCase) { - if (arguments.size() != 2 || !info.Constants.contains(arguments[1])) { + if (arguments.size() != 2 || !Constants.contains(arguments[1])) { return std::shared_ptr<arrow::compute::MatchSubstringOptions>(); } - auto patternScalar = info.Constants[arguments[1]]; + auto patternScalar = Constants[arguments[1]]; if (!arrow::is_base_binary_like(patternScalar->type->id())) { return std::shared_ptr<arrow::compute::MatchSubstringOptions>(); } @@ -75,6 +96,15 @@ TAssign MakeFunction(const TContext& info, const std::string& name, return std::make_shared<arrow::compute::MatchSubstringOptions>(pattern->ToString(), ignoreCase); }; + if (func.GetFunctionType() == NKikimrSSA::TProgram::EFunctionType::TProgram_EFunctionType_YQL_KERNEL) { + auto kernelFunction = KernelsRegistry.GetFunction(func.GetKernelIdx()); + if (!kernelFunction) { + Error = TStringBuilder() << "Unknown kernel for " << name << ";kernel_idx=" << func.GetKernelIdx(); + return TAssign(name, EOperation::Unspecified, std::move(arguments)); + } + return TAssign(name, kernelFunction, std::move(arguments), nullptr); + } + switch (func.GetId()) { case TId::FUNC_CMP_EQUAL: return TAssign(name, EOperation::Equal, std::move(arguments)); @@ -152,37 +182,37 @@ TAssign MakeFunction(const TContext& info, const std::string& name, return TAssign(name, EOperation::Divide, std::move(arguments)); case TId::FUNC_CAST_TO_INT8: return TAssign(name, EOperation::CastInt8, std::move(arguments), - mkCastOptions(std::make_shared<arrow::Int8Type>())); + mkCastOptions(std::make_shared<arrow::Int8Type>())); case TId::FUNC_CAST_TO_INT16: return TAssign(name, EOperation::CastInt16, std::move(arguments), - mkCastOptions(std::make_shared<arrow::Int16Type>())); + mkCastOptions(std::make_shared<arrow::Int16Type>())); case TId::FUNC_CAST_TO_INT32: return TAssign(name, EOperation::CastInt32, std::move(arguments), - mkCastOptions(std::make_shared<arrow::Int32Type>())); + mkCastOptions(std::make_shared<arrow::Int32Type>())); case TId::FUNC_CAST_TO_INT64: return TAssign(name, EOperation::CastInt64, std::move(arguments), - mkCastOptions(std::make_shared<arrow::Int64Type>())); + mkCastOptions(std::make_shared<arrow::Int64Type>())); case TId::FUNC_CAST_TO_UINT8: return TAssign(name, EOperation::CastUInt8, std::move(arguments), - mkCastOptions(std::make_shared<arrow::UInt8Type>())); + mkCastOptions(std::make_shared<arrow::UInt8Type>())); case TId::FUNC_CAST_TO_UINT16: return TAssign(name, EOperation::CastUInt16, std::move(arguments), - mkCastOptions(std::make_shared<arrow::UInt16Type>())); + mkCastOptions(std::make_shared<arrow::UInt16Type>())); case TId::FUNC_CAST_TO_UINT32: return TAssign(name, EOperation::CastUInt32, std::move(arguments), - mkCastOptions(std::make_shared<arrow::UInt32Type>())); + mkCastOptions(std::make_shared<arrow::UInt32Type>())); case TId::FUNC_CAST_TO_UINT64: return TAssign(name, EOperation::CastUInt64, std::move(arguments), - mkCastOptions(std::make_shared<arrow::UInt64Type>())); + mkCastOptions(std::make_shared<arrow::UInt64Type>())); case TId::FUNC_CAST_TO_FLOAT: return TAssign(name, EOperation::CastFloat, std::move(arguments), - mkCastOptions(std::make_shared<arrow::FloatType>())); + mkCastOptions(std::make_shared<arrow::FloatType>())); case TId::FUNC_CAST_TO_DOUBLE: return TAssign(name, EOperation::CastDouble, std::move(arguments), - mkCastOptions(std::make_shared<arrow::DoubleType>())); + mkCastOptions(std::make_shared<arrow::DoubleType>())); case TId::FUNC_CAST_TO_TIMESTAMP: return TAssign(name, EOperation::CastTimestamp, std::move(arguments), - mkCastOptions(std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO))); + mkCastOptions(std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO))); case TId::FUNC_CAST_TO_BINARY: case TId::FUNC_CAST_TO_FIXED_SIZE_BINARY: case TId::FUNC_UNSPECIFIED: @@ -191,7 +221,7 @@ TAssign MakeFunction(const TContext& info, const std::string& name, return TAssign(name, EOperation::Unspecified, std::move(arguments)); } -NSsa::TAssign MakeConstant(const std::string& name, const NKikimrSSA::TProgram::TConstant& constant) { +NSsa::TAssign TProgramBuilder::MakeConstant(const std::string& name, const NKikimrSSA::TProgram::TConstant& constant) { using TId = NKikimrSSA::TProgram::TConstant; switch (constant.GetValueCase()) { @@ -225,13 +255,21 @@ NSsa::TAssign MakeConstant(const std::string& name, const NKikimrSSA::TProgram:: return TAssign(name, EOperation::Unspecified, {}); } -NSsa::TAggregateAssign MakeAggregate(const TContext& info, const std::string& name, - const NKikimrSSA::TProgram::TAggregateAssignment::TAggregateFunction& func) -{ +NSsa::TAggregateAssign TProgramBuilder::MakeAggregate(const std::string& name, const NKikimrSSA::TProgram::TAggregateAssignment::TAggregateFunction& func) { using TId = NKikimrSSA::TProgram::TAggregateAssignment; + if (func.GetFunctionType() == NKikimrSSA::TProgram::EFunctionType::TProgram_EFunctionType_YQL_KERNEL) { + std::string argument = GetName(func.GetArguments()[0]); + auto kernelFunction = KernelsRegistry.GetFunction(func.GetKernelIdx()); + if (!kernelFunction) { + Error = TStringBuilder() << "Unknown kernel for " << func.GetId() << ";kernel_idx=" << func.GetKernelIdx(); + return TAggregateAssign(name); + } + return TAggregateAssign(name, kernelFunction, { argument }); + } + if (func.ArgumentsSize() == 1) { - std::string argument = info.GetName(func.GetArguments()[0]); + std::string argument = GetName(func.GetArguments()[0]); switch (func.GetId()) { case TId::AGG_SOME: @@ -258,9 +296,7 @@ NSsa::TAggregateAssign MakeAggregate(const TContext& info, const std::string& na return TAggregateAssign(name); // !ok() } -NSsa::TAssign MaterializeParameter(const std::string& name, const NKikimrSSA::TProgram::TParameter& parameter, - const std::shared_ptr<arrow::RecordBatch>& parameterValues) -{ +NSsa::TAssign TProgramBuilder::MaterializeParameter(const std::string& name, const NKikimrSSA::TProgram::TParameter& parameter, const std::shared_ptr<arrow::RecordBatch>& parameterValues) { auto parameterName = parameter.GetName(); auto column = parameterValues->GetColumnByName(parameterName); #if 0 @@ -280,17 +316,17 @@ NSsa::TAssign MaterializeParameter(const std::string& name, const NKikimrSSA::TP return TAssign(name, *column->GetScalar(0)); } -bool ExtractAssign(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TAssignment& assign, - const std::shared_ptr<arrow::RecordBatch>& parameterValues) -{ +bool TProgramBuilder::ExtractAssign(NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TAssignment& assign, + const std::shared_ptr<arrow::RecordBatch>& parameterValues) { + using TId = NKikimrSSA::TProgram::TAssignment; - std::string columnName = info.GetName(assign.GetColumn()); + std::string columnName = GetName(assign.GetColumn()); switch (assign.GetExpressionCase()) { case TId::kFunction: { - auto func = MakeFunction(info, columnName, assign.GetFunction()); + auto func = MakeFunction(columnName, assign.GetFunction()); if (!func.IsOk()) { return false; } @@ -303,7 +339,7 @@ bool ExtractAssign(const TContext& info, NSsa::TProgramStep& step, const NKikimr if (!cnst.IsConstant()) { return false; } - info.Constants[columnName] = cnst.GetConstant(); + Constants[columnName] = cnst.GetConstant(); step.Assignes.emplace_back(std::move(cnst)); break; } @@ -324,27 +360,27 @@ bool ExtractAssign(const TContext& info, NSsa::TProgramStep& step, const NKikimr return true; } -bool ExtractFilter(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TFilter& filter) { +bool TProgramBuilder::ExtractFilter(NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TFilter& filter) { auto& column = filter.GetPredicate(); if (!column.HasId() && !column.HasName()) { return false; } // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name. - step.Filters.push_back(info.GetName(column)); + step.Filters.push_back(GetName(column)); return true; } -bool ExtractProjection(const TContext& info, NSsa::TProgramStep& step, - const NKikimrSSA::TProgram::TProjection& projection) { +bool TProgramBuilder::ExtractProjection(NSsa::TProgramStep& step, + const NKikimrSSA::TProgram::TProjection& projection) { step.Projection.reserve(projection.ColumnsSize()); for (auto& col : projection.GetColumns()) { // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name. - step.Projection.push_back(info.GetName(col)); + step.Projection.push_back(GetName(col)); } return true; } -bool ExtractGroupBy(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TGroupBy& groupBy) { +bool TProgramBuilder::ExtractGroupBy(NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TGroupBy& groupBy) { if (!groupBy.AggregatesSize()) { return false; } @@ -353,21 +389,20 @@ bool ExtractGroupBy(const TContext& info, NSsa::TProgramStep& step, const NKikim step.GroupByKeys.reserve(groupBy.KeyColumnsSize()); for (auto& agg : groupBy.GetAggregates()) { auto& resColumn = agg.GetColumn(); - TString columnName = info.GenerateName(resColumn); + TString columnName = GenerateName(resColumn); - auto func = MakeAggregate(info, columnName, agg.GetFunction()); + auto func = MakeAggregate(columnName, agg.GetFunction()); if (!func.IsOk()) { return false; } step.GroupBy.push_back(std::move(func)); } for (auto& key : groupBy.GetKeyColumns()) { - step.GroupByKeys.push_back(info.GetName(key)); + step.GroupByKeys.push_back(GetName(key)); } return true; } - } const THashMap<ui32, TString>& TProgramContainer::GetSourceColumns() const { @@ -444,41 +479,51 @@ bool TProgramContainer::Init(const IColumnResolver& columnResolver, NKikimrSchem ProgramParameters = NArrow::DeserializeBatch(olapProgramProto.GetParameters(), schema); } + if (olapProgramProto.HasKernels()) { + KernelsRegistry.Parse(olapProgramProto.GetKernels()); + } + NOlap::TProgramContainer ssaProgram; - if (!ParseProgram(columnResolver, programProto)) { - error = TStringBuilder() << "Wrong olap program"; + if (!ParseProgram(columnResolver, programProto, error)) { + if (!error) { + error = TStringBuilder() << "Wrong olap program"; + } return false; } return true; } -bool TProgramContainer::ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program) { +bool TProgramContainer::ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program, TString& error) { using TId = NKikimrSSA::TProgram::TCommand; auto ssaProgram = std::make_shared<NSsa::TProgram>(); - TContext info(columnResolver); + TProgramBuilder programBuilder(columnResolver, KernelsRegistry); auto step = std::make_shared<NSsa::TProgramStep>(); for (auto& cmd : program.GetCommand()) { switch (cmd.GetLineCase()) { case TId::kAssign: - if (!ExtractAssign(info, *step, cmd.GetAssign(), ProgramParameters)) { + if (!programBuilder.ExtractAssign(*step, cmd.GetAssign(), ProgramParameters)) { + error = programBuilder.GetErrorMessage(); return false; } break; case TId::kFilter: - if (!ExtractFilter(info, *step, cmd.GetFilter())) { + if (!programBuilder.ExtractFilter(*step, cmd.GetFilter())) { + error = programBuilder.GetErrorMessage(); return false; } break; case TId::kProjection: - if (!ExtractProjection(info, *step, cmd.GetProjection())) { + if (!programBuilder.ExtractProjection(*step, cmd.GetProjection())) { + error = programBuilder.GetErrorMessage(); return false; } ssaProgram->Steps.push_back(step); step = std::make_shared<NSsa::TProgramStep>(); break; case TId::kGroupBy: - if (!ExtractGroupBy(info, *step, cmd.GetGroupBy())) { + if (!programBuilder.ExtractGroupBy(*step, cmd.GetGroupBy())) { + error = programBuilder.GetErrorMessage(); return false; } ssaProgram->Steps.push_back(step); @@ -494,7 +539,7 @@ bool TProgramContainer::ParseProgram(const IColumnResolver& columnResolver, cons ssaProgram->Steps.push_back(step); } - ssaProgram->SourceColumns = std::move(info.Sources); + ssaProgram->SourceColumns = std::move(programBuilder.Sources); // Query 'SELECT count(*) FROM table' needs a column if (ssaProgram->SourceColumns.empty()) { diff --git a/ydb/core/tx/columnshard/engines/reader/program.h b/ydb/core/tx/program/program.h index 494018def88..d3d562be5a1 100644 --- a/ydb/core/tx/columnshard/engines/reader/program.h +++ b/ydb/core/tx/program/program.h @@ -1,5 +1,6 @@ #pragma once +#include "registry.h" #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/core/protos/ssa.pb.h> #include <ydb/core/formats/arrow/program.h> @@ -20,6 +21,7 @@ class TProgramContainer { private: std::shared_ptr<NSsa::TProgram> Program; std::shared_ptr<arrow::RecordBatch> ProgramParameters; // TODO + TKernelsRegistry KernelsRegistry; public: bool Init(const IColumnResolver& columnResolver, NKikimrSchemeOp::EOlapProgramType programType, TString serializedProgram, TString& error); @@ -38,7 +40,7 @@ public: bool HasEarlyFilterOnly() const; private: - bool ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program); + bool ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program, TString& error); }; } diff --git a/ydb/core/tx/program/registry.cpp b/ydb/core/tx/program/registry.cpp new file mode 100644 index 00000000000..d7e31ff8c48 --- /dev/null +++ b/ydb/core/tx/program/registry.cpp @@ -0,0 +1,25 @@ +#include "registry.h" + +#include <ydb/library/yql/core/arrow_kernels/registry/registry.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> + +namespace NKikimr::NOlap { + +bool TKernelsRegistry::Parse(const TString& serialized) { + auto functionRegistry = NMiniKQL::CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry()); + auto nodeFactory = NMiniKQL::GetBuiltinFactory(); + auto kernels = NYql::LoadKernels(serialized, *functionRegistry, nodeFactory); + Kernels.swap(kernels); + for (const auto& kernel : Kernels) { + arrow::compute::Arity arity(kernel->signature->in_types().size(), kernel->signature->is_varargs()); + auto func = std::make_shared<arrow::compute::ScalarFunction>("local_function", arity, nullptr); + auto res = func->AddKernel(*kernel); + if (!res.ok()) { + return false; + } + Functions.push_back(func); + } + return true; +} +} diff --git a/ydb/core/tx/program/registry.h b/ydb/core/tx/program/registry.h new file mode 100644 index 00000000000..ba24f343555 --- /dev/null +++ b/ydb/core/tx/program/registry.h @@ -0,0 +1,26 @@ +#pragma once + +#include <ydb/core/formats/arrow/program.h> + +namespace NKikimr::NOlap { + +class TKernelsRegistry { +public: + using TKernels = std::vector<std::shared_ptr<const arrow::compute::ScalarKernel>>; + +private: + TKernels Kernels; + std::vector<NSsa::TFunctionPtr> Functions; + +public: + bool Parse(const TString& serialized); + + NSsa::TFunctionPtr GetFunction(const size_t index) const { + if (index <= Functions.size()) { + return Functions[index]; + } + return nullptr; + } +}; + +} |