summaryrefslogtreecommitdiffstats
path: root/yql
diff options
context:
space:
mode:
authormrlolthe1st <[email protected]>2024-12-25 16:36:31 +0300
committermrlolthe1st <[email protected]>2024-12-25 16:55:40 +0300
commite1cd64df2f5bf9ed2148cb2cc81bb9686751f102 (patch)
tree61b0b0305d36873c52bc56dc3f3ac1ac936977c3 /yql
parentea2d93eb19aa7ed85fce5abe29427a9afd9d85ee (diff)
YQL-18548: BlockReader PG types + refactor
fix fix commit_hash:09713e813e389b07077111daed2ec31b59047bca
Diffstat (limited to 'yql')
-rw-r--r--yql/essentials/parser/pg_wrapper/arrow.cpp553
-rw-r--r--yql/essentials/parser/pg_wrapper/interface/arrow.h10
-rw-r--r--yql/essentials/parser/pg_wrapper/interface/ya.make1
-rw-r--r--yql/essentials/providers/common/codec/ya.make1
-rw-r--r--yql/essentials/providers/common/codec/yt_arrow_converter_interface/ya.make10
-rw-r--r--yql/essentials/providers/common/codec/yt_arrow_converter_interface/yt_arrow_converter.h13
-rw-r--r--yql/essentials/providers/common/codec/yt_arrow_converter_interface/yt_arrow_converter_details.h100
-rw-r--r--yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp10
8 files changed, 690 insertions, 8 deletions
diff --git a/yql/essentials/parser/pg_wrapper/arrow.cpp b/yql/essentials/parser/pg_wrapper/arrow.cpp
index 9bfe088b014..8c966e4f452 100644
--- a/yql/essentials/parser/pg_wrapper/arrow.cpp
+++ b/yql/essentials/parser/pg_wrapper/arrow.cpp
@@ -8,6 +8,7 @@
#include <yql/essentials/minikql/arrow/arrow_util.h>
#include <yql/essentials/types/dynumber/dynumber.h>
#include <yql/essentials/public/decimal/yql_decimal.h>
+
#include <util/generic/singleton.h>
#include <arrow/compute/cast.h>
@@ -83,7 +84,7 @@ std::shared_ptr<arrow::Array> PgConvertBool(const std::shared_ptr<arrow::Array>&
size_t length = data->length;
NUdf::TFixedSizeArrayBuilder<ui64, false> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length);
auto input = data->GetValues<ui8>(1, 0);
- builder.UnsafeReserve(length);
+ builder.UnsafeReserve(length);
auto output = builder.MutableData();
for (size_t i = 0; i < length; ++i) {
auto fullIndex = i + data->offset;
@@ -179,7 +180,7 @@ Numeric Uint64ToPgNumeric(ui64 value) {
Numeric DecimalToPgNumeric(const NUdf::TUnboxedValuePod& value, ui8 precision, ui8 scale) {
const auto str = NYql::NDecimal::ToString(value.GetInt128(), precision, scale);
Y_ENSURE(str);
- return (Numeric)DirectFunctionCall3Coll(numeric_in, DEFAULT_COLLATION_OID,
+ return (Numeric)DirectFunctionCall3Coll(numeric_in, DEFAULT_COLLATION_OID,
PointerGetDatum(str), Int32GetDatum(0), Int32GetDatum(-1));
}
@@ -229,12 +230,12 @@ std::shared_ptr<arrow::Array> PgDecimal128ConvertNumeric(const std::shared_ptr<a
}
Numeric v = PgDecimal128ToNumeric(input[i], precision, scale, high_bits_mul);
-
+
auto datum = NumericGetDatum(v);
auto ptr = (char*)datum;
auto len = GetFullVarSize((const text*)datum);
NUdf::ZeroMemoryContext(ptr);
- ARROW_OK(builder.Append(ptr - sizeof(void*), len + sizeof(void*)));
+ ARROW_OK(builder.Append(ptr - sizeof(void*), len + sizeof(void*)));
}
std::shared_ptr<arrow::BinaryArray> ret;
@@ -254,7 +255,7 @@ Numeric PgDecimal128ToNumeric(arrow::Decimal128 value, int32_t precision, int32_
Numeric low_bits_res = int64_div_fast_to_numeric(low_bits, scale);
Numeric high_bits_res = numeric_mul_opt_error(int64_div_fast_to_numeric(high_bits, scale), high_bits_mul, &error);
MKQL_ENSURE(error == false, "Bad numeric multiplication.");
-
+
Numeric res = numeric_add_opt_error(high_bits_res, low_bits_res, &error);
MKQL_ENSURE(error == false, "Bad numeric addition.");
@@ -335,7 +336,7 @@ TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>&
}
case INT4OID: {
return BuildPgFixedColumnConverter<i32>(originalType, [](auto value){ return Int32GetDatum(value); });
- }
+ }
case INT8OID: {
return BuildPgFixedColumnConverter<i64>(originalType, [](auto value){ return Int64GetDatum(value); });
}
@@ -387,9 +388,547 @@ TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>&
} else {
return {};
}
- }
+ }
}
return {};
}
+class IYsonBlockReaderForPg : public IYsonComplexTypeReader {
+public:
+ virtual NUdf::TBlockItem GetNotNull(TYsonBuffer&) = 0;
+ NUdf::TBlockItem GetNullableItem(TYsonBuffer& buf) {
+ char prev = buf.Current();
+ if (prev == NYson::NDetail::EntitySymbol) {
+ buf.Next();
+ return NUdf::TBlockItem();
+ }
+ if (prev == NYson::NDetail::BeginListSymbol) {
+ buf.Next();
+ YQL_ENSURE(buf.Current() == NYson::NDetail::EndListSymbol);
+ buf.Next();
+ return NUdf::TBlockItem();
+ }
+ return GetNotNull(buf);
+ }
+};
+
+
+NUdf::TBlockItem BlockItemFromDatum(Datum datum, const NPg::TTypeDesc& desc, std::vector<char>& tmp) {
+ if (desc.PassByValue) {
+ return NUdf::TBlockItem((ui64)datum);
+ }
+ auto typeLen = desc.TypeLen;
+ ui32 len;
+ if (typeLen == -1) {
+ len = GetFullVarSize((const text*)datum);
+ } else if (typeLen == -2) {
+ len = 1 + strlen((const char*)datum);
+ } else {
+ len = typeLen;
+ }
+ auto objlen = len;
+ len += sizeof(void*);
+ len = AlignUp<i32>(len, 8);
+ tmp.resize(len);
+ *(uint64_t*)tmp.data() = 0;
+ memcpy(tmp.data() + sizeof(void*), (const char*) datum, objlen);
+ return NUdf::TBlockItem(std::string_view(tmp.data(), len));
+}
+
+NUdf::TBlockItem PgBlockItemFromNativeBinary(const TStringBuf binary, ui32 pgTypeId, std::vector<char>& tmp) {
+ NKikimr::NMiniKQL::TPAllocScope call;
+ StringInfoData stringInfo;
+ stringInfo.data = (char*)binary.Data();
+ stringInfo.len = binary.Size();
+ stringInfo.maxlen = binary.Size();
+ stringInfo.cursor = 0;
+
+ const auto& typeInfo = NPg::LookupType(pgTypeId);
+ auto typeIOParam = MakeTypeIOParam(typeInfo);
+ auto receiveFuncId = typeInfo.ReceiveFuncId;
+ if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
+ receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId;
+ }
+
+ {
+ FmgrInfo finfo;
+ Zero(finfo);
+ Y_ENSURE(receiveFuncId);
+ fmgr_info(receiveFuncId, &finfo);
+ Y_ENSURE(!finfo.fn_retset);
+ Y_ENSURE(finfo.fn_addr);
+ Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
+ LOCAL_FCINFO(callInfo, 3);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 3;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->isnull = false;
+ callInfo->args[0] = { (Datum)&stringInfo, false };
+ callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
+ callInfo->args[2] = { Int32GetDatum(-1), false };
+
+ auto x = finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ if (stringInfo.cursor != stringInfo.len) {
+ TStringBuilder errMsg;
+ errMsg << "Not all data has been consumed by 'recv' function: " << NPg::LookupProc(receiveFuncId).Name << ", data size: " << stringInfo.len << ", consumed size: " << stringInfo.cursor;
+ UdfTerminate(errMsg.c_str());
+ }
+ return BlockItemFromDatum(x, typeInfo, tmp);
+ }
+}
+
+template<typename T>
+constexpr Datum FixedToDatum(T v) {
+ if constexpr (std::is_same_v<T, bool>) {
+ return BoolGetDatum(v);
+ } else if constexpr (std::is_same_v<T, i16>) {
+ return Int16GetDatum(v);
+ } else if constexpr (std::is_same_v<T, i32>) {
+ return Int32GetDatum(v);
+ } else if constexpr (std::is_same_v<T, i64>) {
+ return Int64GetDatum(v);
+ } else if constexpr (std::is_same_v<T, float>) {
+ return Float4GetDatum(v);
+ } else if constexpr (std::is_same_v<T, double>) {
+ return Float8GetDatum(v);
+ }
+}
+
+template<typename T>
+class TPgYsonFixedConverter final : public IYsonBlockReaderForPg {
+public:
+ NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
+ return this->GetNullableItem(buf);
+ }
+
+ NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
+ Datum val;
+ if constexpr (std::is_same_v<T, bool>) {
+ Y_ENSURE(buf.Current() == NYson::NDetail::FalseMarker || buf.Current() == NYson::NDetail::TrueMarker);
+ val = FixedToDatum<T>(buf.Current() == NYson::NDetail::TrueMarker);
+ buf.Next();
+ } else if constexpr (std::is_integral_v<T>) {
+ if constexpr (std::is_signed_v<T>) {
+ Y_ENSURE(buf.Current() == NYson::NDetail::Int64Marker);
+ buf.Next();
+ val = FixedToDatum<T>(buf.ReadVarI64());
+ } else {
+ Y_ENSURE(buf.Current() == NYson::NDetail::Uint64Marker);
+ buf.Next();
+ val = FixedToDatum<T>(buf.ReadVarUI64());
+ }
+ } else {
+ Y_ENSURE(buf.Current() == NYson::NDetail::DoubleMarker);
+ buf.Next();
+ val = FixedToDatum<T>(buf.NextDouble());
+ }
+ return NUdf::TBlockItem(val);
+ }
+};
+
+template<bool IsCString, bool FixedLength>
+class TPgYsonStringConverter final : public IYsonBlockReaderForPg {
+public:
+
+ TPgYsonStringConverter(i32 typeLen) : TypeLen_(typeLen) {
+ if (typeLen == -2) {
+ YQL_ENSURE(IsCString && !FixedLength);
+ } else if (typeLen == -1) {
+ YQL_ENSURE(!IsCString && !FixedLength);
+ } else {
+ YQL_ENSURE(typeLen >= 0 && FixedLength);
+ }
+ }
+
+ NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
+ return this->GetNullableItem(buf);
+ }
+
+ NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
+ Y_ENSURE(buf.Current() == NYson::NDetail::StringMarker);
+ buf.Next();
+ const i32 originalLen = buf.ReadVarI32();
+ auto res = buf.Data();
+ buf.Skip(originalLen);
+
+ ui32 len;
+ if constexpr (IsCString) {
+ len = 1 + originalLen + sizeof(void*);
+ } else if constexpr (FixedLength) {
+ len = TypeLen_ + sizeof(void*);
+ } else {
+ len = VARHDRSZ + originalLen + sizeof(void*);
+ }
+
+ if (Tmp_.capacity() < len) {
+ Tmp_.reserve(Max<ui64>(len, Tmp_.capacity() * 2));
+ }
+ len = AlignUp<ui32>(len, 8);
+ Tmp_.resize(len);
+ if constexpr (IsCString) {
+ memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
+ } else if constexpr (FixedLength) {
+ memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
+ } else {
+ memcpy(Tmp_.data() + VARHDRSZ + sizeof(void*), res, originalLen);
+ UpdateCleanVarSize((text*)(Tmp_.data() + sizeof(void*)), originalLen);
+ }
+ return NUdf::TBlockItem(NUdf::TStringRef(Tmp_.data(), len));
+ }
+private:
+ std::vector<char> Tmp_;
+ i32 TypeLen_;
+};
+
+
+class TPgYsonOtherConverter : public IYsonBlockReaderForPg {
+public:
+ TPgYsonOtherConverter(Oid typeId) : TypeId_(typeId) {}
+ NUdf::TBlockItem GetItem(TYsonBuffer& buf) override final {
+ return this->GetNullableItem(buf);
+ }
+
+ NUdf::TBlockItem GetNotNull(TYsonBuffer& buf) override final {
+ if (buf.Current() != NYson::NDetail::StringMarker) {
+ Y_ENSURE(buf.Current() == NYson::NDetail::StringMarker);
+ }
+ buf.Next();
+ const i32 len = buf.ReadVarI32();
+ auto ptr = buf.Data();
+ buf.Skip(len);
+ return PgBlockItemFromNativeBinary(TStringBuf(ptr, len), TypeId_, Tmp_);
+ }
+private:
+ Oid TypeId_;
+ std::vector<char> Tmp_;
+};
+
+
+template<typename T, arrow::Type::type Expected, typename ArrType>
+class TPgTopLevelFixedConverter : public IYtColumnConverter {
+public:
+ using Fn = Datum(*)(const T&);
+ TPgTopLevelFixedConverter(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder) : Builder_(std::move(builder)) {}
+
+ arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> data) override final {
+ if (arrow::Type::DICTIONARY == data->type->id()) {
+ auto valType = static_cast<const arrow::DictionaryType&>(*data->type).value_type();
+ Y_ENSURE(Expected == valType->id());
+ return ConvertDict(data);
+ } else {
+ Y_ENSURE(Expected == data->type->id());
+ return ConvertNonDict(data);
+ }
+ }
+
+ arrow::Datum ConvertNonDict(std::shared_ptr<arrow::ArrayData> data) {
+ ArrType arr(data);
+ if (arr.null_count()) {
+ for (i64 i = 0; i < data->length; ++i) {
+ if (arr.IsNull(i)) {
+ Builder_->Add(NUdf::TBlockItem{});
+ } else {
+ Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(arr.Value(i))));
+ }
+ }
+ } else {
+ for (i64 i = 0; i < data->length; ++i) {
+ Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(arr.Value(i))));
+ }
+ }
+ return Builder_->Build(false);
+ }
+
+ arrow::Datum ConvertDict(std::shared_ptr<arrow::ArrayData> data) {
+ arrow::DictionaryArray dict(data);
+ auto values = data->dictionary->GetValues<T>(0);
+ auto indices = dict.indices()->data()->GetValues<ui32>(1);
+ if (dict.null_count()) {
+ for (i64 i = 0; i < data->length; ++i) {
+ if (dict.IsNull(i)) {
+ Builder_->Add(NUdf::TBlockItem{});
+ } else {
+ Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(values[indices[i]])));
+ }
+ }
+ } else {
+ for (i64 i = 0; i < data->length; ++i) {
+ Builder_->Add(NUdf::TBlockItem(FixedToDatum<T>(values[indices[i]])));
+ }
+ }
+ return Builder_->Build(false);
+ }
+private:
+ std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder_;
+};
+
+template<bool IsCString, bool FixedLength>
+class TPgTopLevelStringConverter : public IYtColumnConverter {
+public:
+ TPgTopLevelStringConverter(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, i32 typeLen) : Builder_(std::move(builder)), TypeLen_(typeLen) {
+ if (typeLen == -2) {
+ YQL_ENSURE(IsCString && !FixedLength);
+ } else if (typeLen == -1) {
+ YQL_ENSURE(!IsCString && !FixedLength);
+ } else {
+ YQL_ENSURE(typeLen >= 0 && FixedLength);
+ }
+ }
+
+ constexpr NUdf::TBlockItem ConvertOnce(const uint8_t* res, size_t originalLen) {
+ ui32 len;
+ if constexpr (IsCString) {
+ len = 1 + originalLen + sizeof(void*);
+ } else if constexpr (FixedLength) {
+ len = TypeLen_ + sizeof(void*);
+ } else {
+ len = VARHDRSZ + originalLen + sizeof(void*);
+ }
+
+ if (Tmp_.capacity() < len) {
+ Tmp_.reserve(Max<ui64>(len, Tmp_.capacity() * 2));
+ }
+ len = AlignUp<ui32>(len, 8);
+ Tmp_.resize(len);
+ if constexpr (IsCString) {
+ memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
+ } else if constexpr (FixedLength) {
+ memcpy(Tmp_.data() + sizeof(void*), res, originalLen);
+ } else {
+ memcpy(Tmp_.data() + VARHDRSZ + sizeof(void*), res, originalLen);
+ UpdateCleanVarSize((text*)(Tmp_.data() + sizeof(void*)), originalLen);
+ }
+ return NUdf::TBlockItem(NUdf::TStringRef(Tmp_.data(), len));
+ }
+
+ arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> data) override final {
+ if (arrow::Type::DICTIONARY == data->type->id()) {
+ auto valType = static_cast<const arrow::DictionaryType&>(*data->type).value_type();
+ Y_ENSURE(arrow::Type::BINARY == valType->id() || arrow::Type::STRING == valType->id());
+ return ConvertDict(data);
+ } else {
+ if (arrow::Type::STRING == data->type->id()) {
+ auto res = arrow::compute::Cast(data, std::make_shared<arrow::BinaryType>());
+ Y_ENSURE(res.ok());
+ data = res->array();
+ }
+ Y_ENSURE(arrow::Type::BINARY == data->type->id());
+ return ConvertNonDict(data);
+ }
+ }
+
+ arrow::Datum ConvertNonDict(std::shared_ptr<arrow::ArrayData> data) {
+ arrow::BinaryArray arr(data);
+ if (arr.null_count()) {
+ for (i64 i = 0; i < data->length; ++i) {
+ if (arr.IsNull(i)) {
+ Builder_->Add(NUdf::TBlockItem{});
+ } else {
+ i32 len;
+ auto res = arr.GetValue(i, &len);
+ Builder_->Add(ConvertOnce(res, len));
+ }
+ }
+ } else {
+ for (i64 i = 0; i < data->length; ++i) {
+ i32 len;
+ auto res = arr.GetValue(i, &len);
+ Builder_->Add(ConvertOnce(res, len));
+ }
+ }
+ return Builder_->Build(false);
+ }
+
+ arrow::Datum ConvertDict(std::shared_ptr<arrow::ArrayData> data) {
+ arrow::DictionaryArray dict(data);
+ if (arrow::Type::STRING == data->dictionary->type->id()) {
+ auto res = arrow::compute::Cast(data->dictionary, std::make_shared<arrow::BinaryType>());
+ Y_ENSURE(res.ok());
+ data->dictionary = res->array();
+ }
+ arrow::BinaryArray arr(data->dictionary);
+ auto indices = dict.indices()->data()->GetValues<ui32>(1);
+ if (dict.null_count()) {
+ for (i64 i = 0; i < data->length; ++i) {
+ if (dict.IsNull(i)) {
+ Builder_->Add(NUdf::TBlockItem{});
+ } else {
+ i32 len;
+ auto res = arr.GetValue(indices[i], &len);
+ Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
+ }
+ }
+ } else {
+ for (i64 i = 0; i < data->length; ++i) {
+ i32 len;
+ auto res = arr.GetValue(indices[i], &len);
+ Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
+ }
+ }
+ return Builder_->Build(false);
+ }
+private:
+ std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder_;
+ std::vector<char> Tmp_;
+ i32 TypeLen_;
+};
+
+class TPgTopLevelOtherConverter : public IYtColumnConverter {
+public:
+ TPgTopLevelOtherConverter(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, Oid typeId) : Builder_(std::move(builder)), TypeId_(typeId) {}
+
+ inline NUdf::TBlockItem ConvertOnce(const uint8_t* res, size_t len) {
+ return PgBlockItemFromNativeBinary(TStringBuf(reinterpret_cast<const char*>(res), len), TypeId_, Tmp_);
+ }
+
+ arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> data) override final {
+ if (arrow::Type::DICTIONARY == data->type->id()) {
+ auto valType = static_cast<const arrow::DictionaryType&>(*data->type).value_type();
+ Y_ENSURE(arrow::Type::BINARY == valType->id() || arrow::Type::STRING == valType->id());
+ return ConvertDict(data);
+ } else {
+ Y_ENSURE(arrow::Type::BINARY == data->type->id() || arrow::Type::STRING == data->type->id());
+ return ConvertNonDict(data);
+ }
+ }
+
+ arrow::Datum ConvertNonDict(std::shared_ptr<arrow::ArrayData> data) {
+ arrow::BinaryArray arr(data);
+ if (arr.null_count()) {
+ for (i64 i = 0; i < data->length; ++i) {
+ if (arr.IsNull(i)) {
+ Builder_->Add(NUdf::TBlockItem{});
+ } else {
+ i32 len;
+ auto res = arr.GetValue(i, &len);
+ Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
+ }
+ }
+ } else {
+ for (i64 i = 0; i < data->length; ++i) {
+ i32 len;
+ auto res = arr.GetValue(i, &len);
+ Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
+ }
+ }
+ return Builder_->Build(false);
+ }
+
+ arrow::Datum ConvertDict(std::shared_ptr<arrow::ArrayData> data) {
+ arrow::DictionaryArray dict(data);
+ if (arrow::Type::STRING == data->dictionary->type->id()) {
+ auto res = arrow::compute::Cast(data->dictionary, std::make_shared<arrow::BinaryType>());
+ Y_ENSURE(res.ok());
+ data->dictionary = res->array();
+ }
+ arrow::BinaryArray arr(data->dictionary);
+ auto indices = dict.indices()->data()->GetValues<ui32>(1);
+ if (dict.null_count()) {
+ for (i64 i = 0; i < data->length; ++i) {
+ if (dict.IsNull(i)) {
+ Builder_->Add(NUdf::TBlockItem{});
+ } else {
+ i32 len;
+ auto res = arr.GetValue(indices[i], &len);
+ Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
+ }
+ }
+ } else {
+ for (i64 i = 0; i < data->length; ++i) {
+ i32 len;
+ auto res = arr.GetValue(indices[i], &len);
+ Builder_->Add(NUdf::TBlockItem(ConvertOnce(res, len)));
+ }
+ }
+ return Builder_->Build(false);
+ }
+private:
+ std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder_;
+ Oid TypeId_;
+ std::vector<char> Tmp_;
+};
+
+std::unique_ptr<IYtColumnConverter> BuildPgTopLevelColumnReader(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, const NKikimr::NMiniKQL::TPgType* targetType) {
+ YQL_ENSURE(targetType);
+
+ switch (targetType->GetTypeId()) {
+ case BOOLOID: {
+ return std::make_unique<TPgTopLevelFixedConverter<bool, arrow::Type::BOOL, arrow::BooleanArray>>(std::move(builder));
+ }
+ case INT2OID: {
+ return std::make_unique<TPgTopLevelFixedConverter<i16, arrow::Type::INT16, arrow::Int16Array>>(std::move(builder));
+ }
+ case INT4OID: {
+ return std::make_unique<TPgTopLevelFixedConverter<i32, arrow::Type::INT32, arrow::Int32Array>>(std::move(builder));
+ }
+ case INT8OID: {
+ return std::make_unique<TPgTopLevelFixedConverter<i64, arrow::Type::INT64, arrow::Int64Array>>(std::move(builder));
+ }
+ case FLOAT4OID: {
+ return std::make_unique<TPgTopLevelFixedConverter<float, arrow::Type::DOUBLE, arrow::DoubleArray>>(std::move(builder));
+ }
+ case FLOAT8OID: {
+ return std::make_unique<TPgTopLevelFixedConverter<double, arrow::Type::DOUBLE, arrow::DoubleArray>>(std::move(builder));
+ }
+ case BYTEAOID:
+ case VARCHAROID:
+ case TEXTOID:
+ case NAMEOID:
+ case CSTRINGOID: {
+ auto typeLen = NPg::LookupType(targetType->GetTypeId()).TypeLen;
+ if (typeLen == -2) {
+ return std::make_unique<TPgTopLevelStringConverter<true, false>>(std::move(builder), typeLen);
+ } else if (typeLen == -1) {
+ return std::make_unique<TPgTopLevelStringConverter<false, false>>(std::move(builder), typeLen);
+ } else {
+ return std::make_unique<TPgTopLevelStringConverter<false, true>>(std::move(builder), typeLen);
+ }
+ }
+ default:
+ return std::make_unique<TPgTopLevelOtherConverter>(std::move(builder), targetType->GetTypeId());
+ }
+}
+
+
+std::unique_ptr<IYsonComplexTypeReader> BuildPgYsonColumnReader(const NUdf::TPgTypeDescription& desc) {
+ switch (desc.TypeId) {
+ case BOOLOID: {
+ return std::make_unique<TPgYsonFixedConverter<bool>>();
+ }
+ case INT2OID: {
+ return std::make_unique<TPgYsonFixedConverter<i16>>();
+ }
+ case INT4OID: {
+ return std::make_unique<TPgYsonFixedConverter<i32>>();
+ }
+ case INT8OID: {
+ return std::make_unique<TPgYsonFixedConverter<i64>>();
+ }
+ case FLOAT4OID: {
+ return std::make_unique<TPgYsonFixedConverter<float>>();
+ }
+ case FLOAT8OID: {
+ return std::make_unique<TPgYsonFixedConverter<double>>();
+ }
+ case BYTEAOID:
+ case NAMEOID:
+ case VARCHAROID:
+ case TEXTOID:
+ case CSTRINGOID: {
+ auto typeLen = NPg::LookupType(desc.TypeId).TypeLen;
+ if (typeLen == -2) {
+ return std::make_unique<TPgYsonStringConverter<true, false>>(typeLen);
+ } else if (typeLen == -1) {
+ return std::make_unique<TPgYsonStringConverter<false, false>>(typeLen);
+ } else {
+ return std::make_unique<TPgYsonStringConverter<false, true>>(typeLen);
+ }
+ }
+ default:
+ return std::make_unique<TPgYsonOtherConverter>(desc.TypeId);
+ }
+}
+
}
diff --git a/yql/essentials/parser/pg_wrapper/interface/arrow.h b/yql/essentials/parser/pg_wrapper/interface/arrow.h
index f8e9c2a0622..f9aead6a553 100644
--- a/yql/essentials/parser/pg_wrapper/interface/arrow.h
+++ b/yql/essentials/parser/pg_wrapper/interface/arrow.h
@@ -1,8 +1,14 @@
#pragma once
+#include <yql/essentials/providers/common/codec/yt_arrow_converter_interface/yt_arrow_converter.h>
+#include <yql/essentials/providers/common/codec/yt_arrow_converter_interface/yt_arrow_converter_details.h>
+
#include <yql/essentials/minikql/mkql_node.h>
#include <yql/essentials/public/udf/arrow/block_item.h>
+#include <yql/essentials/public/udf/arrow/block_builder.h>
+
#include <arrow/datum.h>
+
namespace NYql {
arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool);
@@ -11,6 +17,8 @@ arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NUdf::TBlockIt
using TColumnConverter = std::function<std::shared_ptr<arrow::Array>(const std::shared_ptr<arrow::Array>&)>;
TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, NKikimr::NMiniKQL::TPgType* targetType);
+std::unique_ptr<IYsonComplexTypeReader> BuildPgYsonColumnReader(const NUdf::TPgTypeDescription& desc);
+std::unique_ptr<IYtColumnConverter> BuildPgTopLevelColumnReader(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, const NKikimr::NMiniKQL::TPgType* targetType);
} // NYql
namespace NKikimr {
@@ -20,4 +28,4 @@ class IBlockAggregatorFactory;
void RegisterPgBlockAggs(THashMap<TString, std::unique_ptr<IBlockAggregatorFactory>>& registry);
}
-} \ No newline at end of file
+}
diff --git a/yql/essentials/parser/pg_wrapper/interface/ya.make b/yql/essentials/parser/pg_wrapper/interface/ya.make
index 92fe7ff1669..90f37bffcdf 100644
--- a/yql/essentials/parser/pg_wrapper/interface/ya.make
+++ b/yql/essentials/parser/pg_wrapper/interface/ya.make
@@ -20,6 +20,7 @@ PEERDIR(
yql/essentials/public/udf/arrow
yql/essentials/core/cbo
library/cpp/disjoint_sets
+ yql/essentials/providers/common/codec/yt_arrow_converter_interface
)
YQL_LAST_ABI_VERSION()
diff --git a/yql/essentials/providers/common/codec/ya.make b/yql/essentials/providers/common/codec/ya.make
index 13ccdc726b1..ca9d05dd2a2 100644
--- a/yql/essentials/providers/common/codec/ya.make
+++ b/yql/essentials/providers/common/codec/ya.make
@@ -30,6 +30,7 @@ END()
RECURSE(
arrow
+ yt_arrow_converter_interface
)
RECURSE_FOR_TESTS(
diff --git a/yql/essentials/providers/common/codec/yt_arrow_converter_interface/ya.make b/yql/essentials/providers/common/codec/yt_arrow_converter_interface/ya.make
new file mode 100644
index 00000000000..de05d309851
--- /dev/null
+++ b/yql/essentials/providers/common/codec/yt_arrow_converter_interface/ya.make
@@ -0,0 +1,10 @@
+LIBRARY()
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ yql/essentials/public/udf
+ yql/essentials/utils
+ yql/essentials/public/udf/arrow
+)
+
+END()
diff --git a/yql/essentials/providers/common/codec/yt_arrow_converter_interface/yt_arrow_converter.h b/yql/essentials/providers/common/codec/yt_arrow_converter_interface/yt_arrow_converter.h
new file mode 100644
index 00000000000..532b9cfa8d2
--- /dev/null
+++ b/yql/essentials/providers/common/codec/yt_arrow_converter_interface/yt_arrow_converter.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <arrow/array.h>
+
+namespace NYql {
+
+class IYtColumnConverter {
+public:
+ virtual arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> block) = 0;
+ virtual ~IYtColumnConverter() = default;
+};
+
+}
diff --git a/yql/essentials/providers/common/codec/yt_arrow_converter_interface/yt_arrow_converter_details.h b/yql/essentials/providers/common/codec/yt_arrow_converter_interface/yt_arrow_converter_details.h
new file mode 100644
index 00000000000..5aabd096825
--- /dev/null
+++ b/yql/essentials/providers/common/codec/yt_arrow_converter_interface/yt_arrow_converter_details.h
@@ -0,0 +1,100 @@
+#pragma once
+
+#include <yql/essentials/public/udf/arrow/block_reader.h>
+
+#include <yql/essentials/utils/yql_panic.h>
+
+#include <library/cpp/yson/detail.h>
+#include <library/cpp/yson/varint.h>
+
+namespace NYql {
+
+using namespace NYson::NDetail;
+
+class TYsonBuffer {
+public:
+ TYsonBuffer(const std::string_view& s) : Data_(s.data()), Available_(s.size()) {}
+
+ constexpr char Next() {
+ YQL_ENSURE(Available_-- > 0);
+ return *(++Data_);
+ }
+
+ constexpr char Current() {
+ return *Data_;
+ }
+
+ ui32 ReadVarUI32() {
+ char prev = Current();
+ if (Y_LIKELY(!(prev & 0x80))) {
+ Next();
+ return prev;
+ }
+
+ return ReadVarSlow<ui32>();
+ }
+
+ ui64 ReadVarUI64() {
+ char prev = Current();
+ if (Y_LIKELY(!(prev & 0x80))) {
+ Next();
+ return prev;
+ }
+
+ return ReadVarSlow<ui64>();
+ }
+
+ i32 ReadVarI32() {
+ return NYson::ZigZagDecode32(ReadVarUI32());
+ }
+
+ i64 ReadVarI64() {
+ return NYson::ZigZagDecode64(ReadVarUI64());
+ }
+
+ double NextDouble() {
+ double val = *reinterpret_cast<const double*>(Data_);
+ Data_ += sizeof(double);
+ return val;
+ }
+
+ void Skip(i32 cnt) {
+ Data_ += cnt;
+ }
+
+ const char* Data() {
+ return Data_;
+ }
+
+ size_t Available() const {
+ return Available_;
+ }
+private:
+ template<typename T>
+ constexpr T ReadVarSlow() {
+ T shift = 0;
+ T value = Current() & 0x7f;
+ for (;;) {
+ shift += 7;
+ value |= T(Next() & 0x7f) << shift;
+ if (!(Current() & 0x80)) {
+ break;
+ }
+ }
+ Next();
+ return value;
+ }
+
+ const char* Data_;
+ size_t Available_;
+};
+
+
+class IYsonComplexTypeReader {
+public:
+ using TPtr = std::unique_ptr<IYsonComplexTypeReader>;
+ virtual ~IYsonComplexTypeReader() = default;
+ virtual NUdf::TBlockItem GetItem(TYsonBuffer& buf) = 0;
+};
+
+}
diff --git a/yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp b/yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp
index b7f5fd713ae..9a9a10bab63 100644
--- a/yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp
+++ b/yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp
@@ -551,4 +551,14 @@ ui64 HexEncode(const char *src, size_t len, char *dst) {
throw yexception() << "HexEncode in pg_dummy does nothing";
}
+
+std::unique_ptr<IYtColumnConverter> BuildPgTopLevelColumnReader(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& builder, const NKikimr::NMiniKQL::TPgType* targetType) {
+ throw yexception() << "PG types are not supported";
+}
+
+
+std::unique_ptr<IYsonComplexTypeReader> BuildPgYsonColumnReader(const NUdf::TPgTypeDescription& desc) {
+ throw yexception() << "PG types are not supported";
+}
+
} // NYql