aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.ru>2022-03-10 14:14:17 +0300
committervvvv <vvvv@yandex-team.ru>2022-03-10 14:14:17 +0300
commit4d9e9a234e8f13e18ed5094c23a672ca73f3a0e5 (patch)
tree434b9940cff03d8cbf35e1e9188042fa81823820
parentbd789f93c8f22764332d81dec16b2a39ab32e274 (diff)
downloadydb-4d9e9a234e8f13e18ed5094c23a672ca73f3a0e5.tar.gz
YQL-13710 table IO via yson, implicit cast from data types to pg in cast,op,call.
ref:ff4cd8968612e675e9fa9ad28d17ca99758a972d
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp69
-rw-r--r--ydb/library/yql/core/yql_pg_utils.h10
-rw-r--r--ydb/library/yql/minikql/mkql_node.cpp3
-rw-r--r--ydb/library/yql/parser/pg_wrapper/CMakeLists.txt2
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp334
-rw-r--r--ydb/library/yql/parser/pg_wrapper/ya.make2
-rw-r--r--ydb/library/yql/providers/common/codec/yql_codec.cpp11
-rw-r--r--ydb/library/yql/providers/common/codec/yql_pg_codec.h5
-rw-r--r--ydb/library/yql/sql/pg_dummy/CMakeLists.txt1
-rw-r--r--ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp29
-rw-r--r--ydb/library/yql/sql/pg_dummy/ya.make1
11 files changed, 416 insertions, 51 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 86044220095..0b5d83497a8 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -12,6 +12,7 @@
#include <ydb/library/yql/core/yql_callable_transform.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/core/yql_type_helpers.h>
+#include <ydb/library/yql/core/yql_pg_utils.h>
#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
#include <ydb/library/yql/core/issue/yql_issue.h>
#include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h>
@@ -8786,6 +8787,40 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
}
};
+ bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, TPositionHandle pos, TExprContext& ctx) {
+ pgType = 0;
+ if (type->GetKind() == ETypeAnnotationKind::Null) {
+ return true;
+ }
+
+ if (type->GetKind() == ETypeAnnotationKind::Data || type->GetKind() == ETypeAnnotationKind::Optional) {
+ const TTypeAnnotationNode* unpacked = RemoveOptionalType(type);
+ if (unpacked->GetKind() != ETypeAnnotationKind::Data) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos),
+ "Nested optional type is not compatible to PG"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto slot = unpacked->Cast<TDataExprType>()->GetSlot();
+ auto convertedTypeId = ConvertToPgType(slot);
+ if (!convertedTypeId) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos),
+ TStringBuilder() << "Type is not compatible to PG: " << slot));
+ return false;
+ }
+
+ pgType = *convertedTypeId;
+ return true;
+ } else if (type->GetKind() != ETypeAnnotationKind::Pg) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos),
+ TStringBuilder() << "Expected PG type, but got: " << type->GetKind()));
+ return false;
+ } else {
+ pgType = type->Cast<TPgExprType>()->GetId();
+ return true;
+ }
+ }
+
IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
bool isResolved = input->Content() == "PgResolvedCall";
if (!EnsureMinArgsCount(*input, isResolved ? 2 : 1, ctx.Expr)) {
@@ -8808,18 +8843,12 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
TVector<ui32> argTypes;
for (ui32 i = isResolved ? 2 : 1; i < input->ChildrenSize(); ++i) {
auto type = input->Child(i)->GetTypeAnn();
- if (type->GetKind() == ETypeAnnotationKind::Null) {
- argTypes.push_back(0);
- continue;
- }
-
- if (type->GetKind() != ETypeAnnotationKind::Pg) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Expected PG type for argument " << (i - (isResolved ? 2 : 1) + 1) << ", but got: " << type->GetKind() << " for function: " << name));
+ ui32 argType;
+ if (!ExtractPgType(type, argType, input->Child(i)->Pos(), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
- argTypes.push_back(type->Cast<TPgExprType>()->GetId());
+ argTypes.push_back(argType);
}
if (isResolved) {
@@ -8932,18 +8961,12 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
TVector<ui32> argTypes;
for (ui32 i = isResolved ? 2 : 1; i < input->ChildrenSize(); ++i) {
auto type = input->Child(i)->GetTypeAnn();
- if (type->GetKind() == ETypeAnnotationKind::Null) {
- argTypes.push_back(0);
- continue;
- }
-
- if (type->GetKind() != ETypeAnnotationKind::Pg) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Expected PG type for argument " << (i - (isResolved ? 2 : 1) + 1) << ", but got: " << type->GetKind() << " for function: " << name));
+ ui32 argType;
+ if (!ExtractPgType(type, argType, input->Child(i)->Pos(), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
- argTypes.push_back(type->Cast<TPgExprType>()->GetId());
+ argTypes.push_back(argType);
}
if (isResolved) {
@@ -9513,14 +9536,8 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
auto type = input->Tail().GetTypeAnn();
ui32 inputTypeId = 0;
- if (type->GetKind() != ETypeAnnotationKind::Null) {
- if (type->GetKind() != ETypeAnnotationKind::Pg) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Expected PG type for cast argument, but got: " << type->GetKind()));
- return IGraphTransformer::TStatus::Error;
- }
-
- inputTypeId = type->Cast<TPgExprType>()->GetId();
+ if (!ExtractPgType(type, inputTypeId, input->Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
}
if (inputTypeId != 0 && inputTypeId != targetTypeId) {
diff --git a/ydb/library/yql/core/yql_pg_utils.h b/ydb/library/yql/core/yql_pg_utils.h
new file mode 100644
index 00000000000..5df3aeba510
--- /dev/null
+++ b/ydb/library/yql/core/yql_pg_utils.h
@@ -0,0 +1,10 @@
+#pragma once
+#include <ydb/library/yql/public/udf/udf_data_type.h>
+#include <util/generic/maybe.h>
+
+namespace NYql {
+
+TMaybe<ui32> ConvertToPgType(NKikimr::NUdf::EDataSlot slot);
+TMaybe<NKikimr::NUdf::EDataSlot> ConvertFromPgType(ui32 typeId);
+
+}
diff --git a/ydb/library/yql/minikql/mkql_node.cpp b/ydb/library/yql/minikql/mkql_node.cpp
index f1d854645aa..52a6f561404 100644
--- a/ydb/library/yql/minikql/mkql_node.cpp
+++ b/ydb/library/yql/minikql/mkql_node.cpp
@@ -2287,6 +2287,9 @@ EValueRepresentation GetValueRepresentation(const TType* type) {
case TType::EKind::Null:
return EValueRepresentation::Embedded;
+ case TType::EKind::Pg:
+ return EValueRepresentation::Any;
+
default:
Y_FAIL("Unsupported type.");
}
diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt
index 87aa37916a7..5b49fcb41b3 100644
--- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt
+++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt
@@ -93,6 +93,8 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC
library-cpp-resource
yql-minikql-computation
yql-parser-pg_catalog
+ library-yql-core
+ library-cpp-yson
contrib-libs-icu
contrib-libs-libc_compat
Iconv::Iconv
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index b62a76369c2..e502396cd20 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -4,8 +4,11 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_alloc.h>
+#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/providers/common/codec/yql_pg_codec.h>
#include <ydb/library/yql/parser/pg_catalog/catalog.h>
+#include <ydb/library/yql/core/yql_pg_utils.h>
+#include <library/cpp/yson/detail.h>
#define TypeName PG_TypeName
#define SortBy PG_SortBy
@@ -80,8 +83,21 @@ NUdf::TUnboxedValuePod PointerDatumToPod(Datum datum) {
return NUdf::TUnboxedValuePod(std::move(ref));
}
-Datum PointerDatumFromPod(const NUdf::TUnboxedValuePod& value) {
- return (Datum)(((const char*)value.AsBoxed().Get()) + PallocHdrSize);
+Datum PointerDatumFromPod(const NUdf::TUnboxedValuePod& value, bool isVar) {
+ if (value.IsBoxed()) {
+ return (Datum)(((const char*)value.AsBoxed().Get()) + PallocHdrSize);
+ }
+
+ // temporary palloc, should be handled by LeakGuard
+ const auto& ref = value.AsStringRef();
+ if (isVar) {
+ return (Datum)cstring_to_text_with_len(ref.Data(), ref.Size());
+ } else {
+ auto ret = (char*)palloc(ref.Size() + 1);
+ memcpy(ret, ref.Data(), ref.Size());
+ ret[ref.Size()] = '\0';
+ return (Datum)ret;
+ }
}
struct TPAllocLeakGuard {
@@ -199,10 +215,7 @@ struct TMkqlPgAdapter {
};
#define SET_MEMORY_CONTEXT \
- CurrentMemoryContext = ErrorContext = TMkqlPgAdapter::Instance(); \
- Y_DEFER { \
- CurrentMemoryContext = ErrorContext = nullptr; \
- };
+ CurrentMemoryContext = ErrorContext = TMkqlPgAdapter::Instance();
class TPgConst : public TMutableComputationNode<TPgConst> {
typedef TMutableComputationNode<TPgConst> TBaseComputation;
@@ -292,9 +305,18 @@ public:
Y_ENSURE(!FInfo.fn_retset);
Y_ENSURE(FInfo.fn_addr);
Y_ENSURE(FInfo.fn_nargs == ArgNodes.size());
+ ArgDesc.reserve(ProcDesc.ArgTypes.size());
+ for (const auto& x : ProcDesc.ArgTypes) {
+ ArgDesc.emplace_back(NPg::LookupType(x));
+ }
+
+ Y_ENSURE(ArgDesc.size() == ArgNodes.size());
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const {
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+
auto& state = GetState(compCtx);
auto& callInfo = state.CallInfo.Ref();
callInfo.isnull = false;
@@ -308,14 +330,14 @@ public:
argDatum.isnull = true;
} else {
- argDatum.value = value.IsBoxed() ? PointerDatumFromPod(value) : ScalarDatumFromPod(value);
+ argDatum.value = ArgDesc[i].PassByValue ?
+ ScalarDatumFromPod(value) :
+ PointerDatumFromPod(value, ArgDesc[i].TypeLen == -1);
}
callInfo.args[i] = argDatum;
}
- SET_MEMORY_CONTEXT;
- TPAllocLeakGuard leakGuard;
PG_TRY();
{
auto ret = FInfo.fn_addr(&callInfo);
@@ -370,6 +392,7 @@ private:
const NPg::TProcDesc ProcDesc;
const NPg::TTypeDesc RetTypeDesc;
const TComputationNodePtrVector ArgNodes;
+ TVector<NPg::TTypeDesc> ArgDesc;
};
inline ui32 MakeTypeIOParam(const NPg::TTypeDesc& desc) {
@@ -385,6 +408,7 @@ public:
, SourceId(sourceId)
, TargetId(targetId)
, Arg(arg)
+ , SourceTypeDesc(NPg::LookupType(SourceId))
, TargetTypeDesc(NPg::LookupType(targetId))
{
TypeIOParam = MakeTypeIOParam(TargetTypeDesc);
@@ -395,15 +419,14 @@ public:
return;
}
- const auto& sourceTypeDesc = NPg::LookupType(SourceId);
ui32 funcId;
ui32 funcId2 = 0;
if (!NPg::HasCast(SourceId, TargetId)) {
- if (sourceTypeDesc.Category == 'S') {
+ if (SourceTypeDesc.Category == 'S') {
funcId = TargetTypeDesc.InFuncId;
} else {
Y_ENSURE(TargetTypeDesc.Category == 'S');
- funcId = sourceTypeDesc.OutFuncId;
+ funcId = SourceTypeDesc.OutFuncId;
}
} else {
const auto& cast = NPg::LookupCast(SourceId, TargetId);
@@ -416,7 +439,7 @@ public:
break;
}
case NPg::ECastMethod::InOut: {
- funcId = sourceTypeDesc.OutFuncId;
+ funcId = SourceTypeDesc.OutFuncId;
funcId2 = TargetTypeDesc.InFuncId;
break;
}
@@ -429,7 +452,7 @@ public:
Y_ENSURE(FInfo1.fn_nargs >= 1 && FInfo1.fn_nargs <= 3);
Func1Lookup = NPg::LookupProc(funcId);
Y_ENSURE(Func1Lookup.ArgTypes.size() >= 1 && Func1Lookup.ArgTypes.size() <= 3);
- if (Func1Lookup.ArgTypes[0] == CSTRINGOID && sourceTypeDesc.Category == 'S') {
+ if (Func1Lookup.ArgTypes[0] == CSTRINGOID && SourceTypeDesc.Category == 'S') {
ConvertArgToCString = true;
}
@@ -464,12 +487,14 @@ public:
return value.Release();
}
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
auto& state = GetState(compCtx);
auto& callInfo1 = state.CallInfo1.Ref();
callInfo1.isnull = false;
- NullableDatum argDatum = { value.IsBoxed() ? PointerDatumFromPod(value) : ScalarDatumFromPod(value), false };
- SET_MEMORY_CONTEXT;
- TPAllocLeakGuard leakGuard;
+ NullableDatum argDatum = { SourceTypeDesc.PassByValue ?
+ ScalarDatumFromPod(value) :
+ PointerDatumFromPod(value, SourceTypeDesc.TypeLen == -1), false };
if (ConvertArgToCString) {
argDatum.value = (Datum)text_to_cstring((const text*)argDatum.value);
Y_DEFER {
@@ -568,6 +593,7 @@ private:
const ui32 SourceId;
const ui32 TargetId;
IComputationNode* const Arg;
+ const NPg::TTypeDesc SourceTypeDesc;
const NPg::TTypeDesc TargetTypeDesc;
FmgrInfo FInfo1, FInfo2;
NPg::TProcDesc Func1Lookup, Func2Lookup;
@@ -607,7 +633,12 @@ TComputationNodeFactory GetPgFactory() {
auto inputType = callable.GetInput(0).GetStaticType();
ui32 sourceId = 0;
if (!inputType->IsNull()) {
- sourceId = AS_TYPE(TPgType, inputType)->GetTypeId();
+ if (inputType->IsData() || inputType->IsOptional()) {
+ bool isOptional;
+ sourceId = *ConvertToPgType(*UnpackOptionalData(inputType, isOptional)->GetDataSlot());
+ } else {
+ sourceId = AS_TYPE(TPgType, inputType)->GetTypeId();
+ }
}
auto returnType = callable.GetType()->GetReturnType();
@@ -621,6 +652,108 @@ TComputationNodeFactory GetPgFactory() {
namespace NCommon {
+void WriteYsonValueInTableFormatPg(TOutputBuf& buf, TPgType* type, const NUdf::TUnboxedValuePod& value) {
+ using namespace NYson::NDetail;
+ if (!value) {
+ buf.Write(EntitySymbol);
+ return;
+ }
+
+ switch (type->GetTypeId()) {
+ case BOOLOID:
+ buf.Write(DatumGetBool(ScalarDatumFromPod(value)) ? TrueMarker : FalseMarker);
+ break;
+ case INT2OID:
+ buf.Write(Int64Marker);
+ buf.WriteVarI64(DatumGetInt16(ScalarDatumFromPod(value)));
+ break;
+ case INT4OID:
+ buf.Write(Int64Marker);
+ buf.WriteVarI64(DatumGetInt32(ScalarDatumFromPod(value)));
+ break;
+ case INT8OID:
+ buf.Write(Int64Marker);
+ buf.WriteVarI64(DatumGetInt64(ScalarDatumFromPod(value)));
+ break;
+ case FLOAT4OID: {
+ buf.Write(DoubleMarker);
+ double val = DatumGetFloat4(ScalarDatumFromPod(value));
+ buf.WriteMany((const char*)&val, sizeof(val));
+ break;
+ }
+ case FLOAT8OID: {
+ buf.Write(DoubleMarker);
+ double val = DatumGetFloat8(ScalarDatumFromPod(value));
+ buf.WriteMany((const char*)&val, sizeof(val));
+ break;
+ }
+ case BYTEAOID:
+ case VARCHAROID:
+ case TEXTOID: {
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+ const auto x = (const text*)PointerDatumFromPod(value, true);
+ ui32 len = VARSIZE_ANY_EXHDR(x);
+ TString s;
+ if (len) {
+ s = TString::Uninitialized(len);
+ text_to_cstring_buffer(x, s.begin(), len + 1);
+ }
+
+ buf.Write(StringMarker);
+ buf.WriteVarI32(len);
+ buf.WriteMany(s);
+ break;
+ }
+ case CSTRINGOID: {
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+ auto s = (const char*)PointerDatumFromPod(value, false);
+ auto len = strlen(s);
+ buf.Write(StringMarker);
+ buf.WriteVarI32(len);
+ buf.WriteMany(s, len);
+ break;
+ }
+ default:
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+ const auto& typeInfo = NPg::LookupType(type->GetTypeId());
+ Y_ENSURE(typeInfo.SendFuncId);
+ FmgrInfo finfo;
+ Zero(finfo);
+ fmgr_info(typeInfo.SendFuncId, &finfo);
+ Y_ENSURE(!finfo.fn_retset);
+ Y_ENSURE(finfo.fn_addr);
+ Y_ENSURE(finfo.fn_nargs == 1);
+ LOCAL_FCINFO(callInfo, 1);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 1;
+ callInfo->isnull = false;
+ callInfo->args[0] = { typeInfo.PassByValue ?
+ ScalarDatumFromPod(value):
+ PointerDatumFromPod(value, typeInfo.TypeLen == -1), false };
+ auto x = (text*)finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ Y_DEFER {
+ pfree(x);
+ };
+
+ ui32 len = VARSIZE_ANY_EXHDR(x);
+ TString s;
+ if (len) {
+ s = TString::Uninitialized(len);
+ text_to_cstring_buffer(x, s.begin(), len + 1);
+ }
+
+ buf.Write(StringMarker);
+ buf.WriteVarI32(len);
+ buf.WriteMany(s);
+ break;
+ }
+}
+
void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type,
const TVector<ui32>* structPositions) {
if (!value) {
@@ -651,7 +784,9 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v
case BYTEAOID:
case VARCHAROID:
case TEXTOID: {
- const auto x = (const text*)PointerDatumFromPod(value);
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+ const auto x = (const text*)PointerDatumFromPod(value, true);
ui32 len = VARSIZE_ANY_EXHDR(x);
if (len) {
ret = TString::Uninitialized(len);
@@ -660,7 +795,9 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v
break;
}
case CSTRINGOID: {
- auto str = (const char*)PointerDatumFromPod(value);
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+ auto str = (const char*)PointerDatumFromPod(value, false);
ret = str;
break;
}
@@ -679,7 +816,9 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v
callInfo->flinfo = &finfo;
callInfo->nargs = 1;
callInfo->isnull = false;
- callInfo->args[0] = { value.IsBoxed() ? PointerDatumFromPod(value) : ScalarDatumFromPod(value), false };
+ callInfo->args[0] = { typeInfo.PassByValue ?
+ ScalarDatumFromPod(value):
+ PointerDatumFromPod(value, typeInfo.TypeLen == -1), false };
auto str = (char*)finfo.fn_addr(callInfo);
Y_ENSURE(!callInfo->isnull);
Y_DEFER {
@@ -692,7 +831,145 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v
writer.OnStringScalar(ret);
}
+NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) {
+ using namespace NYson::NDetail;
+ if (cmd == EntitySymbol) {
+ return NUdf::TUnboxedValuePod();
+ }
+
+ switch (type->GetTypeId()) {
+ case BOOLOID: {
+ YQL_ENSURE(cmd == FalseMarker || cmd == TrueMarker, "Expected either true or false, but got: " << TString(cmd).Quote());
+ return ScalarDatumToPod(BoolGetDatum(cmd == TrueMarker));
+ }
+ case INT2OID: {
+ CHECK_EXPECTED(cmd, Int64Marker);
+ auto x = i16(buf.ReadVarI64());
+ return ScalarDatumToPod(Int16GetDatum(x));
+ }
+ case INT4OID: {
+ CHECK_EXPECTED(cmd, Int64Marker);
+ auto x = i32(buf.ReadVarI64());
+ return ScalarDatumToPod(Int32GetDatum(x));
+ }
+ case INT8OID: {
+ CHECK_EXPECTED(cmd, Int64Marker);
+ auto x = buf.ReadVarI64();
+ return ScalarDatumToPod(Int64GetDatum(x));
+ }
+ case FLOAT4OID: {
+ CHECK_EXPECTED(cmd, DoubleMarker);
+ double x;
+ buf.ReadMany((char*)&x, sizeof(x));
+ return ScalarDatumToPod(Float4GetDatum(x));
+ }
+ case FLOAT8OID: {
+ CHECK_EXPECTED(cmd, DoubleMarker);
+ double x;
+ buf.ReadMany((char*)&x, sizeof(x));
+ return ScalarDatumToPod(Float8GetDatum(x));
+ }
+ case BYTEAOID:
+ case VARCHAROID:
+ case TEXTOID: {
+ CHECK_EXPECTED(cmd, StringMarker);
+ auto s = buf.ReadYtString();
+ SET_MEMORY_CONTEXT;
+ auto ret = cstring_to_text_with_len(s.Data(), s.Size());
+ return PointerDatumToPod((Datum)ret);
+ }
+ case CSTRINGOID: {
+ CHECK_EXPECTED(cmd, StringMarker);
+ auto s = buf.ReadYtString();
+ SET_MEMORY_CONTEXT;
+ auto ret = (char*)palloc(s.Size() + 1);
+ memcpy(ret, s.Data(), s.Size());
+ ret[s.Size()] = '\0';
+ return PointerDatumToPod((Datum)ret);
+ }
+ default:
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+ auto s = buf.ReadYtString();
+ StringInfoData stringInfo;
+ stringInfo.data = (char*)s.Data();
+ stringInfo.len = s.Size();
+ stringInfo.maxlen = s.Size();
+ stringInfo.cursor = 0;
+
+ const auto& typeInfo = NPg::LookupType(type->GetTypeId());
+ auto typeIOParam = MakeTypeIOParam(typeInfo);
+ Y_ENSURE(typeInfo.ReceiveFuncId);
+ FmgrInfo finfo;
+ Zero(finfo);
+ fmgr_info(typeInfo.ReceiveFuncId, &finfo);
+ Y_ENSURE(!finfo.fn_retset);
+ Y_ENSURE(finfo.fn_addr);
+ Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3);
+ LOCAL_FCINFO(callInfo, 3);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 3;
+ callInfo->isnull = false;
+ callInfo->args[0] = { (Datum)&stringInfo, false };
+ callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false };
+ callInfo->args[2] = { Int32GetDatum(-1), false };
+
+ auto x = finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ Y_ENSURE(stringInfo.cursor == stringInfo.len);
+ return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x);
+ }
+}
+
} // namespace NCommon
+
+TMaybe<ui32> ConvertToPgType(NUdf::EDataSlot slot) {
+ switch (slot) {
+ case NUdf::EDataSlot::Bool:
+ return BOOLOID;
+ case NUdf::EDataSlot::Int16:
+ return INT2OID;
+ case NUdf::EDataSlot::Int32:
+ return INT4OID;
+ case NUdf::EDataSlot::Int64:
+ return INT8OID;
+ case NUdf::EDataSlot::Float:
+ return FLOAT4OID;
+ case NUdf::EDataSlot::Double:
+ return FLOAT8OID;
+ case NUdf::EDataSlot::String:
+ return BYTEAOID;
+ case NUdf::EDataSlot::Utf8:
+ return TEXTOID;
+ default:
+ return Nothing();
+ }
+}
+
+TMaybe<NUdf::EDataSlot> ConvertFromPgType(ui32 typeId) {
+ switch (typeId) {
+ case BOOLOID:
+ return NUdf::EDataSlot::Bool;
+ case INT2OID:
+ return NUdf::EDataSlot::Int16;
+ case INT4OID:
+ return NUdf::EDataSlot::Int32;
+ case INT8OID:
+ return NUdf::EDataSlot::Int64;
+ case FLOAT4OID:
+ return NUdf::EDataSlot::Float;
+ case FLOAT8OID:
+ return NUdf::EDataSlot::Double;
+ case BYTEAOID:
+ return NUdf::EDataSlot::String;
+ case TEXTOID:
+ return NUdf::EDataSlot::Utf8;
+ }
+
+ return Nothing();
+}
+
} // NYql
namespace NKikimr {
@@ -735,7 +1012,9 @@ void PGPackImpl(const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffe
case BYTEAOID:
case VARCHAROID:
case TEXTOID: {
- const auto x = (const text*)PointerDatumFromPod(value);
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+ const auto x = (const text*)PointerDatumFromPod(value, true);
ui32 len = VARSIZE_ANY_EXHDR(x);
NDetails::PackUInt32(len, buf);
auto off = buf.Size();
@@ -745,7 +1024,9 @@ void PGPackImpl(const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffe
break;
}
case CSTRINGOID: {
- const auto x = (const char*)PointerDatumFromPod(value);
+ SET_MEMORY_CONTEXT;
+ TPAllocLeakGuard leakGuard;
+ const auto x = (const char*)PointerDatumFromPod(value, false);
const auto len = strlen(x);
NDetails::PackUInt32(len, buf);
buf.Append(x, len);
@@ -767,7 +1048,9 @@ void PGPackImpl(const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffe
callInfo->flinfo = &finfo;
callInfo->nargs = 1;
callInfo->isnull = false;
- callInfo->args[0] = { value.IsBoxed() ? PointerDatumFromPod(value) : ScalarDatumFromPod(value), false };
+ callInfo->args[0] = { typeInfo.PassByValue ?
+ ScalarDatumFromPod(value) :
+ PointerDatumFromPod(value, typeInfo.TypeLen == -1), false };
auto x = (text*)finfo.fn_addr(callInfo);
Y_ENSURE(!callInfo->isnull);
Y_DEFER{
@@ -826,7 +1109,7 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {
MKQL_ENSURE(size <= buf.size(), "Bad packed data. Buffer too small");
const char* ptr = buf.data();
buf.Skip(size);
- char* ret = (char*)palloc(size + 1);
+ auto ret = (char*)palloc(size + 1);
memcpy(ret, ptr, size);
ret[size] = '\0';
return PointerDatumToPod((Datum)ret);
@@ -869,3 +1152,4 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {
} // namespace NMiniKQL
} // namespace NKikimr
+
diff --git a/ydb/library/yql/parser/pg_wrapper/ya.make b/ydb/library/yql/parser/pg_wrapper/ya.make
index adc1c9ceffd..c68519541ac 100644
--- a/ydb/library/yql/parser/pg_wrapper/ya.make
+++ b/ydb/library/yql/parser/pg_wrapper/ya.make
@@ -32,6 +32,8 @@ PEERDIR(
library/cpp/resource
ydb/library/yql/minikql/computation
ydb/library/yql/parser/pg_catalog
+ ydb/library/yql/core
+ library/cpp/yson
contrib/libs/icu
contrib/libs/libc_compat
diff --git a/ydb/library/yql/providers/common/codec/yql_codec.cpp b/ydb/library/yql/providers/common/codec/yql_codec.cpp
index 74685167014..28f8ce8c926 100644
--- a/ydb/library/yql/providers/common/codec/yql_codec.cpp
+++ b/ydb/library/yql/providers/common/codec/yql_codec.cpp
@@ -1237,6 +1237,11 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type,
return holderFactory.GetEmptyContainer();
}
+ case TType::EKind::Pg: {
+ auto pgType = static_cast<TPgType*>(type);
+ return ReadYsonValuePg(pgType, cmd, buf);
+ }
+
default:
YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr());
}
@@ -1978,6 +1983,12 @@ void WriteYsonValueInTableFormat(TOutputBuf& buf, TType* type, const NUdf::TUnbo
break;
}
+ case TType::EKind::Pg: {
+ auto pgType = static_cast<TPgType*>(type);
+ WriteYsonValueInTableFormatPg(buf, pgType, value);
+ break;
+ }
+
default:
YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr());
}
diff --git a/ydb/library/yql/providers/common/codec/yql_pg_codec.h b/ydb/library/yql/providers/common/codec/yql_pg_codec.h
index 969bfb9ca52..963d5231e14 100644
--- a/ydb/library/yql/providers/common/codec/yql_pg_codec.h
+++ b/ydb/library/yql/providers/common/codec/yql_pg_codec.h
@@ -7,6 +7,7 @@
#include <util/generic/vector.h>
#include "yql_codec_results.h"
+#include "yql_codec_buf.h"
namespace NYql {
namespace NCommon {
@@ -14,5 +15,9 @@ namespace NCommon {
void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type,
const TVector<ui32>* structPositions);
+void WriteYsonValueInTableFormatPg(TOutputBuf& buf, NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value);
+
+NUdf::TUnboxedValue ReadYsonValuePg(NKikimr::NMiniKQL::TPgType* type, char cmd, TInputBuf& buf);
+
} // namespace NCommon
} // namespace NYql
diff --git a/ydb/library/yql/sql/pg_dummy/CMakeLists.txt b/ydb/library/yql/sql/pg_dummy/CMakeLists.txt
index 3dd03ca694b..ed7117d789f 100644
--- a/ydb/library/yql/sql/pg_dummy/CMakeLists.txt
+++ b/ydb/library/yql/sql/pg_dummy/CMakeLists.txt
@@ -18,6 +18,7 @@ target_link_libraries(yql-sql-pg_dummy PUBLIC
yql-sql-settings
providers-common-codec
yql-minikql-computation
+ library-yql-core
)
target_sources(yql-sql-pg_dummy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
index 5e81f52740a..7af490dac9c 100644
--- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
+++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
@@ -1,6 +1,7 @@
#include <ydb/library/yql/sql/pg_sql.h>
#include <ydb/library/yql/providers/common/codec/yql_pg_codec.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h>
+#include <ydb/library/yql/core/yql_pg_utils.h>
namespace NSQLTranslationPG {
@@ -26,6 +27,20 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v
throw yexception() << "PG types are not supported";
}
+void WriteYsonValueInTableFormatPg(TOutputBuf& buf, NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value) {
+ Y_UNUSED(buf);
+ Y_UNUSED(type);
+ Y_UNUSED(value);
+ throw yexception() << "PG types are not supported";
+}
+
+NUdf::TUnboxedValue ReadYsonValuePg(NKikimr::NMiniKQL::TPgType* type, char cmd, TInputBuf& buf) {
+ Y_UNUSED(type);
+ Y_UNUSED(cmd);
+ Y_UNUSED(buf);
+ throw yexception() << "PG types are not supported";
+}
+
} // namespace NCommon
} // NYql
@@ -47,3 +62,17 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {
} // namespace NMiniKQL
} // namespace NKikimr
+
+namespace NYql {
+
+TMaybe<ui32> ConvertToPgType(NKikimr::NUdf::EDataSlot slot) {
+ Y_UNUSED(slot);
+ return Nothing();
+}
+
+TMaybe<NKikimr::NUdf::EDataSlot> ConvertFromPgType(ui32 typeId) {
+ Y_UNUSED(typeId);
+ return Nothing();
+}
+
+} // NYql
diff --git a/ydb/library/yql/sql/pg_dummy/ya.make b/ydb/library/yql/sql/pg_dummy/ya.make
index 24afaaa240c..eed8c765639 100644
--- a/ydb/library/yql/sql/pg_dummy/ya.make
+++ b/ydb/library/yql/sql/pg_dummy/ya.make
@@ -7,6 +7,7 @@ PEERDIR(
ydb/library/yql/sql/settings
ydb/library/yql/providers/common/codec
ydb/library/yql/minikql/computation
+ ydb/library/yql/core
)
SRCS(