diff options
author | Vitaly Stoyan <vvvv@ydb.tech> | 2024-08-07 20:03:21 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-07 20:03:21 +0300 |
commit | 514df1ac101847dfcf7ad0e2575710c4637994c8 (patch) | |
tree | ffd7b9d256c6aee344407fbab54a26695a1bdfc8 | |
parent | 1593a2d91fd739639715bc8eaf419016a1c6a8c7 (diff) | |
download | ydb-514df1ac101847dfcf7ad0e2575710c4637994c8.tar.gz |
Pass pg extensions & catalog to mrjob (#7516)
19 files changed, 756 insertions, 36 deletions
diff --git a/ydb/library/yql/ast/yql_expr.cpp b/ydb/library/yql/ast/yql_expr.cpp index 2126b39d96..4849f416e7 100644 --- a/ydb/library/yql/ast/yql_expr.cpp +++ b/ydb/library/yql/ast/yql_expr.cpp @@ -3282,6 +3282,20 @@ ui32 TPgExprType::GetFlags(ui32 typeId) { return ret; } +ui64 TPgExprType::GetPgExtensionsMask(ui32 typeId) { + auto descPtr = &NPg::LookupType(typeId); + return MakePgExtensionMask(descPtr->ExtensionIndex); +} + +ui64 MakePgExtensionMask(ui32 extensionIndex) { + if (!extensionIndex) { + return 0; + } + + YQL_ENSURE(extensionIndex <= 64); + return 1ull << (extensionIndex - 1); +} + TExprContext::TExprContext(ui64 nextUniqueId) : StringPool(4096) , NextUniqueId(nextUniqueId) diff --git a/ydb/library/yql/ast/yql_expr.h b/ydb/library/yql/ast/yql_expr.h index 9bdce43e6c..93f2c773dd 100644 --- a/ydb/library/yql/ast/yql_expr.h +++ b/ydb/library/yql/ast/yql_expr.h @@ -152,10 +152,11 @@ void ReportError(TExprContext& ctx, const TIssue& issue); class TTypeAnnotationNode { protected: - TTypeAnnotationNode(ETypeAnnotationKind kind, ui32 flags, ui64 hash) + TTypeAnnotationNode(ETypeAnnotationKind kind, ui32 flags, ui64 hash, ui64 usedPgExtensions) : Kind(kind) , Flags(flags) , Hash(hash) + , UsedPgExtensions(usedPgExtensions) { } @@ -278,6 +279,10 @@ public: return Hash; } + ui64 GetUsedPgExtensions() const { + return UsedPgExtensions; + } + bool Equals(const TTypeAnnotationNode& node) const; void Accept(TTypeAnnotationVisitor& visitor) const; @@ -310,10 +315,21 @@ protected: return flags; } + template <typename T> + static ui64 CombinePgExtensions(const T& items) { + ui64 mask = 0; + for (auto& item : items) { + mask |= item->GetUsedPgExtensions(); + } + + return mask; + } + private: const ETypeAnnotationKind Kind; const ui32 Flags; const ui64 Hash; + const ui64 UsedPgExtensions; }; class TUnitExprType : public TTypeAnnotationNode { @@ -322,7 +338,7 @@ public: TUnitExprType(ui64 hash) : TTypeAnnotationNode(KindValue, - TypeNonComputable | TypeNonPersistable, hash) + TypeNonComputable | TypeNonPersistable, hash, 0) { } @@ -341,7 +357,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Tuple; TTupleExprType(ui64 hash, const TTypeAnnotationNode::TListType& items) - : TTypeAnnotationNode(KindValue, CombineFlags(items), hash) + : TTypeAnnotationNode(KindValue, CombineFlags(items), hash, CombinePgExtensions(items)) , Items(items) { } @@ -390,7 +406,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Multi; TMultiExprType(ui64 hash, const TTypeAnnotationNode::TListType& items) - : TTypeAnnotationNode(KindValue, CombineFlags(items), hash) + : TTypeAnnotationNode(KindValue, CombineFlags(items), hash, CombinePgExtensions(items)) , Items(items) { } @@ -445,7 +461,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Item; TItemExprType(ui64 hash, const TStringBuf& name, const TTypeAnnotationNode* itemType) - : TTypeAnnotationNode(KindValue, itemType->GetFlags(), hash) + : TTypeAnnotationNode(KindValue, itemType->GetFlags(), hash, itemType->GetUsedPgExtensions()) , Name(name) , ItemType(itemType) { @@ -501,7 +517,7 @@ public: }; TStructExprType(ui64 hash, const TVector<const TItemExprType*>& items) - : TTypeAnnotationNode(KindValue, TypeNonComparable | CombineFlags(items), hash) + : TTypeAnnotationNode(KindValue, TypeNonComparable | CombineFlags(items), hash, CombinePgExtensions(items)) , Items(items) { } @@ -623,7 +639,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::List; TListExprType(ui64 hash, const TTypeAnnotationNode* itemType) - : TTypeAnnotationNode(KindValue, itemType->GetFlags() | TypeHasDynamicSize, hash) + : TTypeAnnotationNode(KindValue, itemType->GetFlags() | TypeHasDynamicSize, hash, itemType->GetUsedPgExtensions()) , ItemType(itemType) { } @@ -650,7 +666,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Stream; TStreamExprType(ui64 hash, const TTypeAnnotationNode* itemType) - : TTypeAnnotationNode(KindValue, itemType->GetFlags() | TypeNonPersistable, hash) + : TTypeAnnotationNode(KindValue, itemType->GetFlags() | TypeNonPersistable, hash, itemType->GetUsedPgExtensions()) , ItemType(itemType) { } @@ -677,7 +693,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Flow; TFlowExprType(ui64 hash, const TTypeAnnotationNode* itemType) - : TTypeAnnotationNode(KindValue, itemType->GetFlags() | TypeNonPersistable, hash) + : TTypeAnnotationNode(KindValue, itemType->GetFlags() | TypeNonPersistable, hash, itemType->GetUsedPgExtensions()) , ItemType(itemType) { } @@ -704,7 +720,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Block; TBlockExprType(ui64 hash, const TTypeAnnotationNode* itemType) - : TTypeAnnotationNode(KindValue, itemType->GetFlags() | TypeNonPersistable, hash) + : TTypeAnnotationNode(KindValue, itemType->GetFlags() | TypeNonPersistable, hash, itemType->GetUsedPgExtensions()) , ItemType(itemType) { } @@ -731,7 +747,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Scalar; TScalarExprType(ui64 hash, const TTypeAnnotationNode* itemType) - : TTypeAnnotationNode(KindValue, itemType->GetFlags() | TypeNonPersistable, hash) + : TTypeAnnotationNode(KindValue, itemType->GetFlags() | TypeNonPersistable, hash, itemType->GetUsedPgExtensions()) , ItemType(itemType) { } @@ -758,7 +774,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Data; TDataExprType(ui64 hash, EDataSlot slot) - : TTypeAnnotationNode(KindValue, GetFlags(slot), hash) + : TTypeAnnotationNode(KindValue, GetFlags(slot), hash, 0) , Slot(slot) { } @@ -853,7 +869,7 @@ public: // TODO: TypeHasDynamicSize for Pg types TPgExprType(ui64 hash, ui32 typeId) - : TTypeAnnotationNode(KindValue, GetFlags(typeId), hash) + : TTypeAnnotationNode(KindValue, GetFlags(typeId), hash, GetPgExtensionsMask(typeId)) , TypeId(typeId) { } @@ -875,18 +891,21 @@ public: private: ui32 GetFlags(ui32 typeId); + ui64 GetPgExtensionsMask(ui32 typeId); private: ui32 TypeId; }; +ui64 MakePgExtensionMask(ui32 extensionIndex); + class TWorldExprType : public TTypeAnnotationNode { public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::World; TWorldExprType(ui64 hash) : TTypeAnnotationNode(KindValue, - TypeNonComposable | TypeNonComputable | TypeNonPersistable | TypeNonInspectable, hash) + TypeNonComposable | TypeNonComputable | TypeNonPersistable | TypeNonInspectable, hash, 0) { } @@ -905,7 +924,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Optional; TOptionalExprType(ui64 hash, const TTypeAnnotationNode* itemType) - : TTypeAnnotationNode(KindValue, GetFlags(itemType), hash) + : TTypeAnnotationNode(KindValue, GetFlags(itemType), hash, itemType->GetUsedPgExtensions()) , ItemType(itemType) { } @@ -945,7 +964,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Variant; TVariantExprType(ui64 hash, const TTypeAnnotationNode* underlyingType) - : TTypeAnnotationNode(KindValue, MakeFlags(underlyingType), hash) + : TTypeAnnotationNode(KindValue, MakeFlags(underlyingType), hash, underlyingType->GetUsedPgExtensions()) , UnderlyingType(underlyingType) { } @@ -977,7 +996,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Type; TTypeExprType(ui64 hash, const TTypeAnnotationNode* type) - : TTypeAnnotationNode(KindValue, TypeNonPersistable | TypeNonComputable, hash) + : TTypeAnnotationNode(KindValue, TypeNonPersistable | TypeNonComputable, hash, 0) , Type(type) { } @@ -1005,7 +1024,8 @@ public: TDictExprType(ui64 hash, const TTypeAnnotationNode* keyType, const TTypeAnnotationNode* payloadType) : TTypeAnnotationNode(KindValue, TypeNonComparable | TypeHasDynamicSize | - keyType->GetFlags() | payloadType->GetFlags(), hash) + keyType->GetFlags() | payloadType->GetFlags(), hash, + keyType->GetUsedPgExtensions() | payloadType->GetUsedPgExtensions()) , KeyType(keyType) , PayloadType(payloadType) { @@ -1042,7 +1062,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Void; TVoidExprType(ui64 hash) - : TTypeAnnotationNode(KindValue, 0, hash) + : TTypeAnnotationNode(KindValue, 0, hash, 0) { } @@ -1061,7 +1081,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Null; TNullExprType(ui64 hash) - : TTypeAnnotationNode(KindValue, TypeHasNull, hash) + : TTypeAnnotationNode(KindValue, TypeHasNull, hash, 0) { } @@ -1101,7 +1121,7 @@ public: TCallableExprType(ui64 hash, const TTypeAnnotationNode* returnType, const TVector<TArgumentInfo>& arguments , size_t optionalArgumentsCount, const TStringBuf& payload) - : TTypeAnnotationNode(KindValue, MakeFlags(returnType), hash) + : TTypeAnnotationNode(KindValue, MakeFlags(returnType), hash, returnType->GetUsedPgExtensions()) , ReturnType(returnType) , Arguments(arguments) , OptionalArgumentsCount(optionalArgumentsCount) @@ -1207,7 +1227,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Generic; TGenericExprType(ui64 hash) - : TTypeAnnotationNode(KindValue, TypeNonComputable, hash) + : TTypeAnnotationNode(KindValue, TypeNonComputable, hash, 0) { } @@ -1226,7 +1246,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Resource; TResourceExprType(ui64 hash, const TStringBuf& tag) - : TTypeAnnotationNode(KindValue, TypeNonPersistable | TypeHasManyValues, hash) + : TTypeAnnotationNode(KindValue, TypeNonPersistable | TypeHasManyValues, hash, 0) , Tag(tag) {} @@ -1253,7 +1273,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Tagged; TTaggedExprType(ui64 hash, const TTypeAnnotationNode* baseType, const TStringBuf& tag) - : TTypeAnnotationNode(KindValue, baseType->GetFlags(), hash) + : TTypeAnnotationNode(KindValue, baseType->GetFlags(), hash, baseType->GetUsedPgExtensions()) , BaseType(baseType) , Tag(tag) {} @@ -1290,7 +1310,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Error; TErrorExprType(ui64 hash, const TIssue& error) - : TTypeAnnotationNode(KindValue, 0, hash) + : TTypeAnnotationNode(KindValue, 0, hash, 0) , Error(error) {} @@ -1315,7 +1335,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::EmptyList; TEmptyListExprType(ui64 hash) - : TTypeAnnotationNode(KindValue, 0, hash) + : TTypeAnnotationNode(KindValue, 0, hash, 0) { } @@ -1334,7 +1354,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::EmptyDict; TEmptyDictExprType(ui64 hash) - : TTypeAnnotationNode(KindValue, 0, hash) + : TTypeAnnotationNode(KindValue, 0, hash, 0) { } diff --git a/ydb/library/yql/core/pg_ext/yql_pg_ext.cpp b/ydb/library/yql/core/pg_ext/yql_pg_ext.cpp index 992d509966..093ef14220 100644 --- a/ydb/library/yql/core/pg_ext/yql_pg_ext.cpp +++ b/ydb/library/yql/core/pg_ext/yql_pg_ext.cpp @@ -15,6 +15,7 @@ void PgExtensionsFromProto(const NYql::NProto::TPgExtensions& proto, desc.LibraryPath = e.GetLibraryPath(); desc.TypesOnly = e.GetTypesOnly(); + desc.LibraryMD5 = e.GetLibraryMD5(); extensions.emplace_back(desc); } } diff --git a/ydb/library/yql/core/yql_user_data.h b/ydb/library/yql/core/yql_user_data.h index cc630eeb49..e727cad22d 100644 --- a/ydb/library/yql/core/yql_user_data.h +++ b/ydb/library/yql/core/yql_user_data.h @@ -25,6 +25,7 @@ enum class EUserDataBlockUsage { Content, Udf, Library, + PgExt, End, }; typedef TEnumBitSet<EUserDataBlockUsage, static_cast<int>(EUserDataBlockUsage::Begin), diff --git a/ydb/library/yql/parser/pg_catalog/catalog.cpp b/ydb/library/yql/parser/pg_catalog/catalog.cpp index 5363d3e585..1db2864750 100644 --- a/ydb/library/yql/parser/pg_catalog/catalog.cpp +++ b/ydb/library/yql/parser/pg_catalog/catalog.cpp @@ -1,4 +1,5 @@ #include "catalog.h" +#include <ydb/library/yql/parser/pg_catalog/proto/pg_catalog.pb.h> #include <util/generic/array_size.h> #include <util/generic/utility.h> #include <util/generic/hash.h> @@ -11,9 +12,12 @@ #include <util/system/mutex.h> #include <util/system/tempfile.h> #include <library/cpp/resource/resource.h> +#include <library/cpp/digest/md5/md5.h> namespace NYql::NPg { +const ui32 MaximumExtensionsCount = 64; // see TTypeAnnotationNode::GetUsedPgExtensions + constexpr ui32 FuncMaxArgs = 100; constexpr ui32 InvalidOid = 0; constexpr ui32 Int2VectorOid = 22; @@ -1917,6 +1921,12 @@ struct TCatalog : public IExtensionSqlBuilder { AllowedProcs.insert(t.second.Name); } } + + AllowedProcsWithoutExtensions = AllowedProcs; + TypesWithoutExtensions = Types; + TypeByNameWithoutExtensions = TypeByName; + ProcsWithoutExtensions = Procs; + ProcByNameWithoutExtensions = ProcByName; } void ExportFunction(ui32 procOid) const { @@ -2019,10 +2029,12 @@ struct TCatalog : public IExtensionSqlBuilder { } TTableInfoKey key{table}; - Y_ENSURE(StaticTables.emplace(key, table).second); + TTableInfo value = table; + value.Oid = 16000 + StaticTables.size(); + Y_ENSURE(StaticTables.emplace(key, value).second); Y_ENSURE(StaticColumns.emplace(key, columns).second); - AllStaticTables.push_back(table); + AllStaticTables.push_back(value); for (const auto& c : columns) { AllStaticColumns.push_back(c); } @@ -2104,6 +2116,11 @@ struct TCatalog : public IExtensionSqlBuilder { TMutex ExportGuard; THashSet<TString> AllowedProcs; + THashSet<TString> AllowedProcsWithoutExtensions; + TTypes TypesWithoutExtensions; + THashMap<TString, ui32> TypeByNameWithoutExtensions; + TProcs ProcsWithoutExtensions; + THashMap<TString, TVector<ui32>> ProcByNameWithoutExtensions; }; bool ValidateProcArgs(const TProcDesc& d, const TVector<ui32>& argTypeIds) { @@ -3417,14 +3434,19 @@ bool AreAllFunctionsAllowed() { void RegisterExtensions(const TVector<TExtensionDesc>& extensions, bool typesOnly, IExtensionSqlParser& parser, IExtensionLoader* loader) { + if (extensions.size() > MaximumExtensionsCount) { + throw yexception() << "Too many extensions: " << extensions.size(); + } + auto& catalog = TCatalog::MutableInstance(); with_lock (catalog.ExtensionsGuard) { Y_ENSURE(!catalog.ExtensionsInit); + auto savedAllowAllFunctions = catalog.AllowAllFunctions; catalog.AllowAllFunctions = true; for (ui32 i = 0; i < extensions.size(); ++i) { auto e = extensions[i]; - e.TypesOnly = e.TypesOnly && typesOnly; + e.TypesOnly = e.TypesOnly || typesOnly; if (e.Name.empty()) { throw yexception() << "Empty extension name"; } @@ -3437,6 +3459,10 @@ void RegisterExtensions(const TVector<TExtensionDesc>& extensions, bool typesOnl throw yexception() << "Duplicated extension install name: " << e.InstallName; } + if (e.LibraryMD5.empty()) { + e.LibraryMD5 = MD5::File(e.LibraryPath); + } + catalog.Extensions.push_back(e); TVector<TString> sqls; for (const auto& p : e.SqlPaths) { @@ -3455,6 +3481,338 @@ void RegisterExtensions(const TVector<TExtensionDesc>& extensions, bool typesOnl } } +TString ExportExtensions(const TMaybe<TSet<ui32>>& filter) { + auto& catalog = TCatalog::Instance(); + if (catalog.Extensions.empty()) { + return TString(); + } + + NProto::TPgCatalog proto; + for (ui32 i = 0; i < catalog.Extensions.size(); ++i) { + const auto& ext = catalog.Extensions[i]; + const bool skip = filter && !filter->contains(i + 1); + auto protoExt = proto.AddExtension(); + protoExt->SetName(ext.Name); + protoExt->SetInstallName(ext.InstallName); + protoExt->SetTypesOnly(skip); + if (!skip && !ext.LibraryPath.empty()) { + protoExt->SetLibraryPath(TFsPath(".") / TFsPath(ext.LibraryPath).GetName()); + protoExt->SetLibraryMD5(ext.LibraryMD5); + } + } + + TVector<ui32> extTypes; + for (const auto& t : catalog.Types) { + const auto& desc = t.second; + if (!desc.ExtensionIndex) { + continue; + } + + extTypes.push_back(t.first); + } + + Sort(extTypes); + for (const auto t : extTypes) { + const auto& desc = *catalog.Types.FindPtr(t); + auto protoType = proto.AddType(); + protoType->SetTypeId(desc.TypeId); + protoType->SetName(desc.Name); + protoType->SetExtensionIndex(desc.ExtensionIndex); + protoType->SetCategory(desc.Category); + protoType->SetTypeLen(desc.TypeLen); + protoType->SetPassByValue(desc.PassByValue); + protoType->SetTypeAlign(desc.TypeAlign); + protoType->SetElementTypeId(desc.ElementTypeId); + protoType->SetArrayTypeId(desc.ArrayTypeId); + if (desc.InFuncId) { + protoType->SetInFuncId(desc.InFuncId); + } + if (desc.OutFuncId) { + protoType->SetOutFuncId(desc.OutFuncId); + } + if (desc.SendFuncId) { + protoType->SetSendFuncId(desc.SendFuncId); + } + if (desc.ReceiveFuncId) { + protoType->SetReceiveFuncId(desc.ReceiveFuncId); + } + if (desc.TypeModInFuncId) { + protoType->SetTypeModInFuncId(desc.TypeModInFuncId); + } + if (desc.TypeModOutFuncId) { + protoType->SetTypeModOutFuncId(desc.TypeModOutFuncId); + } + if (desc.TypeSubscriptFuncId) { + protoType->SetTypeSubscriptFuncId(desc.TypeSubscriptFuncId); + } + if (desc.LessProcId) { + protoType->SetLessProcId(desc.LessProcId); + } + if (desc.EqualProcId) { + protoType->SetEqualProcId(desc.EqualProcId); + } + if (desc.CompareProcId) { + protoType->SetCompareProcId(desc.CompareProcId); + } + if (desc.HashProcId) { + protoType->SetHashProcId(desc.HashProcId); + } + } + + TVector<ui32> extProcs; + for (const auto& p : catalog.Procs) { + const auto& desc = p.second; + if (!desc.ExtensionIndex) { + continue; + } + + extProcs.push_back(p.first); + } + + Sort(extProcs); + for (const auto p : extProcs) { + const auto& desc = *catalog.Procs.FindPtr(p); + auto protoProc = proto.AddProc(); + protoProc->SetProcId(desc.ProcId); + protoProc->SetName(desc.Name); + protoProc->SetExtensionIndex(desc.ExtensionIndex); + protoProc->SetSrc(desc.Src); + for (const auto t : desc.ArgTypes) { + protoProc->AddArgType(t); + } + + for (const auto t : desc.OutputArgTypes) { + protoProc->AddOutputArgType(t); + } + + protoProc->SetVariadicType(desc.VariadicType); + protoProc->SetVariadicArgType(desc.VariadicArgType); + for (const auto& name : desc.InputArgNames) { + protoProc->AddInputArgName(name); + } + + for (const auto& name : desc.OutputArgNames) { + protoProc->AddOutputArgName(name); + } + + protoProc->SetVariadicArgName(desc.VariadicArgName); + for (const auto& d : desc.DefaultArgs) { + protoProc->AddDefaultArgNull(!d.Defined()); + protoProc->AddDefaultArgValue(d ? *d : ""); + } + + protoProc->SetIsStrict(desc.IsStrict); + protoProc->SetLang(desc.Lang); + } + + TVector<TTableInfoKey> extTables; + for (const auto& t : catalog.StaticTables) { + if (!t.second.ExtensionIndex) { + continue; + } + + extTables.push_back(t.first); + } + + Sort(extTables); + for (const auto& key : extTables) { + const auto& table = *catalog.StaticTables.FindPtr(key); + auto protoTable = proto.AddTable(); + protoTable->SetOid(table.Oid); + protoTable->SetSchema(table.Schema); + protoTable->SetName(table.Name); + protoTable->SetExtensionIndex(table.ExtensionIndex); + const auto columnsPtr = catalog.StaticColumns.FindPtr(key); + Y_ENSURE(columnsPtr); + for (const auto& c : *columnsPtr) { + protoTable->AddColumn(c.Name); + protoTable->AddUdtType(c.UdtType); + } + + const auto dataPtr = catalog.StaticTablesData.FindPtr(key); + if (dataPtr) { + for (const auto& v : *dataPtr) { + if (v.Defined()) { + protoTable->AddDataNull(false); + protoTable->AddDataValue(*v); + } else { + protoTable->AddDataNull(true); + protoTable->AddDataValue(""); + } + } + } + } + + return proto.SerializeAsString(); +} + +void ImportExtensions(const TString& exported, bool typesOnly, IExtensionLoader* loader) { + auto& catalog = TCatalog::MutableInstance(); + with_lock (catalog.ExtensionsGuard) { + Y_ENSURE(!catalog.ExtensionsInit); + if (exported.empty()) { + catalog.ExtensionsInit = true; + return; + } + + NProto::TPgCatalog proto; + Y_ENSURE(proto.ParseFromString(exported)); + for (ui32 i = 0; i < proto.ExtensionSize(); ++i) { + const auto& protoExt = proto.GetExtension(i); + TExtensionDesc ext; + ext.Name = protoExt.GetName(); + ext.InstallName = protoExt.GetInstallName(); + ext.TypesOnly = protoExt.GetTypesOnly(); + ext.LibraryMD5 = protoExt.GetLibraryMD5(); + ext.LibraryPath = protoExt.GetLibraryPath(); + catalog.Extensions.push_back(ext); + } + + for (const auto& protoType : proto.GetType()) { + TTypeDesc desc; + desc.TypeId = protoType.GetTypeId(); + desc.Name = protoType.GetName(); + desc.ExtensionIndex = protoType.GetExtensionIndex(); + desc.Category = protoType.GetCategory(); + desc.TypeLen = protoType.GetTypeLen(); + desc.PassByValue = protoType.GetPassByValue(); + desc.TypeAlign = protoType.GetTypeAlign(); + desc.ElementTypeId = protoType.GetElementTypeId(); + desc.ArrayTypeId = protoType.GetArrayTypeId(); + desc.InFuncId = protoType.GetInFuncId(); + desc.OutFuncId = protoType.GetOutFuncId(); + desc.SendFuncId = protoType.GetSendFuncId(); + desc.ReceiveFuncId = protoType.GetReceiveFuncId(); + desc.TypeModInFuncId = protoType.GetTypeModInFuncId(); + desc.TypeModOutFuncId = protoType.GetTypeModOutFuncId(); + desc.TypeSubscriptFuncId = protoType.GetTypeSubscriptFuncId(); + desc.LessProcId = protoType.GetLessProcId(); + desc.EqualProcId = protoType.GetEqualProcId(); + desc.CompareProcId = protoType.GetCompareProcId(); + desc.HashProcId = protoType.GetHashProcId(); + Y_ENSURE(catalog.Types.emplace(desc.TypeId, desc).second); + Y_ENSURE(catalog.TypeByName.emplace(desc.Name, desc.TypeId).second); + } + + for (const auto& protoProc : proto.GetProc()) { + TProcDesc desc; + desc.ProcId = protoProc.GetProcId(); + desc.Name = protoProc.GetName(); + desc.ExtensionIndex = protoProc.GetExtensionIndex(); + desc.Src = protoProc.GetSrc(); + desc.IsStrict = protoProc.GetIsStrict(); + desc.Lang = protoProc.GetLang(); + for (const auto t : protoProc.GetArgType()) { + desc.ArgTypes.push_back(t); + } + for (const auto t : protoProc.GetOutputArgType()) { + desc.OutputArgTypes.push_back(t); + } + desc.VariadicType = protoProc.GetVariadicType(); + desc.VariadicArgType = protoProc.GetVariadicArgType(); + for (const auto& name : protoProc.GetInputArgName()) { + desc.InputArgNames.push_back(name); + } + for (const auto& name : protoProc.GetOutputArgName()) { + desc.OutputArgNames.push_back(name); + } + desc.VariadicArgName = protoProc.GetVariadicArgName(); + Y_ENSURE(protoProc.DefaultArgNullSize() == protoProc.DefaultArgValueSize()); + for (ui32 i = 0; i < protoProc.DefaultArgNullSize(); ++i) { + if (protoProc.GetDefaultArgNull(i)) { + desc.DefaultArgs.push_back(Nothing()); + } else { + desc.DefaultArgs.push_back(protoProc.GetDefaultArgValue(i)); + } + } + + Y_ENSURE(catalog.Procs.emplace(desc.ProcId, desc).second); + catalog.ProcByName[desc.Name].push_back(desc.ProcId); + } + + for (const auto& protoTable : proto.GetTable()) { + TTableInfo table; + table.Oid = protoTable.GetOid(); + table.Schema = protoTable.GetSchema(); + table.Name = protoTable.GetName(); + table.Kind = ERelKind::Relation; + table.ExtensionIndex = protoTable.GetExtensionIndex(); + catalog.AllStaticTables.push_back(table); + TTableInfoKey key = table; + Y_ENSURE(catalog.StaticTables.emplace(key, table).second); + Y_ENSURE(protoTable.ColumnSize() > 0); + Y_ENSURE(protoTable.ColumnSize() == protoTable.UdtTypeSize()); + for (ui32 i = 0; i < protoTable.ColumnSize(); ++i) { + TColumnInfo columnInfo; + columnInfo.Schema = table.Schema; + columnInfo.TableName = table.Name; + columnInfo.ExtensionIndex = table.ExtensionIndex; + columnInfo.Name = protoTable.GetColumn(i); + columnInfo.UdtType = protoTable.GetUdtType(i); + catalog.AllStaticColumns.push_back(columnInfo); + catalog.StaticColumns[key].push_back(columnInfo); + } + + if (protoTable.DataValueSize() > 0) { + Y_ENSURE(protoTable.DataValueSize() == protoTable.DataNullSize()); + auto& data = catalog.StaticTablesData[key]; + data.reserve(protoTable.DataValueSize()); + for (ui64 i = 0; i < protoTable.DataValueSize(); ++i) { + if (protoTable.GetDataNull(i)) { + data.push_back(Nothing()); + } else { + data.push_back(protoTable.GetDataValue(i)); + } + } + } + } + + if (!typesOnly && loader) { + for (ui32 extensionIndex = 1; extensionIndex <= catalog.Extensions.size(); ++extensionIndex) { + const auto& e = catalog.Extensions[extensionIndex - 1]; + if (!e.TypesOnly) { + loader->Load(extensionIndex, e.Name, e.LibraryPath); + } + } + } + + catalog.AllowAllFunctions = true; + catalog.AllowedProcs.clear(); + catalog.ExtensionsInit = true; + } +} + +void ClearExtensions() { + auto& catalog = TCatalog::MutableInstance(); + with_lock (catalog.ExtensionsGuard) { + if (!catalog.ExtensionsInit) { + return; + } + + catalog.StaticTablesData.clear(); + while (!catalog.AllStaticTables.empty() && catalog.AllStaticTables.back().ExtensionIndex) { + catalog.StaticTables.erase(TTableInfoKey{catalog.AllStaticTables.back()}); + catalog.StaticColumns.erase(TTableInfoKey{catalog.AllStaticTables.back()}); + catalog.AllStaticTables.pop_back(); + } + + while (!catalog.AllStaticColumns.empty() && catalog.AllStaticColumns.back().ExtensionIndex) { + catalog.AllStaticColumns.pop_back(); + } + + catalog.AllowedProcs = catalog.AllowedProcsWithoutExtensions; + catalog.Types = catalog.TypesWithoutExtensions; + catalog.TypeByName = catalog.TypeByNameWithoutExtensions; + catalog.Procs = catalog.ProcsWithoutExtensions; + catalog.ProcByName = catalog.ProcByNameWithoutExtensions; + catalog.Extensions.clear(); + catalog.ExtensionsByName.clear(); + catalog.ExtensionsByInstallName.clear(); + + catalog.ExtensionsInit = false; + } +} + void EnumExtensions(std::function<void(ui32, const TExtensionDesc&)> f) { const auto& catalog = TCatalog::Instance(); for (ui32 i = 0; i < catalog.Extensions.size(); ++i) { diff --git a/ydb/library/yql/parser/pg_catalog/catalog.h b/ydb/library/yql/parser/pg_catalog/catalog.h index 9b0a6fe5fa..c2d30c9eb4 100644 --- a/ydb/library/yql/parser/pg_catalog/catalog.h +++ b/ydb/library/yql/parser/pg_catalog/catalog.h @@ -3,6 +3,7 @@ #include <util/generic/maybe.h> #include <util/generic/string.h> #include <util/generic/vector.h> +#include <util/generic/set.h> #include <util/stream/output.h> #include <variant> #include <functional> @@ -326,6 +327,10 @@ struct TTableInfoKey { return Schema == other.Schema && Name == other.Name; } + bool operator<(const TTableInfoKey& other) const { + return std::tie(Schema, Name) < std::tie(other.Schema, other.Name); + } + size_t Hash() const { auto stringHasher = THash<TString>(); return CombineHashes(stringHasher(Schema), stringHasher(Name)); @@ -375,6 +380,7 @@ struct TExtensionDesc { TVector<TString> SqlPaths; // paths to SQL files with DDL (CREATE TYPE/CREATE FUNCTION/etc), DML (INSERT/VALUES) TString LibraryPath; // file path bool TypesOnly = false; // Can't be loaded if true + TString LibraryMD5; // optional }; class IExtensionSqlBuilder { @@ -405,9 +411,13 @@ public: virtual void Load(ui32 extensionIndex, const TString& name, const TString& path) = 0; }; -// should be called at most once before other catalog functions +// either RegisterExtensions or ImportExtensions should be called at most once, see ClearExtensions as well void RegisterExtensions(const TVector<TExtensionDesc>& extensions, bool typesOnly, IExtensionSqlParser& parser, IExtensionLoader* loader); +// converts all library paths to basenames +TString ExportExtensions(const TMaybe<TSet<ui32>>& filter = Nothing()); +void ImportExtensions(const TString& exported, bool typesOnly, IExtensionLoader* loader); +void ClearExtensions(); void EnumExtensions(std::function<void(ui32 extensionIndex, const TExtensionDesc&)> f); const TExtensionDesc& LookupExtension(ui32 extensionIndex); diff --git a/ydb/library/yql/parser/pg_catalog/proto/pg_catalog.proto b/ydb/library/yql/parser/pg_catalog/proto/pg_catalog.proto new file mode 100644 index 0000000000..9b780077d6 --- /dev/null +++ b/ydb/library/yql/parser/pg_catalog/proto/pg_catalog.proto @@ -0,0 +1,62 @@ +package NYql.NProto; +option java_package = "ru.yandex.yql.proto"; +import "ydb/library/yql/protos/pg_ext.proto"; + +message TPgType { + optional uint32 TypeId = 1; + optional string Name = 2; + optional uint32 ExtensionIndex = 3; + optional uint32 ArrayTypeId = 4; + optional uint32 Category = 5; + optional int32 TypeLen = 6; + optional bool PassByValue = 7; + optional uint32 TypeAlign = 8; + optional uint32 ElementTypeId = 9; + optional uint32 InFuncId = 10; + optional uint32 OutFuncId = 11; + optional uint32 SendFuncId = 12; + optional uint32 ReceiveFuncId = 13; + optional uint32 TypeModInFuncId = 14; + optional uint32 TypeModOutFuncId = 15; + optional uint32 TypeSubscriptFuncId = 16; + optional uint32 LessProcId = 17; + optional uint32 EqualProcId = 18; + optional uint32 CompareProcId = 19; + optional uint32 HashProcId = 20; +} + +message TPgProc { + optional uint32 ProcId = 1; + optional string Name = 2; + optional uint32 ExtensionIndex = 3; + optional string Src = 4; + repeated uint32 ArgType = 5; + repeated uint32 OutputArgType = 6; + optional uint32 VariadicType = 7; + optional uint32 VariadicArgType = 8; + repeated bool DefaultArgNull = 9; + repeated string DefaultArgValue = 10; + repeated string InputArgName = 11; + repeated string OutputArgName = 12; + optional string VariadicArgName = 13; + optional bool IsStrict = 14; + optional uint32 Lang = 15; +} + +message TPgTable { + optional uint32 Oid = 1; + optional string Schema = 2; + optional string Name = 3; + optional uint32 ExtensionIndex = 4; + repeated string Column = 5; + repeated string UdtType = 6; + repeated string DataValue = 7; + repeated bool DataNull = 8; +} + +message TPgCatalog { + repeated TPgType Type = 1; + repeated TPgProc Proc = 2; + repeated TPgTable Table = 3; + repeated TPgExtension Extension = 4; +} diff --git a/ydb/library/yql/parser/pg_catalog/proto/ya.make b/ydb/library/yql/parser/pg_catalog/proto/ya.make new file mode 100644 index 0000000000..fc5e73c656 --- /dev/null +++ b/ydb/library/yql/parser/pg_catalog/proto/ya.make @@ -0,0 +1,13 @@ +PROTO_LIBRARY() + +SRCS( + pg_catalog.proto +) + +PEERDIR( + ydb/library/yql/protos +) + +EXCLUDE_TAGS(GO_PROTO) + +END() diff --git a/ydb/library/yql/parser/pg_catalog/ya.make b/ydb/library/yql/parser/pg_catalog/ya.make index ed2d206142..7aeaa1f80b 100644 --- a/ydb/library/yql/parser/pg_catalog/ya.make +++ b/ydb/library/yql/parser/pg_catalog/ya.make @@ -20,7 +20,9 @@ SRCS( PEERDIR( library/cpp/resource ydb/library/yql/public/issue + ydb/library/yql/parser/pg_catalog/proto ydb/library/yql/protos + library/cpp/digest/md5 ) END() diff --git a/ydb/library/yql/protos/pg_ext.proto b/ydb/library/yql/protos/pg_ext.proto index 79a363e0f4..aa1f6eb982 100644 --- a/ydb/library/yql/protos/pg_ext.proto +++ b/ydb/library/yql/protos/pg_ext.proto @@ -7,6 +7,7 @@ message TPgExtension { repeated string SqlPath = 3; optional string LibraryPath = 4; optional bool TypesOnly = 5; + optional string LibraryMD5 = 6; } message TPgExtensions { diff --git a/ydb/library/yql/providers/common/provider/ya.make b/ydb/library/yql/providers/common/provider/ya.make index 26cd61bf5c..d5c6495035 100644 --- a/ydb/library/yql/providers/common/provider/ya.make +++ b/ydb/library/yql/providers/common/provider/ya.make @@ -13,6 +13,7 @@ PEERDIR( ydb/library/yql/public/udf ydb/library/yql/sql # fixme ydb/library/yql/core + ydb/library/yql/parser/pg_catalog ) YQL_LAST_ABI_VERSION() diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index 55153232fd..05d6eac52f 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -7,6 +7,7 @@ #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_execution.h> #include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/parser/pg_catalog/catalog.h> #include <ydb/library/yql/minikql/mkql_function_registry.h> #include <ydb/library/yql/minikql/mkql_program_builder.h> @@ -842,12 +843,42 @@ bool FillUsedFilesImpl( const TTypeAnnotationContext& types, TExprContext& ctx, const TUserDataTable& crutches, - TNodeSet& visited) + TNodeSet& visited, + ui64& usedPgExtensions, + bool needFullPgCatalog) { if (!visited.insert(&node).second) { return true; } + if (node.GetTypeAnn()) { + usedPgExtensions |= node.GetTypeAnn()->GetUsedPgExtensions(); + } + + if (node.IsCallable("PgResolvedCall")) { + auto procId = FromString<ui32>(node.Child(1)->Content()); + const auto& proc = NPg::LookupProc(procId); + usedPgExtensions |= MakePgExtensionMask(proc.ExtensionIndex); + } + + if (node.IsCallable("PgResolvedOp")) { + auto operId = FromString<ui32>(node.Child(1)->Content()); + const auto& oper = NPg::LookupOper(operId); + const auto& proc = NPg::LookupProc(oper.ProcId); + usedPgExtensions |= MakePgExtensionMask(proc.ExtensionIndex); + } + + if (node.IsCallable({"PgAnyResolvedOp", "PgAllResolvedOp"})) { + auto operId = FromString<ui32>(node.Child(1)->Content()); + const auto& oper = NPg::LookupOper(operId); + const auto& proc = NPg::LookupProc(oper.ProcId); + usedPgExtensions |= MakePgExtensionMask(proc.ExtensionIndex); + } + + if (node.IsCallable("PgTableContent")) { + needFullPgCatalog = true; + } + if (node.IsCallable("FilePath") || node.IsCallable("FileContent")) { const auto& name = node.Head().Content(); const auto block = types.UserDataStorage->FindUserDataBlock(name); @@ -956,7 +987,8 @@ bool FillUsedFilesImpl( bool childrenOk = true; for (auto& child : node.Children()) { - childrenOk = FillUsedFilesImpl(*child, files, types, ctx, crutches, visited) && childrenOk; + childrenOk = FillUsedFilesImpl(*child, files, types, ctx, crutches, visited, + usedPgExtensions, needFullPgCatalog) && childrenOk; } return childrenOk; @@ -1031,6 +1063,43 @@ void FillSecureParams( } } +bool AddPgFile(bool isPath, const TString& pathOrContent, const TString& md5, const TString& alias, TUserDataTable& files, + const TTypeAnnotationContext& types, TPositionHandle pos, TExprContext& ctx) { + + TUserDataBlock block; + block.Data = pathOrContent; + if (isPath) { + block.Type = EUserDataType::PATH; + block.Usage.Set(EUserDataBlockUsage::Path); + block.Usage.Set(EUserDataBlockUsage::PgExt); + } else { + block.Type = EUserDataType::RAW_INLINE_DATA; + block.Usage.Set(EUserDataBlockUsage::Content); + } + + auto key = TUserDataKey::File(alias); + if (const auto foundBlock = types.UserDataStorage->FindUserDataBlock(key)) { + files[key] = *foundBlock; + YQL_ENSURE(!isPath || foundBlock->FrozenFile); + } else { + // Check alias clash with user files + if (files.contains(TUserDataStorage::ComposeUserDataKey(alias))) { + ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "File " << alias << " clashes with one of the user's files")); + return false; + } + + // CreateFakeFileLink calculates md5 for file, let's do it once if needed + if (isPath) { + block.FrozenFile = CreateFakeFileLink(block.Data, md5); + } + + files[key] = block; + types.UserDataStorage->AddUserDataBlock(key, block); + } + + return true; +} + bool FillUsedFiles( const TExprNode& node, TUserDataTable& files, @@ -1038,7 +1107,42 @@ bool FillUsedFiles( TExprContext& ctx, const TUserDataTable& crutches) { TNodeSet visited; - return FillUsedFilesImpl(node, files, types, ctx, crutches, visited); + ui64 usedPgExtensions = 0; + bool needFullPgCatalog = false; + auto ret = FillUsedFilesImpl(node, files, types, ctx, crutches, visited, usedPgExtensions, needFullPgCatalog); + if (!ret) { + return false; + } + + auto remainingPgExtensions = usedPgExtensions; + TSet<ui32> filter; + for (ui32 extensionIndex = 1; remainingPgExtensions && (extensionIndex <= 64); ++extensionIndex) { + auto mask = MakePgExtensionMask(extensionIndex); + if (!(mask & usedPgExtensions)) { + continue; + } + + filter.insert(extensionIndex); + remainingPgExtensions &= ~mask; + const auto& e = NPg::LookupExtension(extensionIndex); + needFullPgCatalog = true; + auto alias = TFsPath(e.LibraryPath).GetName(); + if (!AddPgFile(true, e.LibraryPath, e.LibraryMD5, alias, files, types, node.Pos(), ctx)) { + return false; + } + } + + Y_ENSURE(remainingPgExtensions == 0); + if (!needFullPgCatalog) { + return true; + } + + TString content = NPg::ExportExtensions(filter); + if (!AddPgFile(false, content, "", TString(PgCatalogFileName), files, types, node.Pos(), ctx)) { + return false; + } + + return true; } std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> FreezeUsedFiles(const TExprNode& node, TUserDataTable& files, const TTypeAnnotationContext& types, TExprContext& ctx, const std::function<bool(const TString&)>& urlDownloadFilter, const TUserDataTable& crutches) { diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index fd9eacbeed..39809bdf65 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -32,6 +32,8 @@ struct TOperationStatistics; namespace NCommon { +constexpr TStringBuf PgCatalogFileName = "_yql_pg_catalog"; + struct TWriteTableSettings { NNodes::TMaybeNode<NNodes::TCoAtom> Mode; NNodes::TMaybeNode<NNodes::TCoAtom> Temporary; diff --git a/ydb/library/yql/providers/yt/gateway/lib/user_files.cpp b/ydb/library/yql/providers/yt/gateway/lib/user_files.cpp index 50b5aff355..abb1e3963c 100644 --- a/ydb/library/yql/providers/yt/gateway/lib/user_files.cpp +++ b/ydb/library/yql/providers/yt/gateway/lib/user_files.cpp @@ -1,6 +1,7 @@ #include "user_files.h" #include <ydb/library/yql/providers/yt/common/yql_names.h> +#include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/utils/yql_panic.h> @@ -26,6 +27,8 @@ void TUserFiles::AddFile(const TUserDataKey& key, const TUserDataBlock& block) { TFileInfo userFile; userFile.IsUdf = block.Usage.Test(EUserDataBlockUsage::Udf); + userFile.IsPgExt = block.Usage.Test(EUserDataBlockUsage::PgExt); + userFile.IsPgCatalog = (key.Alias() == NCommon::PgCatalogFileName); if (block.Options.contains("bypass_artifact_cache")) { auto option = block.Options.at(TString("bypass_artifact_cache")); @@ -99,8 +102,6 @@ inline bool TUserFiles::IsEmpty() const { return Files.empty(); } - - } // NYql diff --git a/ydb/library/yql/providers/yt/gateway/lib/user_files.h b/ydb/library/yql/providers/yt/gateway/lib/user_files.h index 0ccb6d1f62..cdc5ecccb9 100644 --- a/ydb/library/yql/providers/yt/gateway/lib/user_files.h +++ b/ydb/library/yql/providers/yt/gateway/lib/user_files.h @@ -23,6 +23,8 @@ public: struct TFileInfo { TFileLinkPtr Path; // Real path in storage bool IsUdf = false; + bool IsPgExt = false; + bool IsPgCatalog = false; ui64 InMemorySize = 0; TString RemotePath; double RemoteMemoryFactor = 0.; diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp index 4a220d6b19..a436d006dd 100644 --- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp @@ -3,6 +3,7 @@ #include <ydb/library/yql/providers/yt/lib/skiff/yql_skiff_schema.h> #include <ydb/library/yql/providers/yt/common/yql_names.h> #include <ydb/library/yql/providers/yt/common/yql_configuration.h> +#include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/yt/codec/yt_codec.h> #include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h> #include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> @@ -62,6 +63,12 @@ TGatewayTransformer::TGatewayTransformer(const TExecContextBase& execCtx, TYtSet if (optLLVM != "OFF") { *UsedMem_ = 128_MB; } + + for (const auto& f: ExecCtx_.UserFiles_->GetFiles()) { + if (f.second.IsPgExt || f.second.IsPgCatalog) { + AddFile(f.second.IsPgCatalog ? TString(NCommon::PgCatalogFileName) : "", f.second); + } + } } TCallableVisitFunc TGatewayTransformer::operator()(TInternName name) { diff --git a/ydb/library/yql/providers/yt/job/ya.make b/ydb/library/yql/providers/yt/job/ya.make index 2cf84ba370..7b7ad0f66a 100644 --- a/ydb/library/yql/providers/yt/job/ya.make +++ b/ydb/library/yql/providers/yt/job/ya.make @@ -21,6 +21,9 @@ PEERDIR( ydb/library/yql/public/udf ydb/library/yql/utils ydb/library/yql/utils/backtrace + ydb/library/yql/parser/pg_catalog + ydb/library/yql/parser/pg_wrapper/interface + ydb/library/yql/providers/common/provider ydb/library/yql/providers/common/codec ydb/library/yql/providers/common/comp_nodes ydb/library/yql/providers/common/mkql diff --git a/ydb/library/yql/providers/yt/job/yql_job_base.cpp b/ydb/library/yql/providers/yt/job/yql_job_base.cpp index 73eff1eac2..628c8b236f 100644 --- a/ydb/library/yql/providers/yt/job/yql_job_base.cpp +++ b/ydb/library/yql/providers/yt/job/yql_job_base.cpp @@ -2,6 +2,9 @@ #include "yql_job_stats_writer.h" #include "yql_job_factory.h" +#include <ydb/library/yql/providers/common/provider/yql_provider.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/context.h> +#include <ydb/library/yql/parser/pg_catalog/catalog.h> #include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> #include <ydb/library/yql/minikql/mkql_function_registry.h> #include <ydb/library/yql/minikql/mkql_stats_registry.h> @@ -243,6 +246,12 @@ void TYqlJobBase::Init() { #endif } + if (TFsPath(NCommon::PgCatalogFileName).Exists()) { + TFileInput file(TString{NCommon::PgCatalogFileName}); + NPg::ImportExtensions(file.ReadAll(), false, + NKikimr::NMiniKQL::CreateExtensionLoader().get()); + } + FillStaticModules(*funcRegistry); for (const auto& mod: UdfModules) { auto path = mod.first; diff --git a/ydb/library/yql/sql/pg/pg_sql_ut.cpp b/ydb/library/yql/sql/pg/pg_sql_ut.cpp index 3a962d5d6b..431024af72 100644 --- a/ydb/library/yql/sql/pg/pg_sql_ut.cpp +++ b/ydb/library/yql/sql/pg/pg_sql_ut.cpp @@ -2,6 +2,10 @@ #include <library/cpp/testing/unittest/registar.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/parser.h> + +#include <util/system/tempfile.h> + using namespace NSQLTranslation; Y_UNIT_TEST_SUITE(PgSqlParsingOnly) { @@ -630,3 +634,108 @@ from pg_catalog.pg_type)", UNIT_ASSERT(res.Root); } } + +Y_UNIT_TEST_SUITE(PgExtensions) { + using namespace NYql; + + Y_UNIT_TEST(Empty) { + NPg::ClearExtensions(); + UNIT_ASSERT_VALUES_EQUAL(NPg::ExportExtensions(), ""); + NPg::ImportExtensions("", true, nullptr); + } + + Y_UNIT_TEST(ProcsAndType) { + NPg::ClearExtensions(); + NPg::TExtensionDesc desc; + TTempFileHandle h; + TStringBuf sql = R"( + CREATE OR REPLACE FUNCTION mytype_in(cstring) + RETURNS mytype + AS '$libdir/MyExt','mytype_in_func' + LANGUAGE 'c' IMMUTABLE STRICT PARALLEL SAFE; + + CREATE OR REPLACE FUNCTION mytype_out(mytype) + RETURNS cstring + AS '$libdir/MyExt','mytype_out_func' + LANGUAGE 'c' IMMUTABLE STRICT PARALLEL SAFE; + + CREATE TYPE mytype ( + alignment = double, + internallength = 65, + input = mytype_in, + output = mytype_out + ); + )"; + + h.Write(sql.Data(), sql.Size()); + desc.Name = "MyExt"; + desc.InstallName = "$libdir/MyExt"; + desc.SqlPaths.push_back(h.Name()); + NPg::RegisterExtensions({desc}, true, *NSQLTranslationPG::CreateExtensionSqlParser(), nullptr); + auto validate = [&]() { + const auto& type = NPg::LookupType("mytype"); + UNIT_ASSERT_VALUES_EQUAL(type.Category, 'U'); + UNIT_ASSERT_VALUES_EQUAL(type.TypeLen, 65); + UNIT_ASSERT_VALUES_EQUAL(type.TypeAlign, 'd'); + const auto& arrType = NPg::LookupType("_mytype"); + UNIT_ASSERT_VALUES_EQUAL(arrType.ElementTypeId, type.TypeId); + const auto& inProc = NPg::LookupProc("mytype_in", { NPg::LookupType("cstring").TypeId }); + UNIT_ASSERT_VALUES_EQUAL(inProc.ArgTypes.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(inProc.Src, "mytype_in_func"); + UNIT_ASSERT(inProc.IsStrict); + const auto& outProc = NPg::LookupProc("mytype_out", { NPg::LookupType("mytype").TypeId }); + UNIT_ASSERT_VALUES_EQUAL(outProc.ArgTypes.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(outProc.Src, "mytype_out_func"); + UNIT_ASSERT(outProc.IsStrict); + UNIT_ASSERT_VALUES_EQUAL(type.InFuncId, inProc.ProcId); + UNIT_ASSERT_VALUES_EQUAL(type.OutFuncId, outProc.ProcId); + }; + + validate(); + auto exported = NPg::ExportExtensions(); + NPg::ClearExtensions(); + NPg::ImportExtensions(exported, true, nullptr); + validate(); + } + + Y_UNIT_TEST(InsertValues) { + NPg::ClearExtensions(); + NPg::TExtensionDesc desc; + TTempFileHandle h; + TStringBuf sql = R"( + CREATE TABLE mytable( + foo int4, + bar text, + baz double + ); + + INSERT INTO mytable(bar, foo, baz) + VALUES ('a', 1, null),('b', null, -3.4); + )"; + + h.Write(sql.Data(), sql.Size()); + desc.Name = "MyExt"; + desc.InstallName = "$libdir/MyExt"; + desc.SqlPaths.push_back(h.Name()); + NPg::RegisterExtensions({desc}, true, *NSQLTranslationPG::CreateExtensionSqlParser(), nullptr); + auto validate = [&]() { + const auto& table = NPg::LookupStaticTable({"pg_catalog","mytable"}); + UNIT_ASSERT(table.Kind == NPg::ERelKind::Relation); + size_t remap[2]; + size_t rowStep; + const auto& data = *NPg::ReadTable({"pg_catalog", "mytable"}, {"foo", "bar"}, remap, rowStep); + UNIT_ASSERT_VALUES_EQUAL(rowStep, 3); + UNIT_ASSERT_VALUES_EQUAL(data.size(), 2 * rowStep); + UNIT_ASSERT_VALUES_EQUAL(data[rowStep * 0 + remap[0]], "1"); + UNIT_ASSERT_VALUES_EQUAL(data[rowStep * 0 + remap[1]], "a"); + UNIT_ASSERT(!data[rowStep * 1 + remap[0]].Defined()); + UNIT_ASSERT_VALUES_EQUAL(data[rowStep * 1 + remap[1]], "b"); + }; + + validate(); + auto exported = NPg::ExportExtensions(); + NPg::ClearExtensions(); + NPg::ImportExtensions(exported, true, nullptr); + validate(); + } +} |