summaryrefslogtreecommitdiffstats
path: root/yql/essentials/parser/pg_wrapper/arrow.cpp
diff options
context:
space:
mode:
authormrlolthe1st <[email protected]>2024-12-25 16:36:31 +0300
committermrlolthe1st <[email protected]>2024-12-25 16:55:40 +0300
commite1cd64df2f5bf9ed2148cb2cc81bb9686751f102 (patch)
tree61b0b0305d36873c52bc56dc3f3ac1ac936977c3 /yql/essentials/parser/pg_wrapper/arrow.cpp
parentea2d93eb19aa7ed85fce5abe29427a9afd9d85ee (diff)
YQL-18548: BlockReader PG types + refactor
fix fix commit_hash:09713e813e389b07077111daed2ec31b59047bca
Diffstat (limited to 'yql/essentials/parser/pg_wrapper/arrow.cpp')
-rw-r--r--yql/essentials/parser/pg_wrapper/arrow.cpp553
1 files changed, 546 insertions, 7 deletions
diff --git a/yql/essentials/parser/pg_wrapper/arrow.cpp b/yql/essentials/parser/pg_wrapper/arrow.cpp
index 9bfe088b014..8c966e4f452 100644
--- a/yql/essentials/parser/pg_wrapper/arrow.cpp
+++ b/yql/essentials/parser/pg_wrapper/arrow.cpp
@@ -8,6 +8,7 @@
#include <yql/essentials/minikql/arrow/arrow_util.h>
#include <yql/essentials/types/dynumber/dynumber.h>
#include <yql/essentials/public/decimal/yql_decimal.h>
+
#include <util/generic/singleton.h>
#include <arrow/compute/cast.h>
@@ -83,7 +84,7 @@ std::shared_ptr<arrow::Array> PgConvertBool(const std::shared_ptr<arrow::Array>&
size_t length = data->length;
NUdf::TFixedSizeArrayBuilder<ui64, false> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length);
auto input = data->GetValues<ui8>(1, 0);
- builder.UnsafeReserve(length);
+ builder.UnsafeReserve(length);
auto output = builder.MutableData();
for (size_t i = 0; i < length; ++i) {
auto fullIndex = i + data->offset;
@@ -179,7 +180,7 @@ Numeric Uint64ToPgNumeric(ui64 value) {
Numeric DecimalToPgNumeric(const NUdf::TUnboxedValuePod& value, ui8 precision, ui8 scale) {
const auto str = NYql::NDecimal::ToString(value.GetInt128(), precision, scale);
Y_ENSURE(str);
- return (Numeric)DirectFunctionCall3Coll(numeric_in, DEFAULT_COLLATION_OID,
+ return (Numeric)DirectFunctionCall3Coll(numeric_in, DEFAULT_COLLATION_OID,
PointerGetDatum(str), Int32GetDatum(0), Int32GetDatum(-1));
}
@@ -229,12 +230,12 @@ std::shared_ptr<arrow::Array> PgDecimal128ConvertNumeric(const std::shared_ptr<a
}
Numeric v = PgDecimal128ToNumeric(input[i], precision, scale, high_bits_mul);
-
+
auto datum = NumericGetDatum(v);
auto ptr = (char*)datum;
auto len = GetFullVarSize((const text*)datum);
NUdf::ZeroMemoryContext(ptr);
- ARROW_OK(builder.Append(ptr - sizeof(void*), len + sizeof(void*)));
+ ARROW_OK(builder.Append(ptr - sizeof(void*), len + sizeof(void*)));
}
std::shared_ptr<arrow::BinaryArray> ret;
@@ -254,7 +255,7 @@ Numeric PgDecimal128ToNumeric(arrow::Decimal128 value, int32_t precision, int32_
Numeric low_bits_res = int64_div_fast_to_numeric(low_bits, scale);
Numeric high_bits_res = numeric_mul_opt_error(int64_div_fast_to_numeric(high_bits, scale), high_bits_mul, &error);
MKQL_ENSURE(error == false, "Bad numeric multiplication.");
-
+
Numeric res = numeric_add_opt_error(high_bits_res, low_bits_res, &error);
MKQL_ENSURE(error == false, "Bad numeric addition.");
@@ -335,7 +336,7 @@ TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>&
}
case INT4OID: {
return BuildPgFixedColumnConverter<i32>(originalType, [](auto value){ return Int32GetDatum(value); });
- }
+ }
case INT8OID: {
return BuildPgFixedColumnConverter<i64>(originalType, [](auto value){ return Int64GetDatum(value); });
}
@@ -387,9 +388,547 @@ TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>&
} else {
return {};
}
- }
+ }
}
return {};
}
+class IYsonBlockReaderForPg : public IYsonComplexTypeReader {
+public:
+ virtual NUdf::TBlockItem GetNotNull(TYsonBuffer&) = 0;
+ NUdf::TBlockItem GetNullableItem(TYsonBuffer& buf) {
+ char prev = buf.Current();
+ if (prev == NYson::NDetail::EntitySymbol) {
+ buf.Next();
+ return NUdf::TBlockItem();
+ }
+ if (prev == NYson::NDetail::BeginListSymbol) {
+ buf.Next();
+ YQL_ENSURE(buf.Current() == NYson::NDetail::EndListSymbol);
+ buf.Next();
+ return NUdf::TBlockItem();
+ }
+ return GetNotNull(buf);
+ }
+};
+
+
+NUdf::TBlockItem BlockItemFromDatum(Datum datum, const NPg::TTypeDesc& desc, std::vector<char>& tmp) {
+ if (desc.PassByValue) {
+ return NUdf::TBlockItem((ui64)datum);
+ }
+ auto typeLen = desc.TypeLen;
+ ui32 len;
+ if (typeLen == -1) {
+ len = GetFullVarSize((const text*)datum);
+ } else if (typeLen == -2) {
+ len = 1 + strlen((const char*)datum);
+ } else {
+ len = typeLen;
+ }
+ auto objlen = len;
+ len += sizeof(void*);
+ len = AlignUp<i32>(len, 8);
+ tmp.resize(len);
+ *(uint64_t*)tmp.data() = 0;
+ memcpy(tmp.data() + sizeof(void*), (const char*) datum, objlen);
+ return NUdf::TBlockItem(std::string_view(tmp.data(), len));
+}
+
+NUdf::TBlockItem PgBlockItemFromNativeBinary(const TStringBuf binary, ui32 pgTypeId, std::vector<char>& tmp) {
+ NKikimr::NMiniKQL::TPAllocScope call;
+ StringInfoData stringInfo;
+ stringInfo.data = (char*)binary.Data();
+ stringInfo.len = binary.Size();
+ stringInfo.maxlen = binary.Size();
+ stringInfo.cursor = 0;
+
+ const auto& typeInfo = NPg::LookupType(pgTypeId);
+ auto typeIOParam = MakeTypeIOParam(typeInfo);
+ auto receiveFuncId = typeInfo.ReceiveFuncId;
+ if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
+ receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId;
+ }
+
+ {
+ FmgrInfo finfo;
+ Zero(finfo);
+ Y_ENSURE(receiveFuncId);
+ fmgr_info(receiveFuncId, &finfo);
+ Y_ENSURE(!finfo.fn_retset);
+ Y_ENSURE(finfo.fn_addr);
+ Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
+ LOCAL_FCINFO(callInfo, 3);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 3;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->isnull = false;
+ callInfo->args[0] = { (Datum)&stringInfo, false };
+ callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
+ callInfo->args[2] = { Int32GetDatum(-1), false };
+
+ auto x = finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ if (stringInfo.cursor != stringInfo.len) {
+ TStringBuilder errMsg;
+ errMsg << "Not all data has been consumed by 'recv' function: " << NPg::LookupProc(receiveFuncId).Name << ", data size: " << stringInfo.len << ", consumed size: " << stringInfo.cursor;
+ UdfTerminate(errMsg.c_str());
+ }
+ return BlockItemFromDatum(x, typeInfo, tmp);
+ }
+}
+
+template<typename T>
+constexpr Datum FixedToDatum(T v) {
+ if constexpr (std::is_same_v<T, bool>) {
+ return BoolGetDatum(v);
+ } else if constexpr (std::is_same_v<T, i16>) {
+ return Int16GetDatum(v);
+ } else if constexpr (std::is_same_v<T, i32>) {
+ return Int32GetDatum(v);
+ } else if constexpr (std::is_same_v<T, i64>) {
+ return Int64GetDatum(v);
+ } else if constexpr (std::is_same_v<T, float>) {
+ return Float4GetDatum(v);
+ } else if constexpr (std::is_same_v<T, double>) {
+ return Float8GetDatum(v);
+ }
+}
+
+template<typename T>
+class TPgYsonFixedConverter final : public IYsonBlockReaderForPg {
+public:
+ NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
+ return this->GetNullableItem(buf);
+ }
+
+ NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
+ Datum val;
+ if constexpr (std::is_same_v<T, bool>) {
+ Y_ENSURE(buf.Current() == NYson::NDetail::FalseMarker || buf.Current() == NYson::NDetail::TrueMarker);
+ val = FixedToDatum<T>(buf.Current() == NYson::NDetail::TrueMarker);
+ buf.Next();
+ } else if constexpr (std::is_integral_v<T>) {
+ if constexpr (std::is_signed_v<T>) {
+ Y_ENSURE(buf.Current() == NYson::NDetail::Int64Marker);
+ buf.Next();
+ val = FixedToDatum<T>(buf.ReadVarI64());
+ } else {
+ Y_ENSURE(buf.Current() == NYson::NDetail::Uint64Marker);
+ buf.Next();
+ val = FixedToDatum<T>(buf.ReadVarUI64());
+ }
+ } else {
+ Y_ENSURE(buf.Current() == NYson::NDetail::DoubleMarker);
+ buf.Next();
+ val = FixedToDatum<T>(buf.NextDouble());
+ }
+ return NUdf::TBlockItem(val);
+ }
+};
+
+template<bool IsCString, bool FixedLength>
+class TPgYsonStringConverter final : public IYsonBlockReaderForPg {
+public:
+
+ TPgYsonStringConverter(i32 typeLen) : TypeLen_(typeLen) {
+ if (typeLen == -2) {
+ YQL_ENSURE(IsCString && !FixedLength);
+ } else if (typeLen == -1) {
+ YQL_ENSURE(!IsCString && !FixedLength);
+ } else {
+ YQL_ENSURE(typeLen >= 0 && FixedLength);
+ }
+ }
+
+ NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
+ return this->GetNullableItem(buf);
+ }
+
+ NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
+ Y_ENSURE(buf.Current() == NYson::NDetail::StringMarker);
+ buf.Next();
+ const i32 originalLen = buf.ReadVarI32();
+ auto res = buf.Data();
+ buf.Skip(originalLen);
+
+ ui32 len;
+ if constexpr (IsCString) {
+ len = 1 + originalLen + sizeof(void*);
+ } else if constexpr (FixedLength) {
+ len = TypeLen_ + sizeof(void*);
+ } else {
+ len = VARHDRSZ + originalLen + sizeof(void*);
+ }
+
+ if (Tmp_.capacity() < len) {
+ Tmp_.reserve(Max<ui64>(len, Tmp_.capacity() * 2));
+ }
+ len = AlignUp<ui32>(len, 8);
+ Tmp_.resize(len);
+ if constexpr (IsCString) {
+ memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
+ } else if constexpr (FixedLength) {
+ memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
+ } else {
+ memcpy(Tmp_.data() + VARHDRSZ + sizeof(void*), res, originalLen);
+ UpdateCleanVarSize((text*)(Tmp_.data() + sizeof(void*)), originalLen);
+ }
+ return NUdf::TBlockItem(NUdf::TStringRef(Tmp_.data(), len));
+ }
+private:
+ std::vector<char> Tmp_;
+ i32 TypeLen_;
+};
+
+
+class TPgYsonOtherConverter : public IYsonBlockReaderForPg {
+public:
+ TPgYsonOtherConverter(Oid typeId) : TypeId_(typeId) {}
+ NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
+ return this->GetNullableItem(buf);
+ }
+
+ NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
+ if (buf.Current() != NYson::NDetail::StringMarker) {
+ Y_ENSURE(buf.Current() == NYson::NDetail::StringMarker);
+ }
+ buf.Next();
+ const i32 len = buf.ReadVarI32();
+ auto ptr = buf.Data();
+ buf.Skip(len);
+ return PgBlockItemFromNativeBinary(TStringBuf(ptr, len), TypeId_, Tmp_);
+ }
+private:
+ Oid TypeId_;
+ std::vector<char> Tmp_;
+};
+
+
+template<typename T, arrow::Type::type Expected, typename ArrType>
+class TPgTopLevelFixedConverter : public IYtColumnConverter {
+public:
+ using Fn = Datum(*)(const T&);
+ TPgTopLevelFixedConverter(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder) : Builder_(std::move(builder)) {}
+
+ arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> data) override final {
+ if (arrow::Type::DICTIONARY == data->type->id()) {
+ auto valType = static_cast<const arrow::DictionaryType&>(*data->type).value_type();
+ Y_ENSURE(Expected == valType->id());
+ return ConvertDict(data);
+ } else {
+ Y_ENSURE(Expected == data->type->id());
+ return ConvertNonDict(data);
+ }
+ }
+
+ arrow::Datum ConvertNonDict(std::shared_ptr<arrow::ArrayData> data) {
+ ArrType arr(data);
+ if (arr.null_count()) {
+ for (i64 i = 0; i < data->length; ++i) {
+ if (arr.IsNull(i)) {
+ Builder_->Add(NUdf::TBlockItem{});
+ } else {
+ Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(arr.Value(i))));
+ }
+ }
+ } else {
+ for (i64 i = 0; i < data->length; ++i) {
+ Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(arr.Value(i))));
+ }
+ }
+ return Builder_->Build(false);
+ }
+
+ arrow::Datum ConvertDict(std::shared_ptr<arrow::ArrayData> data) {
+ arrow::DictionaryArray dict(data);
+ auto values = data->dictionary->GetValues<T>(0);
+ auto indices = dict.indices()->data()->GetValues<ui32>(1);
+ if (dict.null_count()) {
+ for (i64 i = 0; i < data->length; ++i) {
+ if (dict.IsNull(i)) {
+ Builder_->Add(NUdf::TBlockItem{});
+ } else {
+ Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(values[indices[i]])));
+ }
+ }
+ } else {
+ for (i64 i = 0; i < data->length; ++i) {
+ Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(values[indices[i]])));
+ }
+ }
+ return Builder_->Build(false);
+ }
+private:
+ std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder_;
+};
+
+template<bool IsCString, bool FixedLength>
+class TPgTopLevelStringConverter : public IYtColumnConverter {
+public:
+ TPgTopLevelStringConverter(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, i32 typeLen) : Builder_(std::move(builder)), TypeLen_(typeLen) {
+ if (typeLen == -2) {
+ YQL_ENSURE(IsCString && !FixedLength);
+ } else if (typeLen == -1) {
+ YQL_ENSURE(!IsCString && !FixedLength);
+ } else {
+ YQL_ENSURE(typeLen >= 0 && FixedLength);
+ }
+ }
+
+ constexpr NUdf::TBlockItem ConvertOnce(const uint8_t* res, size_t originalLen) {
+ ui32 len;
+ if constexpr (IsCString) {
+ len = 1 + originalLen + sizeof(void*);
+ } else if constexpr (FixedLength) {
+ len = TypeLen_ + sizeof(void*);
+ } else {
+ len = VARHDRSZ + originalLen + sizeof(void*);
+ }
+
+ if (Tmp_.capacity() < len) {
+ Tmp_.reserve(Max<ui64>(len, Tmp_.capacity() * 2));
+ }
+ len = AlignUp<ui32>(len, 8);
+ Tmp_.resize(len);
+ if constexpr (IsCString) {
+ memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
+ } else if constexpr (FixedLength) {
+ memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
+ } else {
+ memcpy(Tmp_.data() + VARHDRSZ + sizeof(void*), res, originalLen);
+ UpdateCleanVarSize((text*)(Tmp_.data() + sizeof(void*)), originalLen);
+ }
+ return NUdf::TBlockItem(NUdf::TStringRef(Tmp_.data(), len));
+ }
+
+ arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> data) override final {
+ if (arrow::Type::DICTIONARY == data->type->id()) {
+ auto valType = static_cast<const arrow::DictionaryType&>(*data->type).value_type();
+ Y_ENSURE(arrow::Type::BINARY == valType->id() || arrow::Type::STRING == valType->id());
+ return ConvertDict(data);
+ } else {
+ if (arrow::Type::STRING == data->type->id()) {
+ auto res = arrow::compute::Cast(data, std::make_shared<arrow::BinaryType>());
+ Y_ENSURE(res.ok());
+ data = res->array();
+ }
+ Y_ENSURE(arrow::Type::BINARY == data->type->id());
+ return ConvertNonDict(data);
+ }
+ }
+
+ arrow::Datum ConvertNonDict(std::shared_ptr<arrow::ArrayData> data) {
+ arrow::BinaryArray arr(data);
+ if (arr.null_count()) {
+ for (i64 i = 0; i < data->length; ++i) {
+ if (arr.IsNull(i)) {
+ Builder_->Add(NUdf::TBlockItem{});
+ } else {
+ i32 len;
+ auto res = arr.GetValue(i, &len);
+ Builder_->Add(ConvertOnce(res, len));
+ }
+ }
+ } else {
+ for (i64 i = 0; i < data->length; ++i) {
+ i32 len;
+ auto res = arr.GetValue(i, &len);
+ Builder_->Add(ConvertOnce(res, len));
+ }
+ }
+ return Builder_->Build(false);
+ }
+
+ arrow::Datum ConvertDict(std::shared_ptr<arrow::ArrayData> data) {
+ arrow::DictionaryArray dict(data);
+ if (arrow::Type::STRING == data->dictionary->type->id()) {
+ auto res = arrow::compute::Cast(data->dictionary, std::make_shared<arrow::BinaryType>());
+ Y_ENSURE(res.ok());
+ data->dictionary = res->array();
+ }
+ arrow::BinaryArray arr(data->dictionary);
+ auto indices = dict.indices()->data()->GetValues<ui32>(1);
+ if (dict.null_count()) {
+ for (i64 i = 0; i < data->length; ++i) {
+ if (dict.IsNull(i)) {
+ Builder_->Add(NUdf::TBlockItem{});
+ } else {
+ i32 len;
+ auto res = arr.GetValue(indices[i], &len);
+ Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
+ }
+ }
+ } else {
+ for (i64 i = 0; i < data->length; ++i) {
+ i32 len;
+ auto res = arr.GetValue(indices[i], &len);
+ Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
+ }
+ }
+ return Builder_->Build(false);
+ }
+private:
+ std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder_;
+ std::vector<char> Tmp_;
+ i32 TypeLen_;
+};
+
+class TPgTopLevelOtherConverter : public IYtColumnConverter {
+public:
+ TPgTopLevelOtherConverter(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, Oid typeId) : Builder_(std::move(builder)), TypeId_(typeId) {}
+
+ inline NUdf::TBlockItem ConvertOnce(const uint8_t* res, size_t len) {
+ return PgBlockItemFromNativeBinary(TStringBuf(reinterpret_cast<const char*>(res), len), TypeId_, Tmp_);
+ }
+
+ arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> data) override final {
+ if (arrow::Type::DICTIONARY == data->type->id()) {
+ auto valType = static_cast<const arrow::DictionaryType&>(*data->type).value_type();
+ Y_ENSURE(arrow::Type::BINARY == valType->id() || arrow::Type::STRING == valType->id());
+ return ConvertDict(data);
+ } else {
+ Y_ENSURE(arrow::Type::BINARY == data->type->id() || arrow::Type::STRING == data->type->id());
+ return ConvertNonDict(data);
+ }
+ }
+
+ arrow::Datum ConvertNonDict(std::shared_ptr<arrow::ArrayData> data) {
+ arrow::BinaryArray arr(data);
+ if (arr.null_count()) {
+ for (i64 i = 0; i < data->length; ++i) {
+ if (arr.IsNull(i)) {
+ Builder_->Add(NUdf::TBlockItem{});
+ } else {
+ i32 len;
+ auto res = arr.GetValue(i, &len);
+ Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
+ }
+ }
+ } else {
+ for (i64 i = 0; i < data->length; ++i) {
+ i32 len;
+ auto res = arr.GetValue(i, &len);
+ Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
+ }
+ }
+ return Builder_->Build(false);
+ }
+
+ arrow::Datum ConvertDict(std::shared_ptr<arrow::ArrayData> data) {
+ arrow::DictionaryArray dict(data);
+ if (arrow::Type::STRING == data->dictionary->type->id()) {
+ auto res = arrow::compute::Cast(data->dictionary, std::make_shared<arrow::BinaryType>());
+ Y_ENSURE(res.ok());
+ data->dictionary = res->array();
+ }
+ arrow::BinaryArray arr(data->dictionary);
+ auto indices = dict.indices()->data()->GetValues<ui32>(1);
+ if (dict.null_count()) {
+ for (i64 i = 0; i < data->length; ++i) {
+ if (dict.IsNull(i)) {
+ Builder_->Add(NUdf::TBlockItem{});
+ } else {
+ i32 len;
+ auto res = arr.GetValue(indices[i], &len);
+ Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
+ }
+ }
+ } else {
+ for (i64 i = 0; i < data->length; ++i) {
+ i32 len;
+ auto res = arr.GetValue(indices[i], &len);
+ Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
+ }
+ }
+ return Builder_->Build(false);
+ }
+private:
+ std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder_;
+ Oid TypeId_;
+ std::vector<char> Tmp_;
+};
+
+std::unique_ptr<IYtColumnConverter> BuildPgTopLevelColumnReader(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, const NKikimr::NMiniKQL::TPgType* targetType) {
+ YQL_ENSURE(targetType);
+
+ switch (targetType->GetTypeId()) {
+ case BOOLOID: {
+ return std::make_unique<TPgTopLevelFixedConverter<bool, arrow::Type::BOOL, arrow::BooleanArray>>(std::move(builder));
+ }
+ case INT2OID: {
+ return std::make_unique<TPgTopLevelFixedConverter<i16, arrow::Type::INT16, arrow::Int16Array>>(std::move(builder));
+ }
+ case INT4OID: {
+ return std::make_unique<TPgTopLevelFixedConverter<i32, arrow::Type::INT32, arrow::Int32Array>>(std::move(builder));
+ }
+ case INT8OID: {
+ return std::make_unique<TPgTopLevelFixedConverter<i64, arrow::Type::INT64, arrow::Int64Array>>(std::move(builder));
+ }
+ case FLOAT4OID: {
+ return std::make_unique<TPgTopLevelFixedConverter<float, arrow::Type::DOUBLE, arrow::DoubleArray>>(std::move(builder));
+ }
+ case FLOAT8OID: {
+ return std::make_unique<TPgTopLevelFixedConverter<double, arrow::Type::DOUBLE, arrow::DoubleArray>>(std::move(builder));
+ }
+ case BYTEAOID:
+ case VARCHAROID:
+ case TEXTOID:
+ case NAMEOID:
+ case CSTRINGOID: {
+ auto typeLen = NPg::LookupType(targetType->GetTypeId()).TypeLen;
+ if (typeLen == -2) {
+ return std::make_unique<TPgTopLevelStringConverter<true, false>>(std::move(builder), typeLen);
+ } else if (typeLen == -1) {
+ return std::make_unique<TPgTopLevelStringConverter<false, false>>(std::move(builder), typeLen);
+ } else {
+ return std::make_unique<TPgTopLevelStringConverter<false, true>>(std::move(builder), typeLen);
+ }
+ }
+ default:
+ return std::make_unique<TPgTopLevelOtherConverter>(std::move(builder), targetType->GetTypeId());
+ }
+}
+
+
+std::unique_ptr<IYsonComplexTypeReader> BuildPgYsonColumnReader(const NUdf::TPgTypeDescription& desc) {
+ switch (desc.TypeId) {
+ case BOOLOID: {
+ return std::make_unique<TPgYsonFixedConverter<bool>>();
+ }
+ case INT2OID: {
+ return std::make_unique<TPgYsonFixedConverter<i16>>();
+ }
+ case INT4OID: {
+ return std::make_unique<TPgYsonFixedConverter<i32>>();
+ }
+ case INT8OID: {
+ return std::make_unique<TPgYsonFixedConverter<i64>>();
+ }
+ case FLOAT4OID: {
+ return std::make_unique<TPgYsonFixedConverter<float>>();
+ }
+ case FLOAT8OID: {
+ return std::make_unique<TPgYsonFixedConverter<double>>();
+ }
+ case BYTEAOID:
+ case NAMEOID:
+ case VARCHAROID:
+ case TEXTOID:
+ case CSTRINGOID: {
+ auto typeLen = NPg::LookupType(desc.TypeId).TypeLen;
+ if (typeLen == -2) {
+ return std::make_unique<TPgYsonStringConverter<true, false>>(typeLen);
+ } else if (typeLen == -1) {
+ return std::make_unique<TPgYsonStringConverter<false, false>>(typeLen);
+ } else {
+ return std::make_unique<TPgYsonStringConverter<false, true>>(typeLen);
+ }
+ }
+ default:
+ return std::make_unique<TPgYsonOtherConverter>(desc.TypeId);
+ }
+}
+
}