aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergey Uzhakov <uzhastik@gmail.com>2022-06-20 22:36:13 +0300
committerSergey Uzhakov <uzhastik@gmail.com>2022-06-20 22:36:13 +0300
commit1940182aa37a8912f9609df6c24dbc64a05dbc53 (patch)
treef7c575ff9a7bf4956b295645972ae64c5a9c9e6f
parent8ba0b67c1c25bc31d37230f0a859c5909c6bc8cd (diff)
downloadydb-1940182aa37a8912f9609df6c24dbc64a05dbc53.tar.gz
YQ-1154: add pg native format conversion functions (text + binary formats)
ref:f4e60a78debf697c999b89054829bdbbc891e010
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp600
-rw-r--r--ydb/library/yql/providers/common/codec/yql_pg_codec.h6
-rw-r--r--ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp24
3 files changed, 249 insertions, 381 deletions
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index 3cd4591f8c..c84aec2e54 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -59,6 +59,7 @@ namespace NYql {
using namespace NKikimr::NMiniKQL;
+// use 'false' for native format
static __thread bool NeedCanonizeFp = false;
struct TMainContext {
@@ -149,7 +150,7 @@ Datum PointerDatumFromPod(const NUdf::TUnboxedValuePod& value) {
void *MkqlAllocSetAlloc(MemoryContext context, Size size) {
auto fullSize = size + PallocHdrSize;
- auto ptr = (char *)NKikimr::NMiniKQL::MKQLAllocDeprecated(fullSize);
+ auto ptr = (char *)MKQLAllocDeprecated(fullSize);
auto ret = (void*)(ptr + PallocHdrSize);
*(MemoryContext *)(((char *)ret) - sizeof(void *)) = context;
((TAllocState::TListEntry*)ptr)->Link(TlsAllocState->CurrentPAllocList);
@@ -1502,6 +1503,124 @@ TComputationNodeFactory GetPgFactory() {
namespace NCommon {
+TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) {
+ YQL_ENSURE(value); // null could not be represented as text
+
+ TPAllocScope call;
+ const auto& typeInfo = NPg::LookupType(pgTypeId);
+ auto outFuncId = typeInfo.OutFuncId;
+ if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
+ outFuncId = NPg::LookupProc("array_out", { 0 }).ProcId;
+ }
+
+ FmgrInfo finfo;
+ Zero(finfo);
+ Y_ENSURE(outFuncId);
+ fmgr_info(outFuncId, &finfo);
+ Y_ENSURE(!finfo.fn_retset);
+ Y_ENSURE(finfo.fn_addr);
+ Y_ENSURE(finfo.fn_nargs == 1);
+ LOCAL_FCINFO(callInfo, 1);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 1;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->isnull = false;
+ callInfo->args[0] = { typeInfo.PassByValue ?
+ ScalarDatumFromPod(value) :
+ PointerDatumFromPod(value), false };
+ auto str = (char*)finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ Y_DEFER{
+ pfree(str);
+ };
+
+ return TString(str);
+}
+
+template <typename F>
+void PgValueToNativeBinaryImpl(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId, bool needCanonizeFp, F f) {
+ YQL_ENSURE(value); // null could not be represented as binary
+
+ const bool oldNeedCanonizeFp = NeedCanonizeFp;
+ NeedCanonizeFp = needCanonizeFp;
+ Y_DEFER{
+ NeedCanonizeFp = oldNeedCanonizeFp;
+ };
+
+ TPAllocScope call;
+ const auto& typeInfo = NPg::LookupType(pgTypeId);
+ auto sendFuncId = typeInfo.SendFuncId;
+ if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
+ sendFuncId = NPg::LookupProc("array_send", { 0 }).ProcId;
+ }
+
+ FmgrInfo finfo;
+ Zero(finfo);
+ Y_ENSURE(sendFuncId);
+ fmgr_info(sendFuncId, &finfo);
+ Y_ENSURE(!finfo.fn_retset);
+ Y_ENSURE(finfo.fn_addr);
+ Y_ENSURE(finfo.fn_nargs == 1);
+ LOCAL_FCINFO(callInfo, 1);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 1;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->isnull = false;
+ callInfo->args[0] = { typeInfo.PassByValue ?
+ ScalarDatumFromPod(value) :
+ PointerDatumFromPod(value), false };
+
+ auto x = (text*)finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ Y_DEFER{
+ pfree(x);
+ };
+
+ auto s = GetVarBuf(x);
+ ui32 len = s.Size();
+ f(TStringBuf(s.Data(), s.Size()));
+}
+
+TString PgValueToNativeBinary(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) {
+ TString result;
+ PgValueToNativeBinaryImpl(value, pgTypeId, false, [&result](TStringBuf b) {
+ result = b;
+ });
+ return result;
+}
+
+TString PgValueToString(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) {
+ YQL_ENSURE(value); // null could not be represented as text
+
+ switch (pgTypeId) {
+ case BOOLOID:
+ return DatumGetBool(ScalarDatumFromPod(value)) ? "true" : "false";
+ case INT2OID:
+ return ToString(DatumGetInt16(ScalarDatumFromPod(value)));
+ case INT4OID:
+ return ToString(DatumGetInt32(ScalarDatumFromPod(value)));
+ case INT8OID:
+ return ToString(DatumGetInt64(ScalarDatumFromPod(value)));
+ case FLOAT4OID:
+ return ::FloatToString(DatumGetFloat4(ScalarDatumFromPod(value)));
+ case FLOAT8OID:
+ return ::FloatToString(DatumGetFloat8(ScalarDatumFromPod(value)));
+ case BYTEAOID:
+ case VARCHAROID:
+ case TEXTOID: {
+ const auto x = (const text*)PointerDatumFromPod(value);
+ return TString(GetVarBuf(x));
+ }
+ case CSTRINGOID: {
+ return TString((const char*)PointerDatumFromPod(value));
+ }
+ default:
+ return PgValueToNativeText(value, pgTypeId);
+ }
+}
+
void WriteYsonValueInTableFormatPg(TOutputBuf& buf, TPgType* type, const NUdf::TUnboxedValuePod& value) {
using namespace NYson::NDetail;
if (!value) {
@@ -1556,117 +1675,16 @@ void WriteYsonValueInTableFormatPg(TOutputBuf& buf, TPgType* type, const NUdf::T
break;
}
default:
- TPAllocScope call;
- const auto& typeInfo = NPg::LookupType(type->GetTypeId());
- auto sendFuncId = typeInfo.SendFuncId;
- if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
- sendFuncId = NPg::LookupProc("array_send", { 0 }).ProcId;
- }
-
- FmgrInfo finfo;
- Zero(finfo);
- Y_ENSURE(sendFuncId);
- fmgr_info(sendFuncId, &finfo);
- Y_ENSURE(!finfo.fn_retset);
- Y_ENSURE(finfo.fn_addr);
- Y_ENSURE(finfo.fn_nargs == 1);
- LOCAL_FCINFO(callInfo, 1);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 1;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { typeInfo.PassByValue ?
- ScalarDatumFromPod(value):
- PointerDatumFromPod(value), false };
- NeedCanonizeFp = true;
- auto x = (text*)finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- Y_DEFER {
- pfree(x);
- NeedCanonizeFp = false;
- };
-
- auto s = GetVarBuf(x);
buf.Write(StringMarker);
- buf.WriteVarI32(s.Size());
- buf.WriteMany(s.Data(), s.Size());
+ PgValueToNativeBinaryImpl(value, type->GetTypeId(), true, [&buf](TStringBuf b) {
+ buf.WriteVarI32(b.Size());
+ buf.WriteMany(b.Data(), b.Size());
+ });
break;
}
}
-TString PgValueToString(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) {
- YQL_ENSURE(value);
-
- TString ret;
- switch (pgTypeId) {
- case BOOLOID:
- ret = DatumGetBool(ScalarDatumFromPod(value)) ? "true" : "false";
- break;
- case INT2OID:
- ret = ToString(DatumGetInt16(ScalarDatumFromPod(value)));
- break;
- case INT4OID:
- ret = ToString(DatumGetInt32(ScalarDatumFromPod(value)));
- break;
- case INT8OID:
- ret = ToString(DatumGetInt64(ScalarDatumFromPod(value)));
- break;
- case FLOAT4OID:
- ret = ::FloatToString(DatumGetFloat4(ScalarDatumFromPod(value)));
- break;
- case FLOAT8OID:
- ret = ::FloatToString(DatumGetFloat8(ScalarDatumFromPod(value)));
- break;
- case BYTEAOID:
- case VARCHAROID:
- case TEXTOID: {
- const auto x = (const text*)PointerDatumFromPod(value);
- ret = GetVarBuf(x);
- break;
- }
- case CSTRINGOID: {
- auto str = (const char*)PointerDatumFromPod(value);
- ret = str;
- break;
- }
- default:
- TPAllocScope call;
- const auto& typeInfo = NPg::LookupType(pgTypeId);
- auto outFuncId = typeInfo.OutFuncId;
- if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
- outFuncId = NPg::LookupProc("array_out", { 0 }).ProcId;
- }
-
- FmgrInfo finfo;
- Zero(finfo);
- Y_ENSURE(outFuncId);
- fmgr_info(outFuncId, &finfo);
- Y_ENSURE(!finfo.fn_retset);
- Y_ENSURE(finfo.fn_addr);
- Y_ENSURE(finfo.fn_nargs == 1);
- LOCAL_FCINFO(callInfo, 1);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 1;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { typeInfo.PassByValue ?
- ScalarDatumFromPod(value) :
- PointerDatumFromPod(value), false };
- auto str = (char*)finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- Y_DEFER{
- pfree(str);
- };
-
- ret = str;
- }
-
- return ret;
-}
-
-void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type,
+void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, TPgType* type,
const TVector<ui32>* structPositions) {
if (!value) {
writer.OnNull();
@@ -1731,41 +1749,79 @@ NUdf::TUnboxedValue ReadYsonValueInTableFormatPg(TPgType* type, char cmd, TInput
default:
TPAllocScope call;
auto s = buf.ReadYtString();
- StringInfoData stringInfo;
- stringInfo.data = (char*)s.Data();
- stringInfo.len = s.Size();
- stringInfo.maxlen = s.Size();
- stringInfo.cursor = 0;
-
- const auto& typeInfo = NPg::LookupType(type->GetTypeId());
- auto typeIOParam = MakeTypeIOParam(typeInfo);
- auto receiveFuncId = typeInfo.ReceiveFuncId;
- if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
- receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId;
- }
+ return PgValueFromNativeBinary(s, type->GetTypeId());
+ }
+}
- FmgrInfo finfo;
- Zero(finfo);
- Y_ENSURE(receiveFuncId);
- fmgr_info(receiveFuncId, &finfo);
- Y_ENSURE(!finfo.fn_retset);
- Y_ENSURE(finfo.fn_addr);
- Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
- LOCAL_FCINFO(callInfo, 3);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 3;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { (Datum)&stringInfo, false };
- callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
- callInfo->args[2] = { Int32GetDatum(-1), false };
+NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgTypeId) {
+ TPAllocScope call;
+ StringInfoData stringInfo;
+ stringInfo.data = (char*)binary.Data();
+ stringInfo.len = binary.Size();
+ stringInfo.maxlen = binary.Size();
+ stringInfo.cursor = 0;
+
+ const auto& typeInfo = NPg::LookupType(pgTypeId);
+ auto typeIOParam = MakeTypeIOParam(typeInfo);
+ auto receiveFuncId = typeInfo.ReceiveFuncId;
+ if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
+ receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId;
+ }
+
+ FmgrInfo finfo;
+ Zero(finfo);
+ Y_ENSURE(receiveFuncId);
+ fmgr_info(receiveFuncId, &finfo);
+ Y_ENSURE(!finfo.fn_retset);
+ Y_ENSURE(finfo.fn_addr);
+ Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
+ LOCAL_FCINFO(callInfo, 3);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 3;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->isnull = false;
+ callInfo->args[0] = { (Datum)&stringInfo, false };
+ callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
+ callInfo->args[2] = { Int32GetDatum(-1), false };
+
+ auto x = finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ Y_ENSURE(stringInfo.cursor == stringInfo.len);
+ return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x);
+}
- auto x = finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- Y_ENSURE(stringInfo.cursor == stringInfo.len);
- return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x);
- }
+NUdf::TUnboxedValue PgValueFromNativeText(const TStringBuf text, ui32 pgTypeId) {
+ TString str{ text };
+
+ TPAllocScope call;
+ const auto& typeInfo = NPg::LookupType(pgTypeId);
+ auto typeIOParam = MakeTypeIOParam(typeInfo);
+ auto inFuncId = typeInfo.InFuncId;
+ if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
+ inFuncId = NPg::LookupProc("array_in", { 0,0,0 }).ProcId;
+ }
+
+ FmgrInfo finfo;
+ Zero(finfo);
+ Y_ENSURE(inFuncId);
+ fmgr_info(inFuncId, &finfo);
+ Y_ENSURE(!finfo.fn_retset);
+ Y_ENSURE(finfo.fn_addr);
+ Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
+ LOCAL_FCINFO(callInfo, 3);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 3;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->isnull = false;
+ callInfo->args[0] = { (Datum)str.c_str(), false };
+ callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
+ callInfo->args[2] = { Int32GetDatum(-1), false };
+
+ auto x = finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x);
}
NUdf::TUnboxedValue PgValueFromString(const TStringBuf s, ui32 pgTypeId) {
@@ -1799,36 +1855,7 @@ NUdf::TUnboxedValue PgValueFromString(const TStringBuf s, ui32 pgTypeId) {
return PointerDatumToPod((Datum)ret);
}
default:
- TString str{ s };
-
- TPAllocScope call;
- const auto& typeInfo = NPg::LookupType(pgTypeId);
- auto typeIOParam = MakeTypeIOParam(typeInfo);
- auto inFuncId = typeInfo.InFuncId;
- if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
- inFuncId = NPg::LookupProc("array_in", { 0,0,0 }).ProcId;
- }
-
- FmgrInfo finfo;
- Zero(finfo);
- Y_ENSURE(inFuncId);
- fmgr_info(inFuncId, &finfo);
- Y_ENSURE(!finfo.fn_retset);
- Y_ENSURE(finfo.fn_addr);
- Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
- LOCAL_FCINFO(callInfo, 3);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 3;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { (Datum)str.c_str(), false };
- callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
- callInfo->args[2] = { Int32GetDatum(-1), false };
-
- auto x = finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x);
+ return PgValueFromNativeText(s, pgTypeId);
}
}
@@ -1843,7 +1870,7 @@ NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) {
return PgValueFromString(s, type->GetTypeId());
}
-NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NCommon::TInputBuf& buf) {
+NUdf::TUnboxedValue ReadSkiffPg(TPgType* type, NCommon::TInputBuf& buf) {
auto marker = buf.Read();
if (!marker) {
return NUdf::TUnboxedValue();
@@ -1929,45 +1956,11 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NComm
};
buf.ReadMany(s, size);
-
- StringInfoData stringInfo;
- stringInfo.data = s;
- stringInfo.len = size;
- stringInfo.maxlen = size;
- stringInfo.cursor = 0;
-
- const auto& typeInfo = NPg::LookupType(type->GetTypeId());
- auto typeIOParam = MakeTypeIOParam(typeInfo);
- auto receiveFuncId = typeInfo.ReceiveFuncId;
- if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
- receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId;
- }
-
- FmgrInfo finfo;
- Zero(finfo);
- Y_ENSURE(receiveFuncId);
- fmgr_info(receiveFuncId, &finfo);
- Y_ENSURE(!finfo.fn_retset);
- Y_ENSURE(finfo.fn_addr);
- Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
- LOCAL_FCINFO(callInfo, 3);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 3;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { (Datum)&stringInfo, false };
- callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
- callInfo->args[2] = { Int32GetDatum(-1), false };
-
- auto x = finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- Y_ENSURE(stringInfo.cursor == stringInfo.len);
- return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x);
+ return PgValueFromNativeBinary(TStringBuf(s, size), type->GetTypeId());
}
}
-void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) {
+void WriteSkiffPg(TPgType* type, const NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) {
if (!value) {
buf.Write('\0');
return;
@@ -2023,49 +2016,19 @@ void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxe
break;
}
default:
- TPAllocScope call;
- const auto& typeInfo = NPg::LookupType(type->GetTypeId());
- auto sendFuncId = typeInfo.SendFuncId;
- if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
- sendFuncId = NPg::LookupProc("array_send", { 0 }).ProcId;
- }
-
- FmgrInfo finfo;
- Zero(finfo);
- Y_ENSURE(sendFuncId);
- fmgr_info(sendFuncId, &finfo);
- Y_ENSURE(!finfo.fn_retset);
- Y_ENSURE(finfo.fn_addr);
- Y_ENSURE(finfo.fn_nargs == 1);
- LOCAL_FCINFO(callInfo, 1);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 1;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { typeInfo.PassByValue ?
- ScalarDatumFromPod(value) :
- PointerDatumFromPod(value), false };
- NeedCanonizeFp = true;
- auto x = (text*)finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- Y_DEFER {
- pfree(x);
- NeedCanonizeFp = false;
- };
-
- auto s = GetVarBuf(x);
- ui32 len = s.Size();
- buf.WriteMany((const char*)&len, sizeof(len));
- buf.WriteMany(s.Data(), len);
+ PgValueToNativeBinaryImpl(value, type->GetTypeId(), true, [&buf](TStringBuf b) {
+ ui32 len = b.Size();
+ buf.WriteMany((const char*)&len, sizeof(len));
+ buf.WriteMany(b.Data(), len);
+ });
}
}
-extern "C" void ReadSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf) {
+extern "C" void ReadSkiffPgValue(TPgType* type, NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf) {
value = ReadSkiffPg(type, buf);
}
-extern "C" void WriteSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) {
+extern "C" void WriteSkiffPgValue(TPgType* type, const NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) {
WriteSkiffPg(type, value, buf);
}
@@ -2216,40 +2179,10 @@ void PGPackImpl(bool stable, const TPgType* type, const NUdf::TUnboxedValuePod&
break;
}
default:
- TPAllocScope call;
- const auto& typeInfo = NPg::LookupType(type->GetTypeId());
- auto sendFuncId = typeInfo.SendFuncId;
- if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
- sendFuncId = NPg::LookupProc("array_send", { 0 }).ProcId;
- }
-
- FmgrInfo finfo;
- Zero(finfo);
- Y_ENSURE(sendFuncId);
- fmgr_info(sendFuncId, &finfo);
- Y_ENSURE(!finfo.fn_retset);
- Y_ENSURE(finfo.fn_addr);
- Y_ENSURE(finfo.fn_nargs == 1);
- LOCAL_FCINFO(callInfo, 1);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 1;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { typeInfo.PassByValue ?
- ScalarDatumFromPod(value) :
- PointerDatumFromPod(value), false };
- NeedCanonizeFp = stable;
- auto x = (text*)finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- Y_DEFER{
- pfree(x);
- NeedCanonizeFp = false;
- };
-
- auto s = GetVarBuf(x);
- NDetails::PackUInt32(s.Size(), buf);
- buf.Append(s.Data(), s.Size());
+ NYql::NCommon::PgValueToNativeBinaryImpl(value, type->GetTypeId(), stable, [&buf](TStringBuf b) {
+ NDetails::PackUInt32(b.Size(), buf);
+ buf.Append(b.Data(), b.Size());
+ });
}
}
@@ -2301,40 +2234,9 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {
TPAllocScope call;
auto size = NDetails::UnpackUInt32(buf);
MKQL_ENSURE(size <= buf.size(), "Bad packed data. Buffer too small");
- StringInfoData stringInfo;
- stringInfo.data = (char*)buf.data();
- stringInfo.len = size;
- stringInfo.maxlen = size;
- stringInfo.cursor = 0;
-
- const auto& typeInfo = NPg::LookupType(type->GetTypeId());
- auto typeIOParam = MakeTypeIOParam(typeInfo);
- auto receiveFuncId = typeInfo.ReceiveFuncId;
- if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
- receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId;
- }
-
- FmgrInfo finfo;
- Zero(finfo);
- Y_ENSURE(receiveFuncId);
- fmgr_info(receiveFuncId, &finfo);
- Y_ENSURE(!finfo.fn_retset);
- Y_ENSURE(finfo.fn_addr);
- Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
- LOCAL_FCINFO(callInfo, 3);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 3;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { (Datum)&stringInfo, false };
- callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
- callInfo->args[2] = { Int32GetDatum(-1), false };
-
- auto x = finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- buf.Skip(stringInfo.cursor);
- return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x);
+ TStringBuf s = buf.Head(size);
+ buf.Skip(size);
+ return NYql::NCommon::PgValueFromNativeBinary(s, type->GetTypeId());
}
}
@@ -2384,39 +2286,9 @@ void EncodePresortPGValue(TPgType* type, const NUdf::TUnboxedValue& value, TVect
break;
}
default:
- TPAllocScope call;
- const auto& typeInfo = NPg::LookupType(type->GetTypeId());
- auto sendFuncId = typeInfo.SendFuncId;
- if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
- sendFuncId = NPg::LookupProc("array_send", { 0 }).ProcId;
- }
-
- FmgrInfo finfo;
- Zero(finfo);
- Y_ENSURE(sendFuncId);
- fmgr_info(sendFuncId, &finfo);
- Y_ENSURE(!finfo.fn_retset);
- Y_ENSURE(finfo.fn_addr);
- Y_ENSURE(finfo.fn_nargs == 1);
- LOCAL_FCINFO(callInfo, 1);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 1;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { typeInfo.PassByValue ?
- ScalarDatumFromPod(value) :
- PointerDatumFromPod(value), false };
- NeedCanonizeFp = true;
- auto x = (text*)finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- Y_DEFER {
- pfree(x);
- NeedCanonizeFp = false;
- };
-
- auto s = GetVarBuf(x);
- NDetail::EncodeString<false>(output, s);
+ NYql::NCommon::PgValueToNativeBinaryImpl(value, type->GetTypeId(), true, [&output](TStringBuf b) {
+ NDetail::EncodeString<false>(output, b);
+ });
}
}
@@ -2463,41 +2335,7 @@ NUdf::TUnboxedValue DecodePresortPGValue(TPgType* type, TStringBuf& input, TVect
default:
buffer.clear();
const auto s = NDetail::DecodeString<false>(input, buffer);
-
- StringInfoData stringInfo;
- stringInfo.data = (char*)s.Data();
- stringInfo.len = s.Size();
- stringInfo.maxlen = s.Size();
- stringInfo.cursor = 0;
-
- const auto& typeInfo = NPg::LookupType(type->GetTypeId());
- auto typeIOParam = MakeTypeIOParam(typeInfo);
- auto receiveFuncId = typeInfo.ReceiveFuncId;
- if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
- receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId;
- }
-
- FmgrInfo finfo;
- Zero(finfo);
- Y_ENSURE(receiveFuncId);
- fmgr_info(receiveFuncId, &finfo);
- Y_ENSURE(!finfo.fn_retset);
- Y_ENSURE(finfo.fn_addr);
- Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
- LOCAL_FCINFO(callInfo, 3);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 3;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { (Datum)&stringInfo, false };
- callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
- callInfo->args[2] = { Int32GetDatum(-1), false };
-
- auto x = finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- Y_ENSURE(stringInfo.cursor == stringInfo.len);
- return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x);
+ return NYql::NCommon::PgValueFromNativeBinary(s, type->GetTypeId());
}
}
@@ -2726,7 +2564,7 @@ private:
FmgrInfo FInfoEquate;
};
-NUdf::IEquate::TPtr MakePgEquate(const NMiniKQL::TPgType* type) {
+NUdf::IEquate::TPtr MakePgEquate(const TPgType* type) {
return new TPgEquate(type);
}
diff --git a/ydb/library/yql/providers/common/codec/yql_pg_codec.h b/ydb/library/yql/providers/common/codec/yql_pg_codec.h
index aed53d4faf..a943fbe0b6 100644
--- a/ydb/library/yql/providers/common/codec/yql_pg_codec.h
+++ b/ydb/library/yql/providers/common/codec/yql_pg_codec.h
@@ -15,6 +15,12 @@ namespace NCommon {
TString PgValueToString(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId);
NUdf::TUnboxedValue PgValueFromString(const TStringBuf text, ui32 pgTypeId);
+TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId);
+NUdf::TUnboxedValue PgValueFromNativeText(const TStringBuf text, ui32 pgTypeId);
+
+TString PgValueToNativeBinary(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId);
+NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgTypeId);
+
void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type,
const TVector<ui32>* structPositions);
diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
index 514f02f330..6be057fc6d 100644
--- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
+++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
@@ -32,6 +32,30 @@ NUdf::TUnboxedValue PgValueFromString(const TStringBuf text, ui32 pgTypeId) {
throw yexception() << "PgValueFromString: PG types are not supported";
}
+TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) {
+ Y_UNUSED(value);
+ Y_UNUSED(pgTypeId);
+ throw yexception() << "PgValueToNativeText: PG types are not supported";
+}
+
+NUdf::TUnboxedValue PgValueFromNativeText(const TStringBuf text, ui32 pgTypeId) {
+ Y_UNUSED(text);
+ Y_UNUSED(pgTypeId);
+ throw yexception() << "PgValueFromNativeText: PG types are not supported";
+}
+
+TString PgValueToNativeBinary(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) {
+ Y_UNUSED(value);
+ Y_UNUSED(pgTypeId);
+ throw yexception() << "PgValueToNativeBinary: PG types are not supported";
+}
+
+NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgTypeId) {
+ Y_UNUSED(binary);
+ Y_UNUSED(pgTypeId);
+ throw yexception() << "PgValueFromNativeBinary: PG types are not supported";
+}
+
void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type,
const TVector<ui32>* structPositions) {
Y_UNUSED(writer);