diff options
author | aneporada <aneporada@ydb.tech> | 2023-07-11 17:38:15 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-07-11 17:38:15 +0300 |
commit | 1f002d7c846bdae1c5564d7d05ed46aebbef00c8 (patch) | |
tree | d9208e7f40e6836315115e0068d13183dc83ea6f | |
parent | b1c129f96ce0aacaca34fd7bdc813bcac6f32f58 (diff) | |
download | ydb-1f002d7c846bdae1c5564d7d05ed46aebbef00c8.tar.gz |
Support ConvertScalar() from TBlockItem
5 files changed, 60 insertions, 17 deletions
diff --git a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp index ed7637addf..55d1d4a1d7 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp @@ -14,7 +14,10 @@ namespace NKikimr::NMiniKQL { -arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool) { +namespace { + +template<typename T> +arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& pool) { if (!value) { std::shared_ptr<arrow::DataType> arrowType; MKQL_ENSURE(ConvertArrowType(type, arrowType), "Unsupported type of scalar"); @@ -32,7 +35,7 @@ arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, arr std::vector<std::shared_ptr<arrow::Scalar>> arrowValue; for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - arrowValue.emplace_back(ConvertScalar(tupleType->GetElementType(i), value.GetElement(i), pool).scalar()); + arrowValue.emplace_back(DoConvertScalar(tupleType->GetElementType(i), value.GetElement(i), pool).scalar()); } return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType)); @@ -42,30 +45,30 @@ arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, arr auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); switch (slot) { case NUdf::EDataSlot::Int8: - return arrow::Datum(static_cast<int8_t>(value.Get<i8>())); + return arrow::Datum(static_cast<int8_t>(value.template Get<i8>())); case NUdf::EDataSlot::Bool: case NUdf::EDataSlot::Uint8: - return arrow::Datum(static_cast<uint8_t>(value.Get<ui8>())); + return arrow::Datum(static_cast<uint8_t>(value.template Get<ui8>())); case NUdf::EDataSlot::Int16: - return arrow::Datum(static_cast<int16_t>(value.Get<i16>())); + return arrow::Datum(static_cast<int16_t>(value.template Get<i16>())); case NUdf::EDataSlot::Uint16: case NUdf::EDataSlot::Date: - return arrow::Datum(static_cast<uint16_t>(value.Get<ui16>())); + return arrow::Datum(static_cast<uint16_t>(value.template Get<ui16>())); case NUdf::EDataSlot::Int32: - return arrow::Datum(static_cast<int32_t>(value.Get<i32>())); + return arrow::Datum(static_cast<int32_t>(value.template Get<i32>())); case NUdf::EDataSlot::Uint32: case NUdf::EDataSlot::Datetime: - return arrow::Datum(static_cast<uint32_t>(value.Get<ui32>())); + return arrow::Datum(static_cast<uint32_t>(value.template Get<ui32>())); case NUdf::EDataSlot::Int64: case NUdf::EDataSlot::Interval: - return arrow::Datum(static_cast<int64_t>(value.Get<i64>())); + return arrow::Datum(static_cast<int64_t>(value.template Get<i64>())); case NUdf::EDataSlot::Uint64: case NUdf::EDataSlot::Timestamp: - return arrow::Datum(static_cast<uint64_t>(value.Get<ui64>())); + return arrow::Datum(static_cast<uint64_t>(value.template Get<ui64>())); case NUdf::EDataSlot::Float: - return arrow::Datum(static_cast<float>(value.Get<float>())); + return arrow::Datum(static_cast<float>(value.template Get<float>())); case NUdf::EDataSlot::Double: - return arrow::Datum(static_cast<double>(value.Get<double>())); + return arrow::Datum(static_cast<double>(value.template Get<double>())); case NUdf::EDataSlot::String: case NUdf::EDataSlot::Utf8: case NUdf::EDataSlot::Yson: @@ -90,6 +93,16 @@ arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, arr MKQL_ENSURE(false, "Unsupported type"); } +} // namespace + +arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool) { + return DoConvertScalar(type, value, pool); +} + +arrow::Datum ConvertScalar(TType* type, const NUdf::TBlockItem& value, arrow::MemoryPool& pool) { + return DoConvertScalar(type, value, pool); +} + arrow::Datum MakeArrayFromScalar(const arrow::Scalar& scalar, size_t len, TType* type, arrow::MemoryPool& pool) { MKQL_ENSURE(len > 0, "Invalid block size"); auto reader = MakeBlockReader(TTypeInfoHelper(), type); diff --git a/ydb/library/yql/minikql/computation/mkql_block_impl.h b/ydb/library/yql/minikql/computation/mkql_block_impl.h index 0d6b43580e..a0cdcd73e1 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_impl.h +++ b/ydb/library/yql/minikql/computation/mkql_block_impl.h @@ -4,6 +4,7 @@ #include "mkql_computation_node_holders.h" #include <ydb/library/yql/minikql/arrow/arrow_util.h> +#include <ydb/library/yql/public/udf/arrow/block_item.h> #include <arrow/array.h> #include <arrow/scalar.h> @@ -13,6 +14,7 @@ namespace NKikimr::NMiniKQL { arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool); +arrow::Datum ConvertScalar(TType* type, const NUdf::TBlockItem& value, arrow::MemoryPool& pool); arrow::Datum MakeArrayFromScalar(const arrow::Scalar& scalar, size_t len, TType* type, arrow::MemoryPool& pool); arrow::ValueDescr ToValueDescr(TType* type); diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index 678ad34d92..1848969b54 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -2449,15 +2449,16 @@ extern "C" void WriteSkiffPgValue(TPgType* type, const NUdf::TUnboxedValuePod& v } // namespace NCommon -arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool) { - const auto& desc = NPg::LookupType(type->GetTypeId()); +namespace { + +template<typename TScalarGetter, typename TPointerGetter> +arrow::Datum DoMakePgScalar(const NPg::TTypeDesc& desc, arrow::MemoryPool& pool, const TScalarGetter& getScalar, const TPointerGetter& getPtr) { if (desc.PassByValue) { - return arrow::MakeScalar((uint64_t)ScalarDatumFromPod(value)); + return arrow::MakeScalar(getScalar()); } else { - auto ptr = (const char*)PointerDatumFromPod(value); + const char* ptr = getPtr(); ui32 size; if (desc.TypeLen == -1) { - auto ptr = (const text*)PointerDatumFromPod(value); size = GetCleanVarSize((const text*)ptr) + VARHDRSZ; } else { size = strlen(ptr) + 1; @@ -2470,6 +2471,24 @@ arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf: } } +} // namespace + +arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool) { + return DoMakePgScalar( + NPg::LookupType(type->GetTypeId()), pool, + [&value]() { return (uint64_t)ScalarDatumFromPod(value); }, + [&value]() { return (const char*)PointerDatumFromPod(value); } + ); +} + +arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NUdf::TBlockItem& value, arrow::MemoryPool& pool) { + return DoMakePgScalar( + NPg::LookupType(type->GetTypeId()), pool, + [&value]() { return (uint64_t)ScalarDatumFromItem(value); }, + [&value]() { return (const char*)PointerDatumFromItem(value); } + ); +} + TMaybe<ui32> ConvertToPgType(NUdf::EDataSlot slot) { switch (slot) { case NUdf::EDataSlot::Bool: diff --git a/ydb/library/yql/parser/pg_wrapper/interface/arrow.h b/ydb/library/yql/parser/pg_wrapper/interface/arrow.h index 2c42f33992..551bb9cdb4 100644 --- a/ydb/library/yql/parser/pg_wrapper/interface/arrow.h +++ b/ydb/library/yql/parser/pg_wrapper/interface/arrow.h @@ -1,10 +1,12 @@ #pragma once #include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/public/udf/arrow/block_item.h> #include <arrow/datum.h> namespace NYql { arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool); +arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NUdf::TBlockItem& value, arrow::MemoryPool& pool); using TColumnConverter = std::function<std::shared_ptr<arrow::Array>(const std::shared_ptr<arrow::Array>&)>; TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, NKikimr::NMiniKQL::TPgType* targetType); diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index 5560845714..228adea359 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -247,6 +247,13 @@ arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf: return arrow::Datum(); } +arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NUdf::TBlockItem& value, arrow::MemoryPool& pool) { + Y_UNUSED(type); + Y_UNUSED(value); + Y_UNUSED(pool); + return arrow::Datum(); +} + TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, NKikimr::NMiniKQL::TPgType* targetType) { Y_UNUSED(originalType); Y_UNUSED(targetType); |