aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.ru>2022-03-24 21:08:53 +0300
committervvvv <vvvv@yandex-team.ru>2022-03-24 21:08:53 +0300
commita5cd1a0c117aeb638bcd9f70bbf20acaa0151927 (patch)
treeb3cdd128749c726f3a0c870c39fefcd2f6b3e9df
parent6985cc4bc61ea68b9fcf5783486fd1d6b897a305 (diff)
downloadydb-a5cd1a0c117aeb638bcd9f70bbf20acaa0151927.tar.gz
YQL-13710 Presort encoding
ref:f12714f590ca01faa23f5f475e56d45452dd6843
-rw-r--r--ydb/library/yql/minikql/computation/presort.cpp278
-rw-r--r--ydb/library/yql/minikql/computation/presort_impl.h265
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp101
-rw-r--r--ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp15
4 files changed, 405 insertions, 254 deletions
diff --git a/ydb/library/yql/minikql/computation/presort.cpp b/ydb/library/yql/minikql/computation/presort.cpp
index 26b96a60aa..eac451bfeb 100644
--- a/ydb/library/yql/minikql/computation/presort.cpp
+++ b/ydb/library/yql/minikql/computation/presort.cpp
@@ -1,4 +1,5 @@
#include "presort.h"
+#include "presort_impl.h"
#include "mkql_computation_node_holders.h"
#include <ydb/library/yql/minikql/defs.h>
#include <ydb/library/yql/minikql/mkql_string_util.h>
@@ -14,255 +15,11 @@ namespace NMiniKQL {
namespace NDetail {
-using NYql::SwapBytes;
-
-Y_FORCE_INLINE
-void EnsureInputSize(TStringBuf& input, size_t size) {
- MKQL_ENSURE(input.size() >= size, "premature end of input");
-}
-
-template <bool Desc>
-Y_FORCE_INLINE
-void EncodeBool(TVector<ui8>& output, bool value) {
- output.push_back(Desc ? 0xFF ^ ui8(value) : ui8(value));
-}
-
-template <bool Desc>
-Y_FORCE_INLINE
-bool DecodeBool(TStringBuf& input) {
- EnsureInputSize(input, 1);
- auto result = Desc ? bool(0xFF ^ ui8(input[0])) : bool(input[0]);
- input.Skip(1);
- return result;
-}
-
-template <typename TUnsigned, bool Desc>
-Y_FORCE_INLINE
-void EncodeUnsigned(TVector<ui8>& output, TUnsigned value) {
- constexpr size_t size = sizeof(TUnsigned);
-
- if (Desc) {
- value = ~value;
- }
-
- output.resize(output.size() + size);
- WriteUnaligned<TUnsigned>(output.end() - size, SwapBytes(value));
-}
-
-template <typename TUnsigned, bool Desc>
-Y_FORCE_INLINE
-TUnsigned DecodeUnsigned(TStringBuf& input) {
- constexpr size_t size = sizeof(TUnsigned);
-
- EnsureInputSize(input, size);
- auto value = ReadUnaligned<TUnsigned>(input.data());
- input.Skip(size);
-
- value = SwapBytes(value);
- if (Desc) {
- value = ~value;
- }
- return value;
-}
-
-template <typename TSigned, bool Desc>
-Y_FORCE_INLINE
-void EncodeSigned(TVector<ui8>& output, TSigned value) {
- using TUnsigned = std::make_unsigned_t<TSigned>;
- constexpr size_t size = sizeof(TUnsigned);
- constexpr TUnsigned shift = TUnsigned(1) << (size * 8 - 1);
-
- EncodeUnsigned<TUnsigned, Desc>(output, TUnsigned(value) + shift);
-}
-
-template <typename TSigned, bool Desc>
-Y_FORCE_INLINE
-TSigned DecodeSigned(TStringBuf& input) {
- using TUnsigned = std::make_unsigned_t<TSigned>;
- constexpr size_t size = sizeof(TUnsigned);
- constexpr TUnsigned shift = TUnsigned(1) << (size * 8 - 1);
-
- return TSigned(DecodeUnsigned<TUnsigned, Desc>(input) - shift);
-}
-
-enum class EFPCode : ui8 {
- NegInf = 0,
- Neg = 1,
- Zero = 2,
- Pos = 3,
- PosInf = 4,
- Nan = 5
-};
-
-template <typename TFloat>
-struct TFloatToInteger {};
-
-template <>
-struct TFloatToInteger<float> {
- using TType = ui32;
-};
-
-template <>
-struct TFloatToInteger<double> {
- using TType = ui64;
-};
-
-static_assert(std::numeric_limits<float>::is_iec559, "float type is not iec559(ieee754)");
-static_assert(std::numeric_limits<double>::is_iec559, "double type is not iec559(ieee754)");
-
-template <typename TFloat, bool Desc>
-Y_FORCE_INLINE
-void EncodeFloating(TVector<ui8>& output, TFloat value) {
- using TInteger = typename TFloatToInteger<TFloat>::TType;
- EFPCode code;
-
- switch (std::fpclassify(value)) {
- case FP_NORMAL:
- case FP_SUBNORMAL: {
- auto integer = ReadUnaligned<TInteger>(&value);
- if (value < 0) {
- integer = ~integer;
- code = EFPCode::Neg;
- } else {
- code = EFPCode::Pos;
- }
- output.push_back(Desc ? 0xFF ^ ui8(code) : ui8(code));
- EncodeUnsigned<TInteger, Desc>(output, integer);
- return;
- }
- case FP_ZERO:
- code = EFPCode::Zero;
- break;
- case FP_INFINITE:
- code = value < 0 ? EFPCode::NegInf : EFPCode::PosInf;
- break;
- default:
- code = EFPCode::Nan;
- break;
- }
- output.push_back(Desc ? 0xFF ^ ui8(code) : ui8(code));
-}
-
-template <typename TFloat, bool Desc>
-Y_FORCE_INLINE
-TFloat DecodeFloating(TStringBuf& input) {
- using TInteger = typename TFloatToInteger<TFloat>::TType;
-
- EnsureInputSize(input, 1);
- auto code = EFPCode(Desc ? 0xFF ^ input[0] : input[0]);
- input.Skip(1);
-
- bool negative;
- switch (code) {
- case EFPCode::Zero:
- return 0;
- case EFPCode::NegInf:
- return -std::numeric_limits<TFloat>::infinity();
- case EFPCode::PosInf:
- return std::numeric_limits<TFloat>::infinity();
- case EFPCode::Nan:
- return std::numeric_limits<TFloat>::quiet_NaN();
- case EFPCode::Neg:
- negative = true;
- break;
- case EFPCode::Pos:
- negative = false;
- break;
- default:
- MKQL_ENSURE(false, "floating point data is corrupted");
- }
-
- auto integer = DecodeUnsigned<TInteger, Desc>(input);
- if (negative) {
- integer = ~integer;
- }
-
- return ReadUnaligned<TFloat>(&integer);
-}
-
-constexpr ui8 BlockCode = 0x1F;
-constexpr size_t BlockSize = 15;
-constexpr size_t BlockSizeUi64 = BlockSize / 8 + 1;
-
-template <bool Desc>
-Y_FORCE_INLINE
-void EncodeString(TVector<ui8>& output, TStringBuf value) {
- size_t part = 0;
-
- while (!value.empty()) {
- union {
- ui8 buffer[BlockSize + 1];
- ui64 buffer64[BlockSizeUi64];
- };
-
- part = std::min(value.size(), BlockSize);
- if (part == BlockSize) {
- std::memcpy(buffer + 1, value.data(), BlockSize);
- } else {
- for (size_t i = 0; i < BlockSizeUi64; ++i) {
- buffer64[i] = 0;
- }
- std::memcpy(buffer + 1, value.data(), part);
- }
- value.Skip(part);
-
- buffer[0] = BlockCode;
-
- if (Desc) {
- for (size_t i = 0; i < BlockSizeUi64; ++i) {
- buffer64[i] ^= std::numeric_limits<ui64>::max();
- }
- }
-
- output.insert(output.end(), buffer, buffer + BlockSize + 1);
- }
-
- auto lastLength = ui8(part);
- output.push_back(Desc ? 0xFF ^ lastLength : lastLength);
-}
-
-template <bool Desc>
-Y_FORCE_INLINE
-TStringBuf DecodeString(TStringBuf& input, TVector<ui8>& value) {
- EnsureInputSize(input, 1);
- ui8 code = Desc ? 0xFF ^ input[0] : input[0];
- input.Skip(1);
-
- if (code != BlockCode) {
- MKQL_ENSURE(code == 0, TStringBuilder() << "unknown string block code: " << code);
- return TStringBuf();
- }
-
- while (code == BlockCode) {
- union {
- ui8 buffer[BlockSize + 1];
- ui64 buffer64[BlockSizeUi64];
- };
-
- EnsureInputSize(input, BlockSize + 1);
- std::memcpy(buffer, input.data(), BlockSize + 1);
- input.Skip(BlockSize + 1);
-
- if (Desc) {
- for (size_t i = 0; i < BlockSizeUi64; ++i) {
- buffer64[i] ^= std::numeric_limits<ui64>::max();
- }
- }
-
- value.insert(value.end(), buffer, buffer + BlockSize);
- code = buffer[BlockSize];
- }
-
- auto begin = (const char*)value.begin();
- auto end = (const char*)value.end() - BlockSize + code;
- return TStringBuf(begin, end - begin);
-}
-
constexpr size_t UuidSize = 16;
template <bool Desc>
Y_FORCE_INLINE
-void EncodeUuid(TVector<ui8>& output, const char* data) {
+ void EncodeUuid(TVector<ui8>& output, const char* data) {
output.resize(output.size() + UuidSize);
auto ptr = output.end() - UuidSize;
@@ -270,14 +27,15 @@ void EncodeUuid(TVector<ui8>& output, const char* data) {
for (size_t i = 0; i < UuidSize; ++i) {
*ptr++ = ui8(*data++) ^ 0xFF;
}
- } else {
+ }
+ else {
std::memcpy(ptr, data, UuidSize);
}
}
template <bool Desc>
Y_FORCE_INLINE
-TStringBuf DecodeUuid(TStringBuf& input, TVector<ui8>& value) {
+ TStringBuf DecodeUuid(TStringBuf& input, TVector<ui8>& value) {
EnsureInputSize(input, UuidSize);
auto data = input.data();
input.Skip(UuidSize);
@@ -289,7 +47,8 @@ TStringBuf DecodeUuid(TStringBuf& input, TVector<ui8>& value) {
for (size_t i = 0; i < UuidSize; ++i) {
*ptr++ = ui8(*data++) ^ 0xFF;
}
- } else {
+ }
+ else {
std::memcpy(ptr, data, UuidSize);
}
@@ -298,7 +57,7 @@ TStringBuf DecodeUuid(TStringBuf& input, TVector<ui8>& value) {
template <typename TUnsigned, bool Desc>
Y_FORCE_INLINE
-void EncodeTzUnsigned(TVector<ui8>& output, TUnsigned value, ui16 tzId) {
+ void EncodeTzUnsigned(TVector<ui8>& output, TUnsigned value, ui16 tzId) {
constexpr size_t size = sizeof(TUnsigned);
if (Desc) {
@@ -313,7 +72,7 @@ void EncodeTzUnsigned(TVector<ui8>& output, TUnsigned value, ui16 tzId) {
template <typename TUnsigned, bool Desc>
Y_FORCE_INLINE
-void DecodeTzUnsigned(TStringBuf& input, TUnsigned& value, ui16& tzId) {
+ void DecodeTzUnsigned(TStringBuf& input, TUnsigned& value, ui16& tzId) {
constexpr size_t size = sizeof(TUnsigned);
EnsureInputSize(input, size + sizeof(ui16));
@@ -324,7 +83,8 @@ void DecodeTzUnsigned(TStringBuf& input, TUnsigned& value, ui16& tzId) {
if (Desc) {
value = ~SwapBytes(v);
tzId = ~SwapBytes(t);
- } else {
+ }
+ else {
value = SwapBytes(v);
tzId = SwapBytes(t);
}
@@ -334,7 +94,7 @@ constexpr size_t DecimalSize = sizeof(NYql::NDecimal::TInt128);
template <bool Desc>
Y_FORCE_INLINE
-void EncodeDecimal(TVector<ui8>& output, NYql::NDecimal::TInt128 value) {
+ void EncodeDecimal(TVector<ui8>& output, NYql::NDecimal::TInt128 value) {
output.resize(output.size() + DecimalSize);
auto ptr = reinterpret_cast<char*>(output.end() - DecimalSize);
output.resize(output.size() + NYql::NDecimal::Serialize(Desc ? -value : value, ptr) - DecimalSize);
@@ -342,14 +102,13 @@ void EncodeDecimal(TVector<ui8>& output, NYql::NDecimal::TInt128 value) {
template <bool Desc>
Y_FORCE_INLINE
-NYql::NDecimal::TInt128 DecodeDecimal(TStringBuf& input) {
+ NYql::NDecimal::TInt128 DecodeDecimal(TStringBuf& input) {
MKQL_ENSURE(input.size() > 0U && input.size() <= DecimalSize, "premature end of input");
const auto des = NYql::NDecimal::Deserialize(input.data());
input.Skip(des.second);
return Desc ? -des.first : des.first;
}
-
template <bool Desc>
Y_FORCE_INLINE
void Encode(TVector<ui8>& output, NUdf::EDataSlot slot, const NUdf::TUnboxedValuePod& value) {
@@ -635,6 +394,12 @@ void EncodeValue(TType* type, const NUdf::TUnboxedValue& value, TVector<ui8>& ou
break;
}
+ case TType::EKind::Pg: {
+ auto pgType = static_cast<TPgType*>(type);
+ EncodePresortPGValue(pgType, value, output);
+ break;
+ }
+
default:
MKQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr());
}
@@ -655,6 +420,11 @@ NUdf::TUnboxedValue DecodeImpl(TType* type, TStringBuf& input, const THolderFact
auto slot = *static_cast<TDataType*>(type)->GetDataSlot();
return Decode<false>(input, slot, buffer);
}
+ case TType::EKind::Pg: {
+ auto pgType = static_cast<TPgType*>(type);
+ return DecodePresortPGValue(pgType, input, buffer);
+ }
+
case TType::EKind::Optional: {
auto itemType = static_cast<TOptionalType*>(type)->GetItemType();
auto hasValue = DecodeBool<false>(input);
diff --git a/ydb/library/yql/minikql/computation/presort_impl.h b/ydb/library/yql/minikql/computation/presort_impl.h
new file mode 100644
index 0000000000..5729cf3d24
--- /dev/null
+++ b/ydb/library/yql/minikql/computation/presort_impl.h
@@ -0,0 +1,265 @@
+#pragma once
+#include <ydb/library/yql/minikql/defs.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/public/udf/udf_value.h>
+#include <ydb/library/yql/utils/swap_bytes.h>
+
+#include <util/generic/vector.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace NDetail {
+
+ using NYql::SwapBytes;
+
+ Y_FORCE_INLINE
+ void EnsureInputSize(TStringBuf& input, size_t size) {
+ MKQL_ENSURE(input.size() >= size, "premature end of input");
+ }
+
+ template <bool Desc>
+ Y_FORCE_INLINE
+ void EncodeBool(TVector<ui8>& output, bool value) {
+ output.push_back(Desc ? 0xFF ^ ui8(value) : ui8(value));
+ }
+
+ template <bool Desc>
+ Y_FORCE_INLINE
+ bool DecodeBool(TStringBuf& input) {
+ EnsureInputSize(input, 1);
+ auto result = Desc ? bool(0xFF ^ ui8(input[0])) : bool(input[0]);
+ input.Skip(1);
+ return result;
+ }
+
+ template <typename TUnsigned, bool Desc>
+ Y_FORCE_INLINE
+ void EncodeUnsigned(TVector<ui8>& output, TUnsigned value) {
+ constexpr size_t size = sizeof(TUnsigned);
+
+ if (Desc) {
+ value = ~value;
+ }
+
+ output.resize(output.size() + size);
+ WriteUnaligned<TUnsigned>(output.end() - size, SwapBytes(value));
+ }
+
+ template <typename TUnsigned, bool Desc>
+ Y_FORCE_INLINE
+ TUnsigned DecodeUnsigned(TStringBuf& input) {
+ constexpr size_t size = sizeof(TUnsigned);
+
+ EnsureInputSize(input, size);
+ auto value = ReadUnaligned<TUnsigned>(input.data());
+ input.Skip(size);
+
+ value = SwapBytes(value);
+ if (Desc) {
+ value = ~value;
+ }
+ return value;
+ }
+
+ template <typename TSigned, bool Desc>
+ Y_FORCE_INLINE
+ void EncodeSigned(TVector<ui8>& output, TSigned value) {
+ using TUnsigned = std::make_unsigned_t<TSigned>;
+ constexpr size_t size = sizeof(TUnsigned);
+ constexpr TUnsigned shift = TUnsigned(1) << (size * 8 - 1);
+
+ EncodeUnsigned<TUnsigned, Desc>(output, TUnsigned(value) + shift);
+ }
+
+ template <typename TSigned, bool Desc>
+ Y_FORCE_INLINE
+ TSigned DecodeSigned(TStringBuf& input) {
+ using TUnsigned = std::make_unsigned_t<TSigned>;
+ constexpr size_t size = sizeof(TUnsigned);
+ constexpr TUnsigned shift = TUnsigned(1) << (size * 8 - 1);
+
+ return TSigned(DecodeUnsigned<TUnsigned, Desc>(input) - shift);
+ }
+
+ enum class EFPCode : ui8 {
+ NegInf = 0,
+ Neg = 1,
+ Zero = 2,
+ Pos = 3,
+ PosInf = 4,
+ Nan = 5
+ };
+
+ template <typename TFloat>
+ struct TFloatToInteger {};
+
+ template <>
+ struct TFloatToInteger<float> {
+ using TType = ui32;
+ };
+
+ template <>
+ struct TFloatToInteger<double> {
+ using TType = ui64;
+ };
+
+ static_assert(std::numeric_limits<float>::is_iec559, "float type is not iec559(ieee754)");
+ static_assert(std::numeric_limits<double>::is_iec559, "double type is not iec559(ieee754)");
+
+ template <typename TFloat, bool Desc>
+ Y_FORCE_INLINE
+ void EncodeFloating(TVector<ui8>& output, TFloat value) {
+ using TInteger = typename TFloatToInteger<TFloat>::TType;
+ EFPCode code;
+
+ switch (std::fpclassify(value)) {
+ case FP_NORMAL:
+ case FP_SUBNORMAL: {
+ auto integer = ReadUnaligned<TInteger>(&value);
+ if (value < 0) {
+ integer = ~integer;
+ code = EFPCode::Neg;
+ }
+ else {
+ code = EFPCode::Pos;
+ }
+ output.push_back(Desc ? 0xFF ^ ui8(code) : ui8(code));
+ EncodeUnsigned<TInteger, Desc>(output, integer);
+ return;
+ }
+ case FP_ZERO:
+ code = EFPCode::Zero;
+ break;
+ case FP_INFINITE:
+ code = value < 0 ? EFPCode::NegInf : EFPCode::PosInf;
+ break;
+ default:
+ code = EFPCode::Nan;
+ break;
+ }
+ output.push_back(Desc ? 0xFF ^ ui8(code) : ui8(code));
+ }
+
+ template <typename TFloat, bool Desc>
+ Y_FORCE_INLINE
+ TFloat DecodeFloating(TStringBuf& input) {
+ using TInteger = typename TFloatToInteger<TFloat>::TType;
+
+ EnsureInputSize(input, 1);
+ auto code = EFPCode(Desc ? 0xFF ^ input[0] : input[0]);
+ input.Skip(1);
+
+ bool negative;
+ switch (code) {
+ case EFPCode::Zero:
+ return 0;
+ case EFPCode::NegInf:
+ return -std::numeric_limits<TFloat>::infinity();
+ case EFPCode::PosInf:
+ return std::numeric_limits<TFloat>::infinity();
+ case EFPCode::Nan:
+ return std::numeric_limits<TFloat>::quiet_NaN();
+ case EFPCode::Neg:
+ negative = true;
+ break;
+ case EFPCode::Pos:
+ negative = false;
+ break;
+ default:
+ MKQL_ENSURE(false, "floating point data is corrupted");
+ }
+
+ auto integer = DecodeUnsigned<TInteger, Desc>(input);
+ if (negative) {
+ integer = ~integer;
+ }
+
+ return ReadUnaligned<TFloat>(&integer);
+ }
+
+ constexpr ui8 BlockCode = 0x1F;
+ constexpr size_t BlockSize = 15;
+ constexpr size_t BlockSizeUi64 = BlockSize / 8 + 1;
+
+ template <bool Desc>
+ Y_FORCE_INLINE
+ void EncodeString(TVector<ui8>& output, TStringBuf value) {
+ size_t part = 0;
+
+ while (!value.empty()) {
+ union {
+ ui8 buffer[BlockSize + 1];
+ ui64 buffer64[BlockSizeUi64];
+ };
+
+ part = std::min(value.size(), BlockSize);
+ if (part == BlockSize) {
+ std::memcpy(buffer + 1, value.data(), BlockSize);
+ }
+ else {
+ for (size_t i = 0; i < BlockSizeUi64; ++i) {
+ buffer64[i] = 0;
+ }
+ std::memcpy(buffer + 1, value.data(), part);
+ }
+ value.Skip(part);
+
+ buffer[0] = BlockCode;
+
+ if (Desc) {
+ for (size_t i = 0; i < BlockSizeUi64; ++i) {
+ buffer64[i] ^= std::numeric_limits<ui64>::max();
+ }
+ }
+
+ output.insert(output.end(), buffer, buffer + BlockSize + 1);
+ }
+
+ auto lastLength = ui8(part);
+ output.push_back(Desc ? 0xFF ^ lastLength : lastLength);
+ }
+
+ template <bool Desc>
+ Y_FORCE_INLINE
+ TStringBuf DecodeString(TStringBuf& input, TVector<ui8>& value) {
+ EnsureInputSize(input, 1);
+ ui8 code = Desc ? 0xFF ^ input[0] : input[0];
+ input.Skip(1);
+
+ if (code != BlockCode) {
+ MKQL_ENSURE(code == 0, TStringBuilder() << "unknown string block code: " << code);
+ return TStringBuf();
+ }
+
+ while (code == BlockCode) {
+ union {
+ ui8 buffer[BlockSize + 1];
+ ui64 buffer64[BlockSizeUi64];
+ };
+
+ EnsureInputSize(input, BlockSize + 1);
+ std::memcpy(buffer, input.data(), BlockSize + 1);
+ input.Skip(BlockSize + 1);
+
+ if (Desc) {
+ for (size_t i = 0; i < BlockSizeUi64; ++i) {
+ buffer64[i] ^= std::numeric_limits<ui64>::max();
+ }
+ }
+
+ value.insert(value.end(), buffer, buffer + BlockSize);
+ code = buffer[BlockSize];
+ }
+
+ auto begin = (const char*)value.begin();
+ auto end = (const char*)value.end() - BlockSize + code;
+ return TStringBuf(begin, end - begin);
+ }
+}
+
+void EncodePresortPGValue(TPgType* type, const NUdf::TUnboxedValue& value, TVector<ui8>& output);
+NUdf::TUnboxedValue DecodePresortPGValue(TPgType* type, TStringBuf& input, TVector<ui8>& buffer);
+
+} // NMiniKQL
+} // NKikimr
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index a4870bf022..6390675026 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -2,6 +2,7 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h>
+#include <ydb/library/yql/minikql/computation/presort_impl.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_alloc.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
@@ -1621,6 +1622,106 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {
}
}
+void EncodePresortPGValue(TPgType* type, const NUdf::TUnboxedValue& value, TVector<ui8>& output) {
+ switch (type->GetTypeId()) {
+ case BOOLOID: {
+ const auto x = DatumGetBool(ScalarDatumFromPod(value)) != 0;
+ NDetail::EncodeBool<false>(output, x);
+ break;
+ }
+ case INT2OID: {
+ const auto x = DatumGetInt16(ScalarDatumFromPod(value));
+ NDetail::EncodeSigned<i16, false>(output, x);
+ break;
+ }
+ case INT4OID: {
+ const auto x = DatumGetInt32(ScalarDatumFromPod(value));
+ NDetail::EncodeSigned<i32, false>(output, x);
+ break;
+ }
+ case INT8OID: {
+ const auto x = DatumGetInt64(ScalarDatumFromPod(value));
+ NDetail::EncodeSigned<i64, false>(output, x);
+ break;
+ }
+ case FLOAT4OID: {
+ const auto x = DatumGetFloat4(ScalarDatumFromPod(value));
+ NDetail::EncodeFloating<float, false>(output, x);
+ break;
+ }
+ case FLOAT8OID: {
+ const auto x = DatumGetFloat8(ScalarDatumFromPod(value));
+ NDetail::EncodeFloating<double, false>(output, x);
+ break;
+ }
+ case BYTEAOID:
+ case VARCHAROID:
+ case TEXTOID: {
+ SET_MEMORY_CONTEXT;
+ TPAllocScope call;
+ const auto x = (const text*)PointerDatumFromPod(value, true);
+ ui32 len = VARSIZE_ANY_EXHDR(x);
+ TString ret;
+ if (len) {
+ ret = TString::Uninitialized(len);
+ text_to_cstring_buffer(x, ret.begin(), len + 1);
+ }
+
+ NDetail::EncodeString<false>(output, ret);
+ break;
+ }
+ case CSTRINGOID: {
+ SET_MEMORY_CONTEXT;
+ TPAllocScope call;
+ const auto x = (const char*)PointerDatumFromPod(value, false);
+ NDetail::EncodeString<false>(output, x);
+ break;
+ }
+ default:
+ SET_MEMORY_CONTEXT;
+ TPAllocScope call;
+ const auto& typeInfo = NPg::LookupType(type->GetTypeId());
+ FmgrInfo finfo;
+ Zero(finfo);
+ Y_ENSURE(typeInfo.SendFuncId);
+ fmgr_info(typeInfo.SendFuncId, &finfo);
+ Y_ENSURE(!finfo.fn_retset);
+ Y_ENSURE(finfo.fn_addr);
+ Y_ENSURE(finfo.fn_nargs == 1);
+ LOCAL_FCINFO(callInfo, 1);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 1;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->isnull = false;
+ callInfo->args[0] = { typeInfo.PassByValue ?
+ ScalarDatumFromPod(value) :
+ PointerDatumFromPod(value, typeInfo.TypeLen == -1), false };
+ auto x = (text*)finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ Y_DEFER {
+ pfree(x);
+ };
+
+ ui32 len = VARSIZE_ANY_EXHDR(x);
+ TString ret;
+ if (len) {
+ ret = TString::Uninitialized(len);
+ text_to_cstring_buffer(x, ret.begin(), len + 1);
+ }
+
+ NDetail::EncodeString<false>(output, ret);
+ }
+}
+
+NUdf::TUnboxedValue DecodePresortPGValue(TPgType* type, TStringBuf& input, TVector<ui8>& buffer) {
+ // presort decoding is used only inside RTMR, don't implement it right now
+ Y_UNUSED(type);
+ Y_UNUSED(input);
+ Y_UNUSED(buffer);
+ throw yexception() << "PG types are not supported";
+}
+
void* PgInitializeContext(const std::string_view& contextType) {
if (contextType == "Agg") {
auto ctx = (AggState*)MKQLAllocWithSize(sizeof(AggState));
diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
index ae594de09d..a8887ee935 100644
--- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
+++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
@@ -1,6 +1,7 @@
#include <ydb/library/yql/sql/pg_sql.h>
#include <ydb/library/yql/providers/common/codec/yql_pg_codec.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h>
+#include <ydb/library/yql/minikql/computation/presort_impl.h>
#include <ydb/library/yql/core/yql_pg_utils.h>
namespace NSQLTranslationPG {
@@ -87,6 +88,20 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {
throw yexception() << "PG types are not supported";
}
+void EncodePresortPGValue(TPgType* type, const NUdf::TUnboxedValue& value, TVector<ui8>& output) {
+ Y_UNUSED(type);
+ Y_UNUSED(value);
+ Y_UNUSED(output);
+ throw yexception() << "PG types are not supported";
+}
+
+NUdf::TUnboxedValue DecodePresortPGValue(TPgType* type, TStringBuf& input, TVector<ui8>& buffer) {
+ Y_UNUSED(type);
+ Y_UNUSED(input);
+ Y_UNUSED(buffer);
+ throw yexception() << "PG types are not supported";
+}
+
void* PgInitializeContext(const std::string_view& contextType) {
Y_UNUSED(contextType);
return nullptr;