aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTony-Romanov <150126326+Tony-Romanov@users.noreply.github.com>2024-02-01 09:11:35 +0100
committerGitHub <noreply@github.com>2024-02-01 09:11:35 +0100
commitf2e86bd36e482f79ff8f2f742f270cd21b6fd35d (patch)
tree00e21548385a126dfd80b0cd2c72b6a7e6928970
parent9cddf4b950e79f87332a27be97ef6f40a44e12dd (diff)
downloadydb-f2e86bd36e482f79ff8f2f742f270cd21b6fd35d.tar.gz
Remove dependncy on <ydb/library/yql/dq/runtime/dq_arrow_helpers.h> (#1476)
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp1
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp37
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h1
3 files changed, 16 insertions, 23 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index f43e2ad845b..b3aa7fd98b6 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -6,7 +6,6 @@
#include <ydb/public/api/protos/ydb_rate_limiter.pb.h>
#include <ydb/library/yql/dq/runtime/dq_transport.h>
-#include <ydb/library/yql/dq/runtime/dq_arrow_helpers.h>
#include <ydb/library/actors/core/log.h>
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 6090bba07f1..d2f4ace50a6 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -9,7 +9,7 @@
#include <ydb/core/tx/schemeshard/olap/schema/schema.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
-#include <ydb/library/yql/dq/runtime/dq_arrow_helpers.h>
+#include <ydb/library/yql/public/udf/arrow/block_builder.h>
#include <ydb/library/actors/core/log.h>
@@ -52,31 +52,26 @@ std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo&
const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
std::vector<std::shared_ptr<arrow::Field>> columns;
std::vector<std::shared_ptr<arrow::Array>> data;
- auto& parameterNames = task.Meta.ReadInfo.OlapProgram.ParameterNames;
- columns.reserve(parameterNames.size());
- data.reserve(parameterNames.size());
+ if (const auto& parameterNames = task.Meta.ReadInfo.OlapProgram.ParameterNames; !parameterNames.empty()) {
+ columns.reserve(parameterNames.size());
+ data.reserve(parameterNames.size());
- for (auto& name : stage.GetProgramParameters()) {
- if (!parameterNames.contains(name)) {
- continue;
- }
-
- auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name);
- YQL_ENSURE(NYql::NArrow::IsArrowCompatible(type), "Incompatible parameter type. Can't convert to arrow");
-
- std::unique_ptr<arrow::ArrayBuilder> builder = NYql::NArrow::MakeArrowBuilder(type);
- NYql::NArrow::AppendElement(value, builder.get(), type);
-
- std::shared_ptr<arrow::Array> array;
- auto status = builder->Finish(&array);
+ for (const auto& name : stage.GetProgramParameters()) {
+ if (!parameterNames.contains(name)) {
+ continue;
+ }
- YQL_ENSURE(status.ok(), "Failed to build arrow array of variables.");
+ const auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name);
+ const auto builder = NUdf::MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), type, *arrow::default_memory_pool(), 1U, nullptr);
+ builder->Add(value);
+ const auto datum = builder->Build(true);
- auto field = std::make_shared<arrow::Field>(name, array->type());
+ auto field = std::make_shared<arrow::Field>(name, datum.type());
- columns.emplace_back(std::move(field));
- data.emplace_back(std::move(array));
+ columns.emplace_back(std::move(field));
+ data.emplace_back(datum.make_array());
+ }
}
auto schema = std::make_shared<arrow::Schema>(std::move(columns));
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index 1a68edf17d1..32afc63b983 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -254,7 +254,6 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TStageInfo& stageIn
ui64 txId, bool enableSpilling);
NYql::NDqProto::TDqTask* ArenaSerializeTaskToProto(TKqpTasksGraph& tasksGraph, const TTask& task, bool serializeAsyncIoSettings);
-void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, NYql::NDqProto::TDqTask* message, bool serializeAsyncIoSettings);
void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta);
void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, bool enableSpilling);