diff options
author | vvvv <vvvv@yandex-team.ru> | 2022-03-10 14:14:17 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.ru> | 2022-03-10 14:14:17 +0300 |
commit | 4d9e9a234e8f13e18ed5094c23a672ca73f3a0e5 (patch) | |
tree | 434b9940cff03d8cbf35e1e9188042fa81823820 | |
parent | bd789f93c8f22764332d81dec16b2a39ab32e274 (diff) | |
download | ydb-4d9e9a234e8f13e18ed5094c23a672ca73f3a0e5.tar.gz |
YQL-13710 table IO via yson, implicit cast from data types to pg in cast,op,call.
ref:ff4cd8968612e675e9fa9ad28d17ca99758a972d
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_core.cpp | 69 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_pg_utils.h | 10 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_node.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/CMakeLists.txt | 2 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/comp_factory.cpp | 334 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/ya.make | 2 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/codec/yql_codec.cpp | 11 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/codec/yql_pg_codec.h | 5 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg_dummy/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp | 29 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg_dummy/ya.make | 1 |
11 files changed, 416 insertions, 51 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 86044220095..0b5d83497a8 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -12,6 +12,7 @@ #include <ydb/library/yql/core/yql_callable_transform.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/core/yql_type_helpers.h> +#include <ydb/library/yql/core/yql_pg_utils.h> #include <ydb/library/yql/core/issue/protos/issue_id.pb.h> #include <ydb/library/yql/core/issue/yql_issue.h> #include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h> @@ -8786,6 +8787,40 @@ template <NKikimr::NUdf::EDataSlot DataSlot> } }; + bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, TPositionHandle pos, TExprContext& ctx) { + pgType = 0; + if (type->GetKind() == ETypeAnnotationKind::Null) { + return true; + } + + if (type->GetKind() == ETypeAnnotationKind::Data || type->GetKind() == ETypeAnnotationKind::Optional) { + const TTypeAnnotationNode* unpacked = RemoveOptionalType(type); + if (unpacked->GetKind() != ETypeAnnotationKind::Data) { + ctx.AddError(TIssue(ctx.GetPosition(pos), + "Nested optional type is not compatible to PG")); + return IGraphTransformer::TStatus::Error; + } + + auto slot = unpacked->Cast<TDataExprType>()->GetSlot(); + auto convertedTypeId = ConvertToPgType(slot); + if (!convertedTypeId) { + ctx.AddError(TIssue(ctx.GetPosition(pos), + TStringBuilder() << "Type is not compatible to PG: " << slot)); + return false; + } + + pgType = *convertedTypeId; + return true; + } else if (type->GetKind() != ETypeAnnotationKind::Pg) { + ctx.AddError(TIssue(ctx.GetPosition(pos), + TStringBuilder() << "Expected PG type, but got: " << type->GetKind())); + return false; + } else { + pgType = type->Cast<TPgExprType>()->GetId(); + return true; + } + } + IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { bool isResolved = input->Content() == "PgResolvedCall"; if (!EnsureMinArgsCount(*input, isResolved ? 2 : 1, ctx.Expr)) { @@ -8808,18 +8843,12 @@ template <NKikimr::NUdf::EDataSlot DataSlot> TVector<ui32> argTypes; for (ui32 i = isResolved ? 2 : 1; i < input->ChildrenSize(); ++i) { auto type = input->Child(i)->GetTypeAnn(); - if (type->GetKind() == ETypeAnnotationKind::Null) { - argTypes.push_back(0); - continue; - } - - if (type->GetKind() != ETypeAnnotationKind::Pg) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), - TStringBuilder() << "Expected PG type for argument " << (i - (isResolved ? 2 : 1) + 1) << ", but got: " << type->GetKind() << " for function: " << name)); + ui32 argType; + if (!ExtractPgType(type, argType, input->Child(i)->Pos(), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - argTypes.push_back(type->Cast<TPgExprType>()->GetId()); + argTypes.push_back(argType); } if (isResolved) { @@ -8932,18 +8961,12 @@ template <NKikimr::NUdf::EDataSlot DataSlot> TVector<ui32> argTypes; for (ui32 i = isResolved ? 2 : 1; i < input->ChildrenSize(); ++i) { auto type = input->Child(i)->GetTypeAnn(); - if (type->GetKind() == ETypeAnnotationKind::Null) { - argTypes.push_back(0); - continue; - } - - if (type->GetKind() != ETypeAnnotationKind::Pg) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), - TStringBuilder() << "Expected PG type for argument " << (i - (isResolved ? 2 : 1) + 1) << ", but got: " << type->GetKind() << " for function: " << name)); + ui32 argType; + if (!ExtractPgType(type, argType, input->Child(i)->Pos(), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - argTypes.push_back(type->Cast<TPgExprType>()->GetId()); + argTypes.push_back(argType); } if (isResolved) { @@ -9513,14 +9536,8 @@ template <NKikimr::NUdf::EDataSlot DataSlot> auto type = input->Tail().GetTypeAnn(); ui32 inputTypeId = 0; - if (type->GetKind() != ETypeAnnotationKind::Null) { - if (type->GetKind() != ETypeAnnotationKind::Pg) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), - TStringBuilder() << "Expected PG type for cast argument, but got: " << type->GetKind())); - return IGraphTransformer::TStatus::Error; - } - - inputTypeId = type->Cast<TPgExprType>()->GetId(); + if (!ExtractPgType(type, inputTypeId, input->Pos(), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; } if (inputTypeId != 0 && inputTypeId != targetTypeId) { diff --git a/ydb/library/yql/core/yql_pg_utils.h b/ydb/library/yql/core/yql_pg_utils.h new file mode 100644 index 00000000000..5df3aeba510 --- /dev/null +++ b/ydb/library/yql/core/yql_pg_utils.h @@ -0,0 +1,10 @@ +#pragma once +#include <ydb/library/yql/public/udf/udf_data_type.h> +#include <util/generic/maybe.h> + +namespace NYql { + +TMaybe<ui32> ConvertToPgType(NKikimr::NUdf::EDataSlot slot); +TMaybe<NKikimr::NUdf::EDataSlot> ConvertFromPgType(ui32 typeId); + +} diff --git a/ydb/library/yql/minikql/mkql_node.cpp b/ydb/library/yql/minikql/mkql_node.cpp index f1d854645aa..52a6f561404 100644 --- a/ydb/library/yql/minikql/mkql_node.cpp +++ b/ydb/library/yql/minikql/mkql_node.cpp @@ -2287,6 +2287,9 @@ EValueRepresentation GetValueRepresentation(const TType* type) { case TType::EKind::Null: return EValueRepresentation::Embedded; + case TType::EKind::Pg: + return EValueRepresentation::Any; + default: Y_FAIL("Unsupported type."); } diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt index 87aa37916a7..5b49fcb41b3 100644 --- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt +++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt @@ -93,6 +93,8 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC library-cpp-resource yql-minikql-computation yql-parser-pg_catalog + library-yql-core + library-cpp-yson contrib-libs-icu contrib-libs-libc_compat Iconv::Iconv diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index b62a76369c2..e502396cd20 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -4,8 +4,11 @@ #include <ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/mkql_alloc.h> +#include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/providers/common/codec/yql_pg_codec.h> #include <ydb/library/yql/parser/pg_catalog/catalog.h> +#include <ydb/library/yql/core/yql_pg_utils.h> +#include <library/cpp/yson/detail.h> #define TypeName PG_TypeName #define SortBy PG_SortBy @@ -80,8 +83,21 @@ NUdf::TUnboxedValuePod PointerDatumToPod(Datum datum) { return NUdf::TUnboxedValuePod(std::move(ref)); } -Datum PointerDatumFromPod(const NUdf::TUnboxedValuePod& value) { - return (Datum)(((const char*)value.AsBoxed().Get()) + PallocHdrSize); +Datum PointerDatumFromPod(const NUdf::TUnboxedValuePod& value, bool isVar) { + if (value.IsBoxed()) { + return (Datum)(((const char*)value.AsBoxed().Get()) + PallocHdrSize); + } + + // temporary palloc, should be handled by LeakGuard + const auto& ref = value.AsStringRef(); + if (isVar) { + return (Datum)cstring_to_text_with_len(ref.Data(), ref.Size()); + } else { + auto ret = (char*)palloc(ref.Size() + 1); + memcpy(ret, ref.Data(), ref.Size()); + ret[ref.Size()] = '\0'; + return (Datum)ret; + } } struct TPAllocLeakGuard { @@ -199,10 +215,7 @@ struct TMkqlPgAdapter { }; #define SET_MEMORY_CONTEXT \ - CurrentMemoryContext = ErrorContext = TMkqlPgAdapter::Instance(); \ - Y_DEFER { \ - CurrentMemoryContext = ErrorContext = nullptr; \ - }; + CurrentMemoryContext = ErrorContext = TMkqlPgAdapter::Instance(); class TPgConst : public TMutableComputationNode<TPgConst> { typedef TMutableComputationNode<TPgConst> TBaseComputation; @@ -292,9 +305,18 @@ public: Y_ENSURE(!FInfo.fn_retset); Y_ENSURE(FInfo.fn_addr); Y_ENSURE(FInfo.fn_nargs == ArgNodes.size()); + ArgDesc.reserve(ProcDesc.ArgTypes.size()); + for (const auto& x : ProcDesc.ArgTypes) { + ArgDesc.emplace_back(NPg::LookupType(x)); + } + + Y_ENSURE(ArgDesc.size() == ArgNodes.size()); } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const { + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + auto& state = GetState(compCtx); auto& callInfo = state.CallInfo.Ref(); callInfo.isnull = false; @@ -308,14 +330,14 @@ public: argDatum.isnull = true; } else { - argDatum.value = value.IsBoxed() ? PointerDatumFromPod(value) : ScalarDatumFromPod(value); + argDatum.value = ArgDesc[i].PassByValue ? + ScalarDatumFromPod(value) : + PointerDatumFromPod(value, ArgDesc[i].TypeLen == -1); } callInfo.args[i] = argDatum; } - SET_MEMORY_CONTEXT; - TPAllocLeakGuard leakGuard; PG_TRY(); { auto ret = FInfo.fn_addr(&callInfo); @@ -370,6 +392,7 @@ private: const NPg::TProcDesc ProcDesc; const NPg::TTypeDesc RetTypeDesc; const TComputationNodePtrVector ArgNodes; + TVector<NPg::TTypeDesc> ArgDesc; }; inline ui32 MakeTypeIOParam(const NPg::TTypeDesc& desc) { @@ -385,6 +408,7 @@ public: , SourceId(sourceId) , TargetId(targetId) , Arg(arg) + , SourceTypeDesc(NPg::LookupType(SourceId)) , TargetTypeDesc(NPg::LookupType(targetId)) { TypeIOParam = MakeTypeIOParam(TargetTypeDesc); @@ -395,15 +419,14 @@ public: return; } - const auto& sourceTypeDesc = NPg::LookupType(SourceId); ui32 funcId; ui32 funcId2 = 0; if (!NPg::HasCast(SourceId, TargetId)) { - if (sourceTypeDesc.Category == 'S') { + if (SourceTypeDesc.Category == 'S') { funcId = TargetTypeDesc.InFuncId; } else { Y_ENSURE(TargetTypeDesc.Category == 'S'); - funcId = sourceTypeDesc.OutFuncId; + funcId = SourceTypeDesc.OutFuncId; } } else { const auto& cast = NPg::LookupCast(SourceId, TargetId); @@ -416,7 +439,7 @@ public: break; } case NPg::ECastMethod::InOut: { - funcId = sourceTypeDesc.OutFuncId; + funcId = SourceTypeDesc.OutFuncId; funcId2 = TargetTypeDesc.InFuncId; break; } @@ -429,7 +452,7 @@ public: Y_ENSURE(FInfo1.fn_nargs >= 1 && FInfo1.fn_nargs <= 3); Func1Lookup = NPg::LookupProc(funcId); Y_ENSURE(Func1Lookup.ArgTypes.size() >= 1 && Func1Lookup.ArgTypes.size() <= 3); - if (Func1Lookup.ArgTypes[0] == CSTRINGOID && sourceTypeDesc.Category == 'S') { + if (Func1Lookup.ArgTypes[0] == CSTRINGOID && SourceTypeDesc.Category == 'S') { ConvertArgToCString = true; } @@ -464,12 +487,14 @@ public: return value.Release(); } + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; auto& state = GetState(compCtx); auto& callInfo1 = state.CallInfo1.Ref(); callInfo1.isnull = false; - NullableDatum argDatum = { value.IsBoxed() ? PointerDatumFromPod(value) : ScalarDatumFromPod(value), false }; - SET_MEMORY_CONTEXT; - TPAllocLeakGuard leakGuard; + NullableDatum argDatum = { SourceTypeDesc.PassByValue ? + ScalarDatumFromPod(value) : + PointerDatumFromPod(value, SourceTypeDesc.TypeLen == -1), false }; if (ConvertArgToCString) { argDatum.value = (Datum)text_to_cstring((const text*)argDatum.value); Y_DEFER { @@ -568,6 +593,7 @@ private: const ui32 SourceId; const ui32 TargetId; IComputationNode* const Arg; + const NPg::TTypeDesc SourceTypeDesc; const NPg::TTypeDesc TargetTypeDesc; FmgrInfo FInfo1, FInfo2; NPg::TProcDesc Func1Lookup, Func2Lookup; @@ -607,7 +633,12 @@ TComputationNodeFactory GetPgFactory() { auto inputType = callable.GetInput(0).GetStaticType(); ui32 sourceId = 0; if (!inputType->IsNull()) { - sourceId = AS_TYPE(TPgType, inputType)->GetTypeId(); + if (inputType->IsData() || inputType->IsOptional()) { + bool isOptional; + sourceId = *ConvertToPgType(*UnpackOptionalData(inputType, isOptional)->GetDataSlot()); + } else { + sourceId = AS_TYPE(TPgType, inputType)->GetTypeId(); + } } auto returnType = callable.GetType()->GetReturnType(); @@ -621,6 +652,108 @@ TComputationNodeFactory GetPgFactory() { namespace NCommon { +void WriteYsonValueInTableFormatPg(TOutputBuf& buf, TPgType* type, const NUdf::TUnboxedValuePod& value) { + using namespace NYson::NDetail; + if (!value) { + buf.Write(EntitySymbol); + return; + } + + switch (type->GetTypeId()) { + case BOOLOID: + buf.Write(DatumGetBool(ScalarDatumFromPod(value)) ? TrueMarker : FalseMarker); + break; + case INT2OID: + buf.Write(Int64Marker); + buf.WriteVarI64(DatumGetInt16(ScalarDatumFromPod(value))); + break; + case INT4OID: + buf.Write(Int64Marker); + buf.WriteVarI64(DatumGetInt32(ScalarDatumFromPod(value))); + break; + case INT8OID: + buf.Write(Int64Marker); + buf.WriteVarI64(DatumGetInt64(ScalarDatumFromPod(value))); + break; + case FLOAT4OID: { + buf.Write(DoubleMarker); + double val = DatumGetFloat4(ScalarDatumFromPod(value)); + buf.WriteMany((const char*)&val, sizeof(val)); + break; + } + case FLOAT8OID: { + buf.Write(DoubleMarker); + double val = DatumGetFloat8(ScalarDatumFromPod(value)); + buf.WriteMany((const char*)&val, sizeof(val)); + break; + } + case BYTEAOID: + case VARCHAROID: + case TEXTOID: { + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + const auto x = (const text*)PointerDatumFromPod(value, true); + ui32 len = VARSIZE_ANY_EXHDR(x); + TString s; + if (len) { + s = TString::Uninitialized(len); + text_to_cstring_buffer(x, s.begin(), len + 1); + } + + buf.Write(StringMarker); + buf.WriteVarI32(len); + buf.WriteMany(s); + break; + } + case CSTRINGOID: { + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + auto s = (const char*)PointerDatumFromPod(value, false); + auto len = strlen(s); + buf.Write(StringMarker); + buf.WriteVarI32(len); + buf.WriteMany(s, len); + break; + } + default: + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + const auto& typeInfo = NPg::LookupType(type->GetTypeId()); + Y_ENSURE(typeInfo.SendFuncId); + FmgrInfo finfo; + Zero(finfo); + fmgr_info(typeInfo.SendFuncId, &finfo); + Y_ENSURE(!finfo.fn_retset); + Y_ENSURE(finfo.fn_addr); + Y_ENSURE(finfo.fn_nargs == 1); + LOCAL_FCINFO(callInfo, 1); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 1; + callInfo->isnull = false; + callInfo->args[0] = { typeInfo.PassByValue ? + ScalarDatumFromPod(value): + PointerDatumFromPod(value, typeInfo.TypeLen == -1), false }; + auto x = (text*)finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + Y_DEFER { + pfree(x); + }; + + ui32 len = VARSIZE_ANY_EXHDR(x); + TString s; + if (len) { + s = TString::Uninitialized(len); + text_to_cstring_buffer(x, s.begin(), len + 1); + } + + buf.Write(StringMarker); + buf.WriteVarI32(len); + buf.WriteMany(s); + break; + } +} + void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type, const TVector<ui32>* structPositions) { if (!value) { @@ -651,7 +784,9 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v case BYTEAOID: case VARCHAROID: case TEXTOID: { - const auto x = (const text*)PointerDatumFromPod(value); + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + const auto x = (const text*)PointerDatumFromPod(value, true); ui32 len = VARSIZE_ANY_EXHDR(x); if (len) { ret = TString::Uninitialized(len); @@ -660,7 +795,9 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v break; } case CSTRINGOID: { - auto str = (const char*)PointerDatumFromPod(value); + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + auto str = (const char*)PointerDatumFromPod(value, false); ret = str; break; } @@ -679,7 +816,9 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v callInfo->flinfo = &finfo; callInfo->nargs = 1; callInfo->isnull = false; - callInfo->args[0] = { value.IsBoxed() ? PointerDatumFromPod(value) : ScalarDatumFromPod(value), false }; + callInfo->args[0] = { typeInfo.PassByValue ? + ScalarDatumFromPod(value): + PointerDatumFromPod(value, typeInfo.TypeLen == -1), false }; auto str = (char*)finfo.fn_addr(callInfo); Y_ENSURE(!callInfo->isnull); Y_DEFER { @@ -692,7 +831,145 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v writer.OnStringScalar(ret); } +NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) { + using namespace NYson::NDetail; + if (cmd == EntitySymbol) { + return NUdf::TUnboxedValuePod(); + } + + switch (type->GetTypeId()) { + case BOOLOID: { + YQL_ENSURE(cmd == FalseMarker || cmd == TrueMarker, "Expected either true or false, but got: " << TString(cmd).Quote()); + return ScalarDatumToPod(BoolGetDatum(cmd == TrueMarker)); + } + case INT2OID: { + CHECK_EXPECTED(cmd, Int64Marker); + auto x = i16(buf.ReadVarI64()); + return ScalarDatumToPod(Int16GetDatum(x)); + } + case INT4OID: { + CHECK_EXPECTED(cmd, Int64Marker); + auto x = i32(buf.ReadVarI64()); + return ScalarDatumToPod(Int32GetDatum(x)); + } + case INT8OID: { + CHECK_EXPECTED(cmd, Int64Marker); + auto x = buf.ReadVarI64(); + return ScalarDatumToPod(Int64GetDatum(x)); + } + case FLOAT4OID: { + CHECK_EXPECTED(cmd, DoubleMarker); + double x; + buf.ReadMany((char*)&x, sizeof(x)); + return ScalarDatumToPod(Float4GetDatum(x)); + } + case FLOAT8OID: { + CHECK_EXPECTED(cmd, DoubleMarker); + double x; + buf.ReadMany((char*)&x, sizeof(x)); + return ScalarDatumToPod(Float8GetDatum(x)); + } + case BYTEAOID: + case VARCHAROID: + case TEXTOID: { + CHECK_EXPECTED(cmd, StringMarker); + auto s = buf.ReadYtString(); + SET_MEMORY_CONTEXT; + auto ret = cstring_to_text_with_len(s.Data(), s.Size()); + return PointerDatumToPod((Datum)ret); + } + case CSTRINGOID: { + CHECK_EXPECTED(cmd, StringMarker); + auto s = buf.ReadYtString(); + SET_MEMORY_CONTEXT; + auto ret = (char*)palloc(s.Size() + 1); + memcpy(ret, s.Data(), s.Size()); + ret[s.Size()] = '\0'; + return PointerDatumToPod((Datum)ret); + } + default: + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + auto s = buf.ReadYtString(); + StringInfoData stringInfo; + stringInfo.data = (char*)s.Data(); + stringInfo.len = s.Size(); + stringInfo.maxlen = s.Size(); + stringInfo.cursor = 0; + + const auto& typeInfo = NPg::LookupType(type->GetTypeId()); + auto typeIOParam = MakeTypeIOParam(typeInfo); + Y_ENSURE(typeInfo.ReceiveFuncId); + FmgrInfo finfo; + Zero(finfo); + fmgr_info(typeInfo.ReceiveFuncId, &finfo); + Y_ENSURE(!finfo.fn_retset); + Y_ENSURE(finfo.fn_addr); + Y_ENSURE(finfo.fn_nargs >= 1 && finfo.fn_nargs <= 3); + LOCAL_FCINFO(callInfo, 3); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 3; + callInfo->isnull = false; + callInfo->args[0] = { (Datum)&stringInfo, false }; + callInfo->args[1] = { ObjectIdGetDatum(typeIOParam), false }; + callInfo->args[2] = { Int32GetDatum(-1), false }; + + auto x = finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + Y_ENSURE(stringInfo.cursor == stringInfo.len); + return typeInfo.PassByValue ? ScalarDatumToPod(x) : PointerDatumToPod(x); + } +} + } // namespace NCommon + +TMaybe<ui32> ConvertToPgType(NUdf::EDataSlot slot) { + switch (slot) { + case NUdf::EDataSlot::Bool: + return BOOLOID; + case NUdf::EDataSlot::Int16: + return INT2OID; + case NUdf::EDataSlot::Int32: + return INT4OID; + case NUdf::EDataSlot::Int64: + return INT8OID; + case NUdf::EDataSlot::Float: + return FLOAT4OID; + case NUdf::EDataSlot::Double: + return FLOAT8OID; + case NUdf::EDataSlot::String: + return BYTEAOID; + case NUdf::EDataSlot::Utf8: + return TEXTOID; + default: + return Nothing(); + } +} + +TMaybe<NUdf::EDataSlot> ConvertFromPgType(ui32 typeId) { + switch (typeId) { + case BOOLOID: + return NUdf::EDataSlot::Bool; + case INT2OID: + return NUdf::EDataSlot::Int16; + case INT4OID: + return NUdf::EDataSlot::Int32; + case INT8OID: + return NUdf::EDataSlot::Int64; + case FLOAT4OID: + return NUdf::EDataSlot::Float; + case FLOAT8OID: + return NUdf::EDataSlot::Double; + case BYTEAOID: + return NUdf::EDataSlot::String; + case TEXTOID: + return NUdf::EDataSlot::Utf8; + } + + return Nothing(); +} + } // NYql namespace NKikimr { @@ -735,7 +1012,9 @@ void PGPackImpl(const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffe case BYTEAOID: case VARCHAROID: case TEXTOID: { - const auto x = (const text*)PointerDatumFromPod(value); + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + const auto x = (const text*)PointerDatumFromPod(value, true); ui32 len = VARSIZE_ANY_EXHDR(x); NDetails::PackUInt32(len, buf); auto off = buf.Size(); @@ -745,7 +1024,9 @@ void PGPackImpl(const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffe break; } case CSTRINGOID: { - const auto x = (const char*)PointerDatumFromPod(value); + SET_MEMORY_CONTEXT; + TPAllocLeakGuard leakGuard; + const auto x = (const char*)PointerDatumFromPod(value, false); const auto len = strlen(x); NDetails::PackUInt32(len, buf); buf.Append(x, len); @@ -767,7 +1048,9 @@ void PGPackImpl(const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffe callInfo->flinfo = &finfo; callInfo->nargs = 1; callInfo->isnull = false; - callInfo->args[0] = { value.IsBoxed() ? PointerDatumFromPod(value) : ScalarDatumFromPod(value), false }; + callInfo->args[0] = { typeInfo.PassByValue ? + ScalarDatumFromPod(value) : + PointerDatumFromPod(value, typeInfo.TypeLen == -1), false }; auto x = (text*)finfo.fn_addr(callInfo); Y_ENSURE(!callInfo->isnull); Y_DEFER{ @@ -826,7 +1109,7 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) { MKQL_ENSURE(size <= buf.size(), "Bad packed data. Buffer too small"); const char* ptr = buf.data(); buf.Skip(size); - char* ret = (char*)palloc(size + 1); + auto ret = (char*)palloc(size + 1); memcpy(ret, ptr, size); ret[size] = '\0'; return PointerDatumToPod((Datum)ret); @@ -869,3 +1152,4 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) { } // namespace NMiniKQL } // namespace NKikimr + diff --git a/ydb/library/yql/parser/pg_wrapper/ya.make b/ydb/library/yql/parser/pg_wrapper/ya.make index adc1c9ceffd..c68519541ac 100644 --- a/ydb/library/yql/parser/pg_wrapper/ya.make +++ b/ydb/library/yql/parser/pg_wrapper/ya.make @@ -32,6 +32,8 @@ PEERDIR( library/cpp/resource ydb/library/yql/minikql/computation ydb/library/yql/parser/pg_catalog + ydb/library/yql/core + library/cpp/yson contrib/libs/icu contrib/libs/libc_compat diff --git a/ydb/library/yql/providers/common/codec/yql_codec.cpp b/ydb/library/yql/providers/common/codec/yql_codec.cpp index 74685167014..28f8ce8c926 100644 --- a/ydb/library/yql/providers/common/codec/yql_codec.cpp +++ b/ydb/library/yql/providers/common/codec/yql_codec.cpp @@ -1237,6 +1237,11 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, return holderFactory.GetEmptyContainer(); } + case TType::EKind::Pg: { + auto pgType = static_cast<TPgType*>(type); + return ReadYsonValuePg(pgType, cmd, buf); + } + default: YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); } @@ -1978,6 +1983,12 @@ void WriteYsonValueInTableFormat(TOutputBuf& buf, TType* type, const NUdf::TUnbo break; } + case TType::EKind::Pg: { + auto pgType = static_cast<TPgType*>(type); + WriteYsonValueInTableFormatPg(buf, pgType, value); + break; + } + default: YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); } diff --git a/ydb/library/yql/providers/common/codec/yql_pg_codec.h b/ydb/library/yql/providers/common/codec/yql_pg_codec.h index 969bfb9ca52..963d5231e14 100644 --- a/ydb/library/yql/providers/common/codec/yql_pg_codec.h +++ b/ydb/library/yql/providers/common/codec/yql_pg_codec.h @@ -7,6 +7,7 @@ #include <util/generic/vector.h> #include "yql_codec_results.h" +#include "yql_codec_buf.h" namespace NYql { namespace NCommon { @@ -14,5 +15,9 @@ namespace NCommon { void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type, const TVector<ui32>* structPositions); +void WriteYsonValueInTableFormatPg(TOutputBuf& buf, NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value); + +NUdf::TUnboxedValue ReadYsonValuePg(NKikimr::NMiniKQL::TPgType* type, char cmd, TInputBuf& buf); + } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/sql/pg_dummy/CMakeLists.txt b/ydb/library/yql/sql/pg_dummy/CMakeLists.txt index 3dd03ca694b..ed7117d789f 100644 --- a/ydb/library/yql/sql/pg_dummy/CMakeLists.txt +++ b/ydb/library/yql/sql/pg_dummy/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(yql-sql-pg_dummy PUBLIC yql-sql-settings providers-common-codec yql-minikql-computation + library-yql-core ) target_sources(yql-sql-pg_dummy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index 5e81f52740a..7af490dac9c 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -1,6 +1,7 @@ #include <ydb/library/yql/sql/pg_sql.h> #include <ydb/library/yql/providers/common/codec/yql_pg_codec.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h> +#include <ydb/library/yql/core/yql_pg_utils.h> namespace NSQLTranslationPG { @@ -26,6 +27,20 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v throw yexception() << "PG types are not supported"; } +void WriteYsonValueInTableFormatPg(TOutputBuf& buf, NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value) { + Y_UNUSED(buf); + Y_UNUSED(type); + Y_UNUSED(value); + throw yexception() << "PG types are not supported"; +} + +NUdf::TUnboxedValue ReadYsonValuePg(NKikimr::NMiniKQL::TPgType* type, char cmd, TInputBuf& buf) { + Y_UNUSED(type); + Y_UNUSED(cmd); + Y_UNUSED(buf); + throw yexception() << "PG types are not supported"; +} + } // namespace NCommon } // NYql @@ -47,3 +62,17 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) { } // namespace NMiniKQL } // namespace NKikimr + +namespace NYql { + +TMaybe<ui32> ConvertToPgType(NKikimr::NUdf::EDataSlot slot) { + Y_UNUSED(slot); + return Nothing(); +} + +TMaybe<NKikimr::NUdf::EDataSlot> ConvertFromPgType(ui32 typeId) { + Y_UNUSED(typeId); + return Nothing(); +} + +} // NYql diff --git a/ydb/library/yql/sql/pg_dummy/ya.make b/ydb/library/yql/sql/pg_dummy/ya.make index 24afaaa240c..eed8c765639 100644 --- a/ydb/library/yql/sql/pg_dummy/ya.make +++ b/ydb/library/yql/sql/pg_dummy/ya.make @@ -7,6 +7,7 @@ PEERDIR( ydb/library/yql/sql/settings ydb/library/yql/providers/common/codec ydb/library/yql/minikql/computation + ydb/library/yql/core ) SRCS( |