diff options
author | Tony-Romanov <150126326+Tony-Romanov@users.noreply.github.com> | 2024-02-01 09:11:35 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-01 09:11:35 +0100 |
commit | f2e86bd36e482f79ff8f2f742f270cd21b6fd35d (patch) | |
tree | 00e21548385a126dfd80b0cd2c72b6a7e6928970 | |
parent | 9cddf4b950e79f87332a27be97ef6f40a44e12dd (diff) | |
download | ydb-f2e86bd36e482f79ff8f2f742f270cd21b6fd35d.tar.gz |
Remove dependncy on <ydb/library/yql/dq/runtime/dq_arrow_helpers.h> (#1476)
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 37 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 1 |
3 files changed, 16 insertions, 23 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index f43e2ad845b..b3aa7fd98b6 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -6,7 +6,6 @@ #include <ydb/public/api/protos/ydb_rate_limiter.pb.h> #include <ydb/library/yql/dq/runtime/dq_transport.h> -#include <ydb/library/yql/dq/runtime/dq_arrow_helpers.h> #include <ydb/library/actors/core/log.h> diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 6090bba07f1..d2f4ace50a6 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -9,7 +9,7 @@ #include <ydb/core/tx/schemeshard/olap/schema/schema.h> #include <ydb/library/yql/core/yql_expr_optimize.h> -#include <ydb/library/yql/dq/runtime/dq_arrow_helpers.h> +#include <ydb/library/yql/public/udf/arrow/block_builder.h> #include <ydb/library/actors/core/log.h> @@ -52,31 +52,26 @@ std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id); std::vector<std::shared_ptr<arrow::Field>> columns; std::vector<std::shared_ptr<arrow::Array>> data; - auto& parameterNames = task.Meta.ReadInfo.OlapProgram.ParameterNames; - columns.reserve(parameterNames.size()); - data.reserve(parameterNames.size()); + if (const auto& parameterNames = task.Meta.ReadInfo.OlapProgram.ParameterNames; !parameterNames.empty()) { + columns.reserve(parameterNames.size()); + data.reserve(parameterNames.size()); - for (auto& name : stage.GetProgramParameters()) { - if (!parameterNames.contains(name)) { - continue; - } - - auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name); - YQL_ENSURE(NYql::NArrow::IsArrowCompatible(type), "Incompatible parameter type. Can't convert to arrow"); - - std::unique_ptr<arrow::ArrayBuilder> builder = NYql::NArrow::MakeArrowBuilder(type); - NYql::NArrow::AppendElement(value, builder.get(), type); - - std::shared_ptr<arrow::Array> array; - auto status = builder->Finish(&array); + for (const auto& name : stage.GetProgramParameters()) { + if (!parameterNames.contains(name)) { + continue; + } - YQL_ENSURE(status.ok(), "Failed to build arrow array of variables."); + const auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name); + const auto builder = NUdf::MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), type, *arrow::default_memory_pool(), 1U, nullptr); + builder->Add(value); + const auto datum = builder->Build(true); - auto field = std::make_shared<arrow::Field>(name, array->type()); + auto field = std::make_shared<arrow::Field>(name, datum.type()); - columns.emplace_back(std::move(field)); - data.emplace_back(std::move(array)); + columns.emplace_back(std::move(field)); + data.emplace_back(datum.make_array()); + } } auto schema = std::make_shared<arrow::Schema>(std::move(columns)); diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index 1a68edf17d1..32afc63b983 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -254,7 +254,6 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TStageInfo& stageIn ui64 txId, bool enableSpilling); NYql::NDqProto::TDqTask* ArenaSerializeTaskToProto(TKqpTasksGraph& tasksGraph, const TTask& task, bool serializeAsyncIoSettings); -void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, NYql::NDqProto::TDqTask* message, bool serializeAsyncIoSettings); void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta); void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, bool enableSpilling); |