diff options
author | Sergey Uzhakov <uzhastik@gmail.com> | 2022-06-20 22:36:13 +0300 |
---|---|---|
committer | Sergey Uzhakov <uzhastik@gmail.com> | 2022-06-20 22:36:13 +0300 |
commit | 1940182aa37a8912f9609df6c24dbc64a05dbc53 (patch) | |
tree | f7c575ff9a7bf4956b295645972ae64c5a9c9e6f | |
parent | 8ba0b67c1c25bc31d37230f0a859c5909c6bc8cd (diff) | |
download | ydb-1940182aa37a8912f9609df6c24dbc64a05dbc53.tar.gz |
YQ-1154: add pg native format conversion functions (text + binary formats)
ref:f4e60a78debf697c999b89054829bdbbc891e010
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/comp_factory.cpp | 600 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/codec/yql_pg_codec.h | 6 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp | 24 |
3 files changed, 249 insertions, 381 deletions
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index 3cd4591f8c..c84aec2e54 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -59,6 +59,7 @@ namespace NYql { using namespace NKikimr::NMiniKQL; +// use 'false' for native format static __thread bool NeedCanonizeFp = false; struct TMainContext { @@ -149,7 +150,7 @@ Datum PointerDatumFromPod(const NUdf::TUnboxedValuePod& value) { void *MkqlAllocSetAlloc(MemoryContext context, Size size) { auto fullSize = size + PallocHdrSize; - auto ptr = (char *)NKikimr::NMiniKQL::MKQLAllocDeprecated(fullSize); + auto ptr = (char *)MKQLAllocDeprecated(fullSize); auto ret = (void*)(ptr + PallocHdrSize); *(MemoryContext *)(((char *)ret) - sizeof(void *)) = context; ((TAllocState::TListEntry*)ptr)->Link(TlsAllocState->CurrentPAllocList); @@ -1502,6 +1503,124 @@ TComputationNodeFactory GetPgFactory() { namespace NCommon { +TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) { + YQL_ENSURE(value); // null could not be represented as text + + TPAllocScope call; + const auto& typeInfo = NPg::LookupType(pgTypeId); + auto outFuncId = typeInfo.OutFuncId; + if (typeInfo.TypeId == typeInfo.ArrayTypeId) { + outFuncId = NPg::LookupProc("array_out", { 0 }).ProcId; + } + + FmgrInfo finfo; + Zero(finfo); + Y_ENSURE(outFuncId); + fmgr_info(outFuncId, &finfo); + Y_ENSURE(!finfo.fn_retset); + Y_ENSURE(finfo.fn_addr); + Y_ENSURE(finfo.fn_nargs == 1); + LOCAL_FCINFO(callInfo, 1); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 1; + callInfo->fncollation = DEFAULT_COLLATION_OID; + callInfo->isnull = false; + callInfo->args[0] = { typeInfo.PassByValue ? + ScalarDatumFromPod(value) : + PointerDatumFromPod(value), false }; + auto str = (char*)finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + Y_DEFER{ + pfree(str); + }; + + return TString(str); +} + +template <typename F> +void PgValueToNativeBinaryImpl(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId, bool needCanonizeFp, F f) { + YQL_ENSURE(value); // null could not be represented as binary + + const bool oldNeedCanonizeFp = NeedCanonizeFp; + NeedCanonizeFp = needCanonizeFp; + Y_DEFER{ + NeedCanonizeFp = oldNeedCanonizeFp; + }; + + TPAllocScope call; + const auto& typeInfo = NPg::LookupType(pgTypeId); + auto sendFuncId = typeInfo.SendFuncId; + if (typeInfo.TypeId == typeInfo.ArrayTypeId) { + sendFuncId = NPg::LookupProc("array_send", { 0 }).ProcId; + } + + FmgrInfo finfo; + Zero(finfo); + Y_ENSURE(sendFuncId); + fmgr_info(sendFuncId, &finfo); + Y_ENSURE(!finfo.fn_retset); + Y_ENSURE(finfo.fn_addr); + Y_ENSURE(finfo.fn_nargs == 1); + LOCAL_FCINFO(callInfo, 1); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 1; + callInfo->fncollation = DEFAULT_COLLATION_OID; + callInfo->isnull = false; + callInfo->args[0] = { typeInfo.PassByValue ? + ScalarDatumFromPod(value) : + PointerDatumFromPod(value), false }; + + auto x = (text*)finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + Y_DEFER{ + pfree(x); + }; + + auto s = GetVarBuf(x); + ui32 len = s.Size(); + f(TStringBuf(s.Data(), s.Size())); +} + +TString PgValueToNativeBinary(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) { + TString result; + PgValueToNativeBinaryImpl(value, pgTypeId, false, [&result](TStringBuf b) { + result = b; + }); + return result; +} + +TString PgValueToString(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) { + YQL_ENSURE(value); // null could not be represented as text + + switch (pgTypeId) { + case BOOLOID: + return DatumGetBool(ScalarDatumFromPod(value)) ? "true" : "false"; + case INT2OID: + return ToString(DatumGetInt16(ScalarDatumFromPod(value))); + case INT4OID: + return ToString(DatumGetInt32(ScalarDatumFromPod(value))); + case INT8OID: + return ToString(DatumGetInt64(ScalarDatumFromPod(value))); + case FLOAT4OID: + return ::FloatToString(DatumGetFloat4(ScalarDatumFromPod(value))); + case FLOAT8OID: + return ::FloatToString(DatumGetFloat8(ScalarDatumFromPod(value))); + case BYTEAOID: + case VARCHAROID: + case TEXTOID: { + const auto x = (const text*)PointerDatumFromPod(value); + return TString(GetVarBuf(x)); + } + case CSTRINGOID: { + return TString((const char*)PointerDatumFromPod(value)); + } + default: + return PgValueToNativeText(value, pgTypeId); + } +} + void WriteYsonValueInTableFormatPg(TOutputBuf& buf, TPgType* type, const NUdf::TUnboxedValuePod& value) { using namespace NYson::NDetail; if (!value) { @@ -1556,117 +1675,16 @@ void WriteYsonValueInTableFormatPg(TOutputBuf& buf, TPgType* type, const NUdf::T break; } default: - TPAllocScope call; - const auto& typeInfo = NPg::LookupType(type->GetTypeId()); - auto sendFuncId = typeInfo.SendFuncId; - if (typeInfo.TypeId == typeInfo.ArrayTypeId) { - sendFuncId = NPg::LookupProc("array_send", { 0 }).ProcId; - } - - FmgrInfo finfo; - Zero(finfo); - Y_ENSURE(sendFuncId); - fmgr_info(sendFuncId, &finfo); - Y_ENSURE(!finfo.fn_retset); - Y_ENSURE(finfo.fn_addr); - Y_ENSURE(finfo.fn_nargs == 1); - LOCAL_FCINFO(callInfo, 1); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 1; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { typeInfo.PassByValue ? - ScalarDatumFromPod(value): - PointerDatumFromPod(value), false }; - NeedCanonizeFp = true; - auto x = (text*)finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - Y_DEFER { - pfree(x); - NeedCanonizeFp = false; - }; - - auto s = GetVarBuf(x); buf.Write(StringMarker); - buf.WriteVarI32(s.Size()); - buf.WriteMany(s.Data(), s.Size()); + PgValueToNativeBinaryImpl(value, type->GetTypeId(), true, [&buf](TStringBuf b) { + buf.WriteVarI32(b.Size()); + buf.WriteMany(b.Data(), b.Size()); + }); break; } } -TString PgValueToString(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) { - YQL_ENSURE(value); - - TString ret; - switch (pgTypeId) { - case BOOLOID: - ret = DatumGetBool(ScalarDatumFromPod(value)) ? "true" : "false"; - break; - case INT2OID: - ret = ToString(DatumGetInt16(ScalarDatumFromPod(value))); - break; - case INT4OID: - ret = ToString(DatumGetInt32(ScalarDatumFromPod(value))); - break; - case INT8OID: - ret = ToString(DatumGetInt64(ScalarDatumFromPod(value))); - break; - case FLOAT4OID: - ret = ::FloatToString(DatumGetFloat4(ScalarDatumFromPod(value))); - break; - case FLOAT8OID: - ret = ::FloatToString(DatumGetFloat8(ScalarDatumFromPod(value))); - break; - case BYTEAOID: - case VARCHAROID: - case TEXTOID: { - const auto x = (const text*)PointerDatumFromPod(value); - ret = GetVarBuf(x); - break; - } - case CSTRINGOID: { - auto str = (const char*)PointerDatumFromPod(value); - ret = str; - break; - } - default: - TPAllocScope call; - const auto& typeInfo = NPg::LookupType(pgTypeId); - auto outFuncId = typeInfo.OutFuncId; - if (typeInfo.TypeId == typeInfo.ArrayTypeId) { - outFuncId = NPg::LookupProc("array_out", { 0 }).ProcId; - } - - FmgrInfo finfo; - Zero(finfo); - Y_ENSURE(outFuncId); - fmgr_info(outFuncId, &finfo); - Y_ENSURE(!finfo.fn_retset); - Y_ENSURE(finfo.fn_addr); - Y_ENSURE(finfo.fn_nargs == 1); - LOCAL_FCINFO(callInfo, 1); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 1; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { typeInfo.PassByValue ? - ScalarDatumFromPod(value) : - PointerDatumFromPod(value), false }; - auto str = (char*)finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - Y_DEFER{ - pfree(str); - }; - - ret = str; - } - - return ret; -} - -void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type, +void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, TPgType* type, const TVector<ui32>* structPositions) { if (!value) { writer.OnNull(); @@ -1731,41 +1749,79 @@ NUdf::TUnboxedValue ReadYsonValueInTableFormatPg(TPgType* type, char cmd, TInput default: TPAllocScope call; auto s = buf.ReadYtString(); - StringInfoData stringInfo; - stringInfo.data = (char*)s.Data(); - stringInfo.len = s.Size(); - stringInfo.maxlen = s.Size(); - stringInfo.cursor = 0; - - const auto& typeInfo = NPg::LookupType(type->GetTypeId()); - auto typeIOParam = MakeTypeIOParam(typeInfo); - auto receiveFuncId = typeInfo.ReceiveFuncId; - if (typeInfo.TypeId == typeInfo.ArrayTypeId) { - receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId; - } + return PgValueFromNativeBinary(s, type->GetTypeId()); + } +} - FmgrInfo finfo; - Zero(finfo); - Y_ENSURE(receiveFuncId); - fmgr_info(receiveFuncId, &finfo); - Y_ENSURE(!finfo.fn_retset); - Y_ENSURE(finfo.fn_addr); - Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3); - LOCAL_FCINFO(callInfo, 3); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 3; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { (Datum)&stringInfo, false }; - callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false }; - callInfo->args[2] = { Int32GetDatum(-1), false }; +NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgTypeId) { + TPAllocScope call; + StringInfoData stringInfo; + stringInfo.data = (char*)binary.Data(); + stringInfo.len = binary.Size(); + stringInfo.maxlen = binary.Size(); + stringInfo.cursor = 0; + + const auto& typeInfo = NPg::LookupType(pgTypeId); + auto typeIOParam = MakeTypeIOParam(typeInfo); + auto receiveFuncId = typeInfo.ReceiveFuncId; + if (typeInfo.TypeId == typeInfo.ArrayTypeId) { + receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId; + } + + FmgrInfo finfo; + Zero(finfo); + Y_ENSURE(receiveFuncId); + fmgr_info(receiveFuncId, &finfo); + Y_ENSURE(!finfo.fn_retset); + Y_ENSURE(finfo.fn_addr); + Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3); + LOCAL_FCINFO(callInfo, 3); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 3; + callInfo->fncollation = DEFAULT_COLLATION_OID; + callInfo->isnull = false; + callInfo->args[0] = { (Datum)&stringInfo, false }; + callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false }; + callInfo->args[2] = { Int32GetDatum(-1), false }; + + auto x = finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + Y_ENSURE(stringInfo.cursor == stringInfo.len); + return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x); +} - auto x = finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - Y_ENSURE(stringInfo.cursor == stringInfo.len); - return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x); - } +NUdf::TUnboxedValue PgValueFromNativeText(const TStringBuf text, ui32 pgTypeId) { + TString str{ text }; + + TPAllocScope call; + const auto& typeInfo = NPg::LookupType(pgTypeId); + auto typeIOParam = MakeTypeIOParam(typeInfo); + auto inFuncId = typeInfo.InFuncId; + if (typeInfo.TypeId == typeInfo.ArrayTypeId) { + inFuncId = NPg::LookupProc("array_in", { 0,0,0 }).ProcId; + } + + FmgrInfo finfo; + Zero(finfo); + Y_ENSURE(inFuncId); + fmgr_info(inFuncId, &finfo); + Y_ENSURE(!finfo.fn_retset); + Y_ENSURE(finfo.fn_addr); + Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3); + LOCAL_FCINFO(callInfo, 3); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 3; + callInfo->fncollation = DEFAULT_COLLATION_OID; + callInfo->isnull = false; + callInfo->args[0] = { (Datum)str.c_str(), false }; + callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false }; + callInfo->args[2] = { Int32GetDatum(-1), false }; + + auto x = finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x); } NUdf::TUnboxedValue PgValueFromString(const TStringBuf s, ui32 pgTypeId) { @@ -1799,36 +1855,7 @@ NUdf::TUnboxedValue PgValueFromString(const TStringBuf s, ui32 pgTypeId) { return PointerDatumToPod((Datum)ret); } default: - TString str{ s }; - - TPAllocScope call; - const auto& typeInfo = NPg::LookupType(pgTypeId); - auto typeIOParam = MakeTypeIOParam(typeInfo); - auto inFuncId = typeInfo.InFuncId; - if (typeInfo.TypeId == typeInfo.ArrayTypeId) { - inFuncId = NPg::LookupProc("array_in", { 0,0,0 }).ProcId; - } - - FmgrInfo finfo; - Zero(finfo); - Y_ENSURE(inFuncId); - fmgr_info(inFuncId, &finfo); - Y_ENSURE(!finfo.fn_retset); - Y_ENSURE(finfo.fn_addr); - Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3); - LOCAL_FCINFO(callInfo, 3); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 3; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { (Datum)str.c_str(), false }; - callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false }; - callInfo->args[2] = { Int32GetDatum(-1), false }; - - auto x = finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x); + return PgValueFromNativeText(s, pgTypeId); } } @@ -1843,7 +1870,7 @@ NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) { return PgValueFromString(s, type->GetTypeId()); } -NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NCommon::TInputBuf& buf) { +NUdf::TUnboxedValue ReadSkiffPg(TPgType* type, NCommon::TInputBuf& buf) { auto marker = buf.Read(); if (!marker) { return NUdf::TUnboxedValue(); @@ -1929,45 +1956,11 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NComm }; buf.ReadMany(s, size); - - StringInfoData stringInfo; - stringInfo.data = s; - stringInfo.len = size; - stringInfo.maxlen = size; - stringInfo.cursor = 0; - - const auto& typeInfo = NPg::LookupType(type->GetTypeId()); - auto typeIOParam = MakeTypeIOParam(typeInfo); - auto receiveFuncId = typeInfo.ReceiveFuncId; - if (typeInfo.TypeId == typeInfo.ArrayTypeId) { - receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId; - } - - FmgrInfo finfo; - Zero(finfo); - Y_ENSURE(receiveFuncId); - fmgr_info(receiveFuncId, &finfo); - Y_ENSURE(!finfo.fn_retset); - Y_ENSURE(finfo.fn_addr); - Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3); - LOCAL_FCINFO(callInfo, 3); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 3; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { (Datum)&stringInfo, false }; - callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false }; - callInfo->args[2] = { Int32GetDatum(-1), false }; - - auto x = finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - Y_ENSURE(stringInfo.cursor == stringInfo.len); - return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x); + return PgValueFromNativeBinary(TStringBuf(s, size), type->GetTypeId()); } } -void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { +void WriteSkiffPg(TPgType* type, const NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { if (!value) { buf.Write('\0'); return; @@ -2023,49 +2016,19 @@ void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxe break; } default: - TPAllocScope call; - const auto& typeInfo = NPg::LookupType(type->GetTypeId()); - auto sendFuncId = typeInfo.SendFuncId; - if (typeInfo.TypeId == typeInfo.ArrayTypeId) { - sendFuncId = NPg::LookupProc("array_send", { 0 }).ProcId; - } - - FmgrInfo finfo; - Zero(finfo); - Y_ENSURE(sendFuncId); - fmgr_info(sendFuncId, &finfo); - Y_ENSURE(!finfo.fn_retset); - Y_ENSURE(finfo.fn_addr); - Y_ENSURE(finfo.fn_nargs == 1); - LOCAL_FCINFO(callInfo, 1); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 1; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { typeInfo.PassByValue ? - ScalarDatumFromPod(value) : - PointerDatumFromPod(value), false }; - NeedCanonizeFp = true; - auto x = (text*)finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - Y_DEFER { - pfree(x); - NeedCanonizeFp = false; - }; - - auto s = GetVarBuf(x); - ui32 len = s.Size(); - buf.WriteMany((const char*)&len, sizeof(len)); - buf.WriteMany(s.Data(), len); + PgValueToNativeBinaryImpl(value, type->GetTypeId(), true, [&buf](TStringBuf b) { + ui32 len = b.Size(); + buf.WriteMany((const char*)&len, sizeof(len)); + buf.WriteMany(b.Data(), len); + }); } } -extern "C" void ReadSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf) { +extern "C" void ReadSkiffPgValue(TPgType* type, NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf) { value = ReadSkiffPg(type, buf); } -extern "C" void WriteSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { +extern "C" void WriteSkiffPgValue(TPgType* type, const NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { WriteSkiffPg(type, value, buf); } @@ -2216,40 +2179,10 @@ void PGPackImpl(bool stable, const TPgType* type, const NUdf::TUnboxedValuePod& break; } default: - TPAllocScope call; - const auto& typeInfo = NPg::LookupType(type->GetTypeId()); - auto sendFuncId = typeInfo.SendFuncId; - if (typeInfo.TypeId == typeInfo.ArrayTypeId) { - sendFuncId = NPg::LookupProc("array_send", { 0 }).ProcId; - } - - FmgrInfo finfo; - Zero(finfo); - Y_ENSURE(sendFuncId); - fmgr_info(sendFuncId, &finfo); - Y_ENSURE(!finfo.fn_retset); - Y_ENSURE(finfo.fn_addr); - Y_ENSURE(finfo.fn_nargs == 1); - LOCAL_FCINFO(callInfo, 1); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 1; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { typeInfo.PassByValue ? - ScalarDatumFromPod(value) : - PointerDatumFromPod(value), false }; - NeedCanonizeFp = stable; - auto x = (text*)finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - Y_DEFER{ - pfree(x); - NeedCanonizeFp = false; - }; - - auto s = GetVarBuf(x); - NDetails::PackUInt32(s.Size(), buf); - buf.Append(s.Data(), s.Size()); + NYql::NCommon::PgValueToNativeBinaryImpl(value, type->GetTypeId(), stable, [&buf](TStringBuf b) { + NDetails::PackUInt32(b.Size(), buf); + buf.Append(b.Data(), b.Size()); + }); } } @@ -2301,40 +2234,9 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) { TPAllocScope call; auto size = NDetails::UnpackUInt32(buf); MKQL_ENSURE(size <= buf.size(), "Bad packed data. Buffer too small"); - StringInfoData stringInfo; - stringInfo.data = (char*)buf.data(); - stringInfo.len = size; - stringInfo.maxlen = size; - stringInfo.cursor = 0; - - const auto& typeInfo = NPg::LookupType(type->GetTypeId()); - auto typeIOParam = MakeTypeIOParam(typeInfo); - auto receiveFuncId = typeInfo.ReceiveFuncId; - if (typeInfo.TypeId == typeInfo.ArrayTypeId) { - receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId; - } - - FmgrInfo finfo; - Zero(finfo); - Y_ENSURE(receiveFuncId); - fmgr_info(receiveFuncId, &finfo); - Y_ENSURE(!finfo.fn_retset); - Y_ENSURE(finfo.fn_addr); - Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3); - LOCAL_FCINFO(callInfo, 3); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 3; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { (Datum)&stringInfo, false }; - callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false }; - callInfo->args[2] = { Int32GetDatum(-1), false }; - - auto x = finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - buf.Skip(stringInfo.cursor); - return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x); + TStringBuf s = buf.Head(size); + buf.Skip(size); + return NYql::NCommon::PgValueFromNativeBinary(s, type->GetTypeId()); } } @@ -2384,39 +2286,9 @@ void EncodePresortPGValue(TPgType* type, const NUdf::TUnboxedValue& value, TVect break; } default: - TPAllocScope call; - const auto& typeInfo = NPg::LookupType(type->GetTypeId()); - auto sendFuncId = typeInfo.SendFuncId; - if (typeInfo.TypeId == typeInfo.ArrayTypeId) { - sendFuncId = NPg::LookupProc("array_send", { 0 }).ProcId; - } - - FmgrInfo finfo; - Zero(finfo); - Y_ENSURE(sendFuncId); - fmgr_info(sendFuncId, &finfo); - Y_ENSURE(!finfo.fn_retset); - Y_ENSURE(finfo.fn_addr); - Y_ENSURE(finfo.fn_nargs == 1); - LOCAL_FCINFO(callInfo, 1); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 1; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { typeInfo.PassByValue ? - ScalarDatumFromPod(value) : - PointerDatumFromPod(value), false }; - NeedCanonizeFp = true; - auto x = (text*)finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - Y_DEFER { - pfree(x); - NeedCanonizeFp = false; - }; - - auto s = GetVarBuf(x); - NDetail::EncodeString<false>(output, s); + NYql::NCommon::PgValueToNativeBinaryImpl(value, type->GetTypeId(), true, [&output](TStringBuf b) { + NDetail::EncodeString<false>(output, b); + }); } } @@ -2463,41 +2335,7 @@ NUdf::TUnboxedValue DecodePresortPGValue(TPgType* type, TStringBuf& input, TVect default: buffer.clear(); const auto s = NDetail::DecodeString<false>(input, buffer); - - StringInfoData stringInfo; - stringInfo.data = (char*)s.Data(); - stringInfo.len = s.Size(); - stringInfo.maxlen = s.Size(); - stringInfo.cursor = 0; - - const auto& typeInfo = NPg::LookupType(type->GetTypeId()); - auto typeIOParam = MakeTypeIOParam(typeInfo); - auto receiveFuncId = typeInfo.ReceiveFuncId; - if (typeInfo.TypeId == typeInfo.ArrayTypeId) { - receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId; - } - - FmgrInfo finfo; - Zero(finfo); - Y_ENSURE(receiveFuncId); - fmgr_info(receiveFuncId, &finfo); - Y_ENSURE(!finfo.fn_retset); - Y_ENSURE(finfo.fn_addr); - Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3); - LOCAL_FCINFO(callInfo, 3); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 3; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { (Datum)&stringInfo, false }; - callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false }; - callInfo->args[2] = { Int32GetDatum(-1), false }; - - auto x = finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - Y_ENSURE(stringInfo.cursor == stringInfo.len); - return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x); + return NYql::NCommon::PgValueFromNativeBinary(s, type->GetTypeId()); } } @@ -2726,7 +2564,7 @@ private: FmgrInfo FInfoEquate; }; -NUdf::IEquate::TPtr MakePgEquate(const NMiniKQL::TPgType* type) { +NUdf::IEquate::TPtr MakePgEquate(const TPgType* type) { return new TPgEquate(type); } diff --git a/ydb/library/yql/providers/common/codec/yql_pg_codec.h b/ydb/library/yql/providers/common/codec/yql_pg_codec.h index aed53d4faf..a943fbe0b6 100644 --- a/ydb/library/yql/providers/common/codec/yql_pg_codec.h +++ b/ydb/library/yql/providers/common/codec/yql_pg_codec.h @@ -15,6 +15,12 @@ namespace NCommon { TString PgValueToString(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId); NUdf::TUnboxedValue PgValueFromString(const TStringBuf text, ui32 pgTypeId); +TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId); +NUdf::TUnboxedValue PgValueFromNativeText(const TStringBuf text, ui32 pgTypeId); + +TString PgValueToNativeBinary(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId); +NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgTypeId); + void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type, const TVector<ui32>* structPositions); diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index 514f02f330..6be057fc6d 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -32,6 +32,30 @@ NUdf::TUnboxedValue PgValueFromString(const TStringBuf text, ui32 pgTypeId) { throw yexception() << "PgValueFromString: PG types are not supported"; } +TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) { + Y_UNUSED(value); + Y_UNUSED(pgTypeId); + throw yexception() << "PgValueToNativeText: PG types are not supported"; +} + +NUdf::TUnboxedValue PgValueFromNativeText(const TStringBuf text, ui32 pgTypeId) { + Y_UNUSED(text); + Y_UNUSED(pgTypeId); + throw yexception() << "PgValueFromNativeText: PG types are not supported"; +} + +TString PgValueToNativeBinary(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) { + Y_UNUSED(value); + Y_UNUSED(pgTypeId); + throw yexception() << "PgValueToNativeBinary: PG types are not supported"; +} + +NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgTypeId) { + Y_UNUSED(binary); + Y_UNUSED(pgTypeId); + throw yexception() << "PgValueFromNativeBinary: PG types are not supported"; +} + void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type, const TVector<ui32>* structPositions) { Y_UNUSED(writer); |