diff options
author | vvvv <vvvv@yandex-team.ru> | 2022-03-24 21:08:53 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.ru> | 2022-03-24 21:08:53 +0300 |
commit | a5cd1a0c117aeb638bcd9f70bbf20acaa0151927 (patch) | |
tree | b3cdd128749c726f3a0c870c39fefcd2f6b3e9df | |
parent | 6985cc4bc61ea68b9fcf5783486fd1d6b897a305 (diff) | |
download | ydb-a5cd1a0c117aeb638bcd9f70bbf20acaa0151927.tar.gz |
YQL-13710 Presort encoding
ref:f12714f590ca01faa23f5f475e56d45452dd6843
-rw-r--r-- | ydb/library/yql/minikql/computation/presort.cpp | 278 | ||||
-rw-r--r-- | ydb/library/yql/minikql/computation/presort_impl.h | 265 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/comp_factory.cpp | 101 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp | 15 |
4 files changed, 405 insertions, 254 deletions
diff --git a/ydb/library/yql/minikql/computation/presort.cpp b/ydb/library/yql/minikql/computation/presort.cpp index 26b96a60aa..eac451bfeb 100644 --- a/ydb/library/yql/minikql/computation/presort.cpp +++ b/ydb/library/yql/minikql/computation/presort.cpp @@ -1,4 +1,5 @@ #include "presort.h" +#include "presort_impl.h" #include "mkql_computation_node_holders.h" #include <ydb/library/yql/minikql/defs.h> #include <ydb/library/yql/minikql/mkql_string_util.h> @@ -14,255 +15,11 @@ namespace NMiniKQL { namespace NDetail { -using NYql::SwapBytes; - -Y_FORCE_INLINE -void EnsureInputSize(TStringBuf& input, size_t size) { - MKQL_ENSURE(input.size() >= size, "premature end of input"); -} - -template <bool Desc> -Y_FORCE_INLINE -void EncodeBool(TVector<ui8>& output, bool value) { - output.push_back(Desc ? 0xFF ^ ui8(value) : ui8(value)); -} - -template <bool Desc> -Y_FORCE_INLINE -bool DecodeBool(TStringBuf& input) { - EnsureInputSize(input, 1); - auto result = Desc ? bool(0xFF ^ ui8(input[0])) : bool(input[0]); - input.Skip(1); - return result; -} - -template <typename TUnsigned, bool Desc> -Y_FORCE_INLINE -void EncodeUnsigned(TVector<ui8>& output, TUnsigned value) { - constexpr size_t size = sizeof(TUnsigned); - - if (Desc) { - value = ~value; - } - - output.resize(output.size() + size); - WriteUnaligned<TUnsigned>(output.end() - size, SwapBytes(value)); -} - -template <typename TUnsigned, bool Desc> -Y_FORCE_INLINE -TUnsigned DecodeUnsigned(TStringBuf& input) { - constexpr size_t size = sizeof(TUnsigned); - - EnsureInputSize(input, size); - auto value = ReadUnaligned<TUnsigned>(input.data()); - input.Skip(size); - - value = SwapBytes(value); - if (Desc) { - value = ~value; - } - return value; -} - -template <typename TSigned, bool Desc> -Y_FORCE_INLINE -void EncodeSigned(TVector<ui8>& output, TSigned value) { - using TUnsigned = std::make_unsigned_t<TSigned>; - constexpr size_t size = sizeof(TUnsigned); - constexpr TUnsigned shift = TUnsigned(1) << (size * 8 - 1); - - EncodeUnsigned<TUnsigned, Desc>(output, TUnsigned(value) + shift); -} - -template <typename TSigned, bool Desc> -Y_FORCE_INLINE -TSigned DecodeSigned(TStringBuf& input) { - using TUnsigned = std::make_unsigned_t<TSigned>; - constexpr size_t size = sizeof(TUnsigned); - constexpr TUnsigned shift = TUnsigned(1) << (size * 8 - 1); - - return TSigned(DecodeUnsigned<TUnsigned, Desc>(input) - shift); -} - -enum class EFPCode : ui8 { - NegInf = 0, - Neg = 1, - Zero = 2, - Pos = 3, - PosInf = 4, - Nan = 5 -}; - -template <typename TFloat> -struct TFloatToInteger {}; - -template <> -struct TFloatToInteger<float> { - using TType = ui32; -}; - -template <> -struct TFloatToInteger<double> { - using TType = ui64; -}; - -static_assert(std::numeric_limits<float>::is_iec559, "float type is not iec559(ieee754)"); -static_assert(std::numeric_limits<double>::is_iec559, "double type is not iec559(ieee754)"); - -template <typename TFloat, bool Desc> -Y_FORCE_INLINE -void EncodeFloating(TVector<ui8>& output, TFloat value) { - using TInteger = typename TFloatToInteger<TFloat>::TType; - EFPCode code; - - switch (std::fpclassify(value)) { - case FP_NORMAL: - case FP_SUBNORMAL: { - auto integer = ReadUnaligned<TInteger>(&value); - if (value < 0) { - integer = ~integer; - code = EFPCode::Neg; - } else { - code = EFPCode::Pos; - } - output.push_back(Desc ? 0xFF ^ ui8(code) : ui8(code)); - EncodeUnsigned<TInteger, Desc>(output, integer); - return; - } - case FP_ZERO: - code = EFPCode::Zero; - break; - case FP_INFINITE: - code = value < 0 ? EFPCode::NegInf : EFPCode::PosInf; - break; - default: - code = EFPCode::Nan; - break; - } - output.push_back(Desc ? 0xFF ^ ui8(code) : ui8(code)); -} - -template <typename TFloat, bool Desc> -Y_FORCE_INLINE -TFloat DecodeFloating(TStringBuf& input) { - using TInteger = typename TFloatToInteger<TFloat>::TType; - - EnsureInputSize(input, 1); - auto code = EFPCode(Desc ? 0xFF ^ input[0] : input[0]); - input.Skip(1); - - bool negative; - switch (code) { - case EFPCode::Zero: - return 0; - case EFPCode::NegInf: - return -std::numeric_limits<TFloat>::infinity(); - case EFPCode::PosInf: - return std::numeric_limits<TFloat>::infinity(); - case EFPCode::Nan: - return std::numeric_limits<TFloat>::quiet_NaN(); - case EFPCode::Neg: - negative = true; - break; - case EFPCode::Pos: - negative = false; - break; - default: - MKQL_ENSURE(false, "floating point data is corrupted"); - } - - auto integer = DecodeUnsigned<TInteger, Desc>(input); - if (negative) { - integer = ~integer; - } - - return ReadUnaligned<TFloat>(&integer); -} - -constexpr ui8 BlockCode = 0x1F; -constexpr size_t BlockSize = 15; -constexpr size_t BlockSizeUi64 = BlockSize / 8 + 1; - -template <bool Desc> -Y_FORCE_INLINE -void EncodeString(TVector<ui8>& output, TStringBuf value) { - size_t part = 0; - - while (!value.empty()) { - union { - ui8 buffer[BlockSize + 1]; - ui64 buffer64[BlockSizeUi64]; - }; - - part = std::min(value.size(), BlockSize); - if (part == BlockSize) { - std::memcpy(buffer + 1, value.data(), BlockSize); - } else { - for (size_t i = 0; i < BlockSizeUi64; ++i) { - buffer64[i] = 0; - } - std::memcpy(buffer + 1, value.data(), part); - } - value.Skip(part); - - buffer[0] = BlockCode; - - if (Desc) { - for (size_t i = 0; i < BlockSizeUi64; ++i) { - buffer64[i] ^= std::numeric_limits<ui64>::max(); - } - } - - output.insert(output.end(), buffer, buffer + BlockSize + 1); - } - - auto lastLength = ui8(part); - output.push_back(Desc ? 0xFF ^ lastLength : lastLength); -} - -template <bool Desc> -Y_FORCE_INLINE -TStringBuf DecodeString(TStringBuf& input, TVector<ui8>& value) { - EnsureInputSize(input, 1); - ui8 code = Desc ? 0xFF ^ input[0] : input[0]; - input.Skip(1); - - if (code != BlockCode) { - MKQL_ENSURE(code == 0, TStringBuilder() << "unknown string block code: " << code); - return TStringBuf(); - } - - while (code == BlockCode) { - union { - ui8 buffer[BlockSize + 1]; - ui64 buffer64[BlockSizeUi64]; - }; - - EnsureInputSize(input, BlockSize + 1); - std::memcpy(buffer, input.data(), BlockSize + 1); - input.Skip(BlockSize + 1); - - if (Desc) { - for (size_t i = 0; i < BlockSizeUi64; ++i) { - buffer64[i] ^= std::numeric_limits<ui64>::max(); - } - } - - value.insert(value.end(), buffer, buffer + BlockSize); - code = buffer[BlockSize]; - } - - auto begin = (const char*)value.begin(); - auto end = (const char*)value.end() - BlockSize + code; - return TStringBuf(begin, end - begin); -} - constexpr size_t UuidSize = 16; template <bool Desc> Y_FORCE_INLINE -void EncodeUuid(TVector<ui8>& output, const char* data) { + void EncodeUuid(TVector<ui8>& output, const char* data) { output.resize(output.size() + UuidSize); auto ptr = output.end() - UuidSize; @@ -270,14 +27,15 @@ void EncodeUuid(TVector<ui8>& output, const char* data) { for (size_t i = 0; i < UuidSize; ++i) { *ptr++ = ui8(*data++) ^ 0xFF; } - } else { + } + else { std::memcpy(ptr, data, UuidSize); } } template <bool Desc> Y_FORCE_INLINE -TStringBuf DecodeUuid(TStringBuf& input, TVector<ui8>& value) { + TStringBuf DecodeUuid(TStringBuf& input, TVector<ui8>& value) { EnsureInputSize(input, UuidSize); auto data = input.data(); input.Skip(UuidSize); @@ -289,7 +47,8 @@ TStringBuf DecodeUuid(TStringBuf& input, TVector<ui8>& value) { for (size_t i = 0; i < UuidSize; ++i) { *ptr++ = ui8(*data++) ^ 0xFF; } - } else { + } + else { std::memcpy(ptr, data, UuidSize); } @@ -298,7 +57,7 @@ TStringBuf DecodeUuid(TStringBuf& input, TVector<ui8>& value) { template <typename TUnsigned, bool Desc> Y_FORCE_INLINE -void EncodeTzUnsigned(TVector<ui8>& output, TUnsigned value, ui16 tzId) { + void EncodeTzUnsigned(TVector<ui8>& output, TUnsigned value, ui16 tzId) { constexpr size_t size = sizeof(TUnsigned); if (Desc) { @@ -313,7 +72,7 @@ void EncodeTzUnsigned(TVector<ui8>& output, TUnsigned value, ui16 tzId) { template <typename TUnsigned, bool Desc> Y_FORCE_INLINE -void DecodeTzUnsigned(TStringBuf& input, TUnsigned& value, ui16& tzId) { + void DecodeTzUnsigned(TStringBuf& input, TUnsigned& value, ui16& tzId) { constexpr size_t size = sizeof(TUnsigned); EnsureInputSize(input, size + sizeof(ui16)); @@ -324,7 +83,8 @@ void DecodeTzUnsigned(TStringBuf& input, TUnsigned& value, ui16& tzId) { if (Desc) { value = ~SwapBytes(v); tzId = ~SwapBytes(t); - } else { + } + else { value = SwapBytes(v); tzId = SwapBytes(t); } @@ -334,7 +94,7 @@ constexpr size_t DecimalSize = sizeof(NYql::NDecimal::TInt128); template <bool Desc> Y_FORCE_INLINE -void EncodeDecimal(TVector<ui8>& output, NYql::NDecimal::TInt128 value) { + void EncodeDecimal(TVector<ui8>& output, NYql::NDecimal::TInt128 value) { output.resize(output.size() + DecimalSize); auto ptr = reinterpret_cast<char*>(output.end() - DecimalSize); output.resize(output.size() + NYql::NDecimal::Serialize(Desc ? -value : value, ptr) - DecimalSize); @@ -342,14 +102,13 @@ void EncodeDecimal(TVector<ui8>& output, NYql::NDecimal::TInt128 value) { template <bool Desc> Y_FORCE_INLINE -NYql::NDecimal::TInt128 DecodeDecimal(TStringBuf& input) { + NYql::NDecimal::TInt128 DecodeDecimal(TStringBuf& input) { MKQL_ENSURE(input.size() > 0U && input.size() <= DecimalSize, "premature end of input"); const auto des = NYql::NDecimal::Deserialize(input.data()); input.Skip(des.second); return Desc ? -des.first : des.first; } - template <bool Desc> Y_FORCE_INLINE void Encode(TVector<ui8>& output, NUdf::EDataSlot slot, const NUdf::TUnboxedValuePod& value) { @@ -635,6 +394,12 @@ void EncodeValue(TType* type, const NUdf::TUnboxedValue& value, TVector<ui8>& ou break; } + case TType::EKind::Pg: { + auto pgType = static_cast<TPgType*>(type); + EncodePresortPGValue(pgType, value, output); + break; + } + default: MKQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); } @@ -655,6 +420,11 @@ NUdf::TUnboxedValue DecodeImpl(TType* type, TStringBuf& input, const THolderFact auto slot = *static_cast<TDataType*>(type)->GetDataSlot(); return Decode<false>(input, slot, buffer); } + case TType::EKind::Pg: { + auto pgType = static_cast<TPgType*>(type); + return DecodePresortPGValue(pgType, input, buffer); + } + case TType::EKind::Optional: { auto itemType = static_cast<TOptionalType*>(type)->GetItemType(); auto hasValue = DecodeBool<false>(input); diff --git a/ydb/library/yql/minikql/computation/presort_impl.h b/ydb/library/yql/minikql/computation/presort_impl.h new file mode 100644 index 0000000000..5729cf3d24 --- /dev/null +++ b/ydb/library/yql/minikql/computation/presort_impl.h @@ -0,0 +1,265 @@ +#pragma once +#include <ydb/library/yql/minikql/defs.h> +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/public/udf/udf_value.h> +#include <ydb/library/yql/utils/swap_bytes.h> + +#include <util/generic/vector.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace NDetail { + + using NYql::SwapBytes; + + Y_FORCE_INLINE + void EnsureInputSize(TStringBuf& input, size_t size) { + MKQL_ENSURE(input.size() >= size, "premature end of input"); + } + + template <bool Desc> + Y_FORCE_INLINE + void EncodeBool(TVector<ui8>& output, bool value) { + output.push_back(Desc ? 0xFF ^ ui8(value) : ui8(value)); + } + + template <bool Desc> + Y_FORCE_INLINE + bool DecodeBool(TStringBuf& input) { + EnsureInputSize(input, 1); + auto result = Desc ? bool(0xFF ^ ui8(input[0])) : bool(input[0]); + input.Skip(1); + return result; + } + + template <typename TUnsigned, bool Desc> + Y_FORCE_INLINE + void EncodeUnsigned(TVector<ui8>& output, TUnsigned value) { + constexpr size_t size = sizeof(TUnsigned); + + if (Desc) { + value = ~value; + } + + output.resize(output.size() + size); + WriteUnaligned<TUnsigned>(output.end() - size, SwapBytes(value)); + } + + template <typename TUnsigned, bool Desc> + Y_FORCE_INLINE + TUnsigned DecodeUnsigned(TStringBuf& input) { + constexpr size_t size = sizeof(TUnsigned); + + EnsureInputSize(input, size); + auto value = ReadUnaligned<TUnsigned>(input.data()); + input.Skip(size); + + value = SwapBytes(value); + if (Desc) { + value = ~value; + } + return value; + } + + template <typename TSigned, bool Desc> + Y_FORCE_INLINE + void EncodeSigned(TVector<ui8>& output, TSigned value) { + using TUnsigned = std::make_unsigned_t<TSigned>; + constexpr size_t size = sizeof(TUnsigned); + constexpr TUnsigned shift = TUnsigned(1) << (size * 8 - 1); + + EncodeUnsigned<TUnsigned, Desc>(output, TUnsigned(value) + shift); + } + + template <typename TSigned, bool Desc> + Y_FORCE_INLINE + TSigned DecodeSigned(TStringBuf& input) { + using TUnsigned = std::make_unsigned_t<TSigned>; + constexpr size_t size = sizeof(TUnsigned); + constexpr TUnsigned shift = TUnsigned(1) << (size * 8 - 1); + + return TSigned(DecodeUnsigned<TUnsigned, Desc>(input) - shift); + } + + enum class EFPCode : ui8 { + NegInf = 0, + Neg = 1, + Zero = 2, + Pos = 3, + PosInf = 4, + Nan = 5 + }; + + template <typename TFloat> + struct TFloatToInteger {}; + + template <> + struct TFloatToInteger<float> { + using TType = ui32; + }; + + template <> + struct TFloatToInteger<double> { + using TType = ui64; + }; + + static_assert(std::numeric_limits<float>::is_iec559, "float type is not iec559(ieee754)"); + static_assert(std::numeric_limits<double>::is_iec559, "double type is not iec559(ieee754)"); + + template <typename TFloat, bool Desc> + Y_FORCE_INLINE + void EncodeFloating(TVector<ui8>& output, TFloat value) { + using TInteger = typename TFloatToInteger<TFloat>::TType; + EFPCode code; + + switch (std::fpclassify(value)) { + case FP_NORMAL: + case FP_SUBNORMAL: { + auto integer = ReadUnaligned<TInteger>(&value); + if (value < 0) { + integer = ~integer; + code = EFPCode::Neg; + } + else { + code = EFPCode::Pos; + } + output.push_back(Desc ? 0xFF ^ ui8(code) : ui8(code)); + EncodeUnsigned<TInteger, Desc>(output, integer); + return; + } + case FP_ZERO: + code = EFPCode::Zero; + break; + case FP_INFINITE: + code = value < 0 ? EFPCode::NegInf : EFPCode::PosInf; + break; + default: + code = EFPCode::Nan; + break; + } + output.push_back(Desc ? 0xFF ^ ui8(code) : ui8(code)); + } + + template <typename TFloat, bool Desc> + Y_FORCE_INLINE + TFloat DecodeFloating(TStringBuf& input) { + using TInteger = typename TFloatToInteger<TFloat>::TType; + + EnsureInputSize(input, 1); + auto code = EFPCode(Desc ? 0xFF ^ input[0] : input[0]); + input.Skip(1); + + bool negative; + switch (code) { + case EFPCode::Zero: + return 0; + case EFPCode::NegInf: + return -std::numeric_limits<TFloat>::infinity(); + case EFPCode::PosInf: + return std::numeric_limits<TFloat>::infinity(); + case EFPCode::Nan: + return std::numeric_limits<TFloat>::quiet_NaN(); + case EFPCode::Neg: + negative = true; + break; + case EFPCode::Pos: + negative = false; + break; + default: + MKQL_ENSURE(false, "floating point data is corrupted"); + } + + auto integer = DecodeUnsigned<TInteger, Desc>(input); + if (negative) { + integer = ~integer; + } + + return ReadUnaligned<TFloat>(&integer); + } + + constexpr ui8 BlockCode = 0x1F; + constexpr size_t BlockSize = 15; + constexpr size_t BlockSizeUi64 = BlockSize / 8 + 1; + + template <bool Desc> + Y_FORCE_INLINE + void EncodeString(TVector<ui8>& output, TStringBuf value) { + size_t part = 0; + + while (!value.empty()) { + union { + ui8 buffer[BlockSize + 1]; + ui64 buffer64[BlockSizeUi64]; + }; + + part = std::min(value.size(), BlockSize); + if (part == BlockSize) { + std::memcpy(buffer + 1, value.data(), BlockSize); + } + else { + for (size_t i = 0; i < BlockSizeUi64; ++i) { + buffer64[i] = 0; + } + std::memcpy(buffer + 1, value.data(), part); + } + value.Skip(part); + + buffer[0] = BlockCode; + + if (Desc) { + for (size_t i = 0; i < BlockSizeUi64; ++i) { + buffer64[i] ^= std::numeric_limits<ui64>::max(); + } + } + + output.insert(output.end(), buffer, buffer + BlockSize + 1); + } + + auto lastLength = ui8(part); + output.push_back(Desc ? 0xFF ^ lastLength : lastLength); + } + + template <bool Desc> + Y_FORCE_INLINE + TStringBuf DecodeString(TStringBuf& input, TVector<ui8>& value) { + EnsureInputSize(input, 1); + ui8 code = Desc ? 0xFF ^ input[0] : input[0]; + input.Skip(1); + + if (code != BlockCode) { + MKQL_ENSURE(code == 0, TStringBuilder() << "unknown string block code: " << code); + return TStringBuf(); + } + + while (code == BlockCode) { + union { + ui8 buffer[BlockSize + 1]; + ui64 buffer64[BlockSizeUi64]; + }; + + EnsureInputSize(input, BlockSize + 1); + std::memcpy(buffer, input.data(), BlockSize + 1); + input.Skip(BlockSize + 1); + + if (Desc) { + for (size_t i = 0; i < BlockSizeUi64; ++i) { + buffer64[i] ^= std::numeric_limits<ui64>::max(); + } + } + + value.insert(value.end(), buffer, buffer + BlockSize); + code = buffer[BlockSize]; + } + + auto begin = (const char*)value.begin(); + auto end = (const char*)value.end() - BlockSize + code; + return TStringBuf(begin, end - begin); + } +} + +void EncodePresortPGValue(TPgType* type, const NUdf::TUnboxedValue& value, TVector<ui8>& output); +NUdf::TUnboxedValue DecodePresortPGValue(TPgType* type, TStringBuf& input, TVector<ui8>& buffer); + +} // NMiniKQL +} // NKikimr diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index a4870bf022..6390675026 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -2,6 +2,7 @@ #include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h> +#include <ydb/library/yql/minikql/computation/presort_impl.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/mkql_alloc.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> @@ -1621,6 +1622,106 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) { } } +void EncodePresortPGValue(TPgType* type, const NUdf::TUnboxedValue& value, TVector<ui8>& output) { + switch (type->GetTypeId()) { + case BOOLOID: { + const auto x = DatumGetBool(ScalarDatumFromPod(value)) != 0; + NDetail::EncodeBool<false>(output, x); + break; + } + case INT2OID: { + const auto x = DatumGetInt16(ScalarDatumFromPod(value)); + NDetail::EncodeSigned<i16, false>(output, x); + break; + } + case INT4OID: { + const auto x = DatumGetInt32(ScalarDatumFromPod(value)); + NDetail::EncodeSigned<i32, false>(output, x); + break; + } + case INT8OID: { + const auto x = DatumGetInt64(ScalarDatumFromPod(value)); + NDetail::EncodeSigned<i64, false>(output, x); + break; + } + case FLOAT4OID: { + const auto x = DatumGetFloat4(ScalarDatumFromPod(value)); + NDetail::EncodeFloating<float, false>(output, x); + break; + } + case FLOAT8OID: { + const auto x = DatumGetFloat8(ScalarDatumFromPod(value)); + NDetail::EncodeFloating<double, false>(output, x); + break; + } + case BYTEAOID: + case VARCHAROID: + case TEXTOID: { + SET_MEMORY_CONTEXT; + TPAllocScope call; + const auto x = (const text*)PointerDatumFromPod(value, true); + ui32 len = VARSIZE_ANY_EXHDR(x); + TString ret; + if (len) { + ret = TString::Uninitialized(len); + text_to_cstring_buffer(x, ret.begin(), len + 1); + } + + NDetail::EncodeString<false>(output, ret); + break; + } + case CSTRINGOID: { + SET_MEMORY_CONTEXT; + TPAllocScope call; + const auto x = (const char*)PointerDatumFromPod(value, false); + NDetail::EncodeString<false>(output, x); + break; + } + default: + SET_MEMORY_CONTEXT; + TPAllocScope call; + const auto& typeInfo = NPg::LookupType(type->GetTypeId()); + FmgrInfo finfo; + Zero(finfo); + Y_ENSURE(typeInfo.SendFuncId); + fmgr_info(typeInfo.SendFuncId, &finfo); + Y_ENSURE(!finfo.fn_retset); + Y_ENSURE(finfo.fn_addr); + Y_ENSURE(finfo.fn_nargs == 1); + LOCAL_FCINFO(callInfo, 1); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 1; + callInfo->fncollation = DEFAULT_COLLATION_OID; + callInfo->isnull = false; + callInfo->args[0] = { typeInfo.PassByValue ? + ScalarDatumFromPod(value) : + PointerDatumFromPod(value, typeInfo.TypeLen == -1), false }; + auto x = (text*)finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + Y_DEFER { + pfree(x); + }; + + ui32 len = VARSIZE_ANY_EXHDR(x); + TString ret; + if (len) { + ret = TString::Uninitialized(len); + text_to_cstring_buffer(x, ret.begin(), len + 1); + } + + NDetail::EncodeString<false>(output, ret); + } +} + +NUdf::TUnboxedValue DecodePresortPGValue(TPgType* type, TStringBuf& input, TVector<ui8>& buffer) { + // presort decoding is used only inside RTMR, don't implement it right now + Y_UNUSED(type); + Y_UNUSED(input); + Y_UNUSED(buffer); + throw yexception() << "PG types are not supported"; +} + void* PgInitializeContext(const std::string_view& contextType) { if (contextType == "Agg") { auto ctx = (AggState*)MKQLAllocWithSize(sizeof(AggState)); diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index ae594de09d..a8887ee935 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -1,6 +1,7 @@ #include <ydb/library/yql/sql/pg_sql.h> #include <ydb/library/yql/providers/common/codec/yql_pg_codec.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h> +#include <ydb/library/yql/minikql/computation/presort_impl.h> #include <ydb/library/yql/core/yql_pg_utils.h> namespace NSQLTranslationPG { @@ -87,6 +88,20 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) { throw yexception() << "PG types are not supported"; } +void EncodePresortPGValue(TPgType* type, const NUdf::TUnboxedValue& value, TVector<ui8>& output) { + Y_UNUSED(type); + Y_UNUSED(value); + Y_UNUSED(output); + throw yexception() << "PG types are not supported"; +} + +NUdf::TUnboxedValue DecodePresortPGValue(TPgType* type, TStringBuf& input, TVector<ui8>& buffer) { + Y_UNUSED(type); + Y_UNUSED(input); + Y_UNUSED(buffer); + throw yexception() << "PG types are not supported"; +} + void* PgInitializeContext(const std::string_view& contextType) { Y_UNUSED(contextType); return nullptr; |