diff options
author | Vitaly Stoyan <vvvv@ydb.tech> | 2024-08-08 12:54:50 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-08 12:54:50 +0300 |
commit | 7ed98e4f1ed3a5045638cb4f4b762ef3db825918 (patch) | |
tree | 13878d3d06f00c73b34b2fe20b58694ffcb59b49 | |
parent | 0a122bc17e22d15ebe4248c7244e433c3497b874 (diff) | |
download | ydb-7ed98e4f1ed3a5045638cb4f4b762ef3db825918.tar.gz |
Pass pg extensions to dq TaskCommandExecutor (#7548)
7 files changed, 79 insertions, 1 deletions
diff --git a/ydb/library/yql/minikql/mkql_node.cpp b/ydb/library/yql/minikql/mkql_node.cpp index f165774a12..bcf88518b0 100644 --- a/ydb/library/yql/minikql/mkql_node.cpp +++ b/ydb/library/yql/minikql/mkql_node.cpp @@ -658,7 +658,8 @@ static const THashSet<TStringBuf> PG_SUPPORTED_PRESORT = { }; TPgType::TPgType(ui32 typeId, const TTypeEnvironment& env) - : TType(EKind::Pg, env.GetTypeOfTypeLazy(), PG_SUPPORTED_PRESORT.contains(NYql::NPg::LookupType(typeId).Name)) + : TType(EKind::Pg, env.GetTypeOfTypeLazy(), + NYql::NPg::HasType(typeId) && PG_SUPPORTED_PRESORT.contains(NYql::NPg::LookupType(typeId).Name)) , TypeId(typeId) { } diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index a98c82e357..4c5d519e86 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -3514,6 +3514,10 @@ TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) template <typename F> void PgValueToNativeBinaryImpl(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId, bool needCanonizeFp, F f) { YQL_ENSURE(value); // null could not be represented as binary + if (!NPg::HasType(pgTypeId)) { + f(TStringBuf(value.AsStringRef())); + return; + } const bool oldNeedCanonizeFp = NeedCanonizeFp; NeedCanonizeFp = needCanonizeFp; @@ -3748,6 +3752,10 @@ NUdf::TUnboxedValue ReadYsonValueInTableFormatPg(TPgType* type, char cmd, TInput } NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgTypeId) { + if (!NPg::HasType(pgTypeId)) { + return MakeString(binary); + } + TPAllocScope call; StringInfoData stringInfo; stringInfo.data = (char*)binary.Data(); diff --git a/ydb/library/yql/parser/pg_wrapper/ut/pack_ut.cpp b/ydb/library/yql/parser/pg_wrapper/ut/pack_ut.cpp new file mode 100644 index 0000000000..c2b70927c2 --- /dev/null +++ b/ydb/library/yql/parser/pg_wrapper/ut/pack_ut.cpp @@ -0,0 +1,34 @@ +#include <ydb/library/yql/minikql/mkql_alloc.h> +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/mkql_string_util.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h> + +#include <library/cpp/testing/unittest/registar.h> + +namespace NYql { + +Y_UNIT_TEST_SUITE(TPGPackTests) { + Y_UNIT_TEST(UnknownTypeAsString) { + using namespace NKikimr::NMiniKQL; + TScopedAlloc alloc(__LOCATION__); + TTypeEnvironment env(alloc); + TIntrusivePtr<IFunctionRegistry> functionRegistry(CreateFunctionRegistry(CreateBuiltinRegistry())); + TProgramBuilder pgmBuilder(env, *functionRegistry); + TMemoryUsageInfo memInfo("Memory"); + THolderFactory holderFactory(alloc.Ref(), memInfo, functionRegistry.Get()); + + auto pgType = pgmBuilder.NewPgType(0xffffffff); + TValuePacker pgPacker(false, pgType); + + NUdf::TUnboxedValue s = MakeString(NUdf::TStringRef::Of("foo")); + auto p = pgPacker.Pack(s); + auto u = pgPacker.Unpack(p, holderFactory); + UNIT_ASSERT_VALUES_EQUAL(TStringBuf(u.AsStringRef()), "foo"); + } +} + +} diff --git a/ydb/library/yql/parser/pg_wrapper/ut/ya.make b/ydb/library/yql/parser/pg_wrapper/ut/ya.make index ad0718a524..f6820d19c1 100644 --- a/ydb/library/yql/parser/pg_wrapper/ut/ya.make +++ b/ydb/library/yql/parser/pg_wrapper/ut/ya.make @@ -10,6 +10,7 @@ SRCS( codegen_ut.cpp error_ut.cpp memory_ut.cpp + pack_ut.cpp parser_ut.cpp proc_ut.cpp sort_ut.cpp diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index c2db8bd683..6576342ff0 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -54,6 +54,7 @@ #include <util/generic/size_literals.h> #include <util/stream/file.h> #include <util/string/builder.h> +#include <util/folder/dirut.h> #include <memory> #include <vector> @@ -540,6 +541,23 @@ private: TTypeEnvironment& typeEnv, TUserDataTable& files) const { + if (!localRun) { + for (const auto& file : files) { + const auto& fileName = file.first.Alias(); + const auto& block = file.second; + if (fileName == NCommon::PgCatalogFileName || block.Usage.Test(EUserDataBlockUsage::PgExt)) { + auto f = IDqGateway::TFileResource(); + auto filePath = block.FrozenFile->GetPath().GetPath(); + f.SetLocalPath(RealPath(filePath)); + f.SetName(fileName); + f.SetObjectId(block.FrozenFile->GetMd5()); + f.SetObjectType(IDqGateway::TFileResource::EUSER_FILE); + f.SetSize(block.FrozenFile->GetSize()); + uploadList->emplace(f); + } + } + } + if (!State->Settings->_SkipRevisionCheck.Get().GetOrElse(false)) { if (State->VanillaJobPath.empty()) { auto f = IDqGateway::TFileResource(); diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 036c1af555..8d8ee74c7c 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -3,6 +3,7 @@ #include <ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h> #include <ydb/library/yql/providers/dq/counters/task_counters.h> #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> +#include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h> #include <ydb/library/yql/providers/dq/api/protos/task_command_executor.pb.h> #include <ydb/library/yql/utils/backtrace/backtrace.h> @@ -19,6 +20,9 @@ #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/context.h> +#include <ydb/library/yql/parser/pg_catalog/catalog.h> + #include <util/system/thread.h> #include <util/system/fs.h> #include <util/system/env.h> @@ -678,6 +682,15 @@ public: Y_ABORT_UNLESS(workingDirectory); NFs::SetCurrentWorkingDirectory(workingDirectory); + QueryStat.Measure<void>("LoadPgExtensions", [&]() + { + if (TFsPath(NCommon::PgCatalogFileName).Exists()) { + TFileInput file(TString{NCommon::PgCatalogFileName}); + NPg::ImportExtensions(file.ReadAll(), false, + NKikimr::NMiniKQL::CreateExtensionLoader().get()); + } + }); + THashMap<TString, TString> modulesMapping; QueryStat.Measure<void>("LoadUdfs", [&]() diff --git a/ydb/library/yql/providers/dq/runtime/ya.make b/ydb/library/yql/providers/dq/runtime/ya.make index b5d81acd97..eab49cd74b 100644 --- a/ydb/library/yql/providers/dq/runtime/ya.make +++ b/ydb/library/yql/providers/dq/runtime/ya.make @@ -10,10 +10,13 @@ PEERDIR( ydb/library/yql/dq/integration/transform ydb/library/yql/dq/runtime ydb/library/yql/providers/common/proto + ydb/library/yql/providers/common/provider ydb/library/yql/providers/dq/api/protos ydb/library/yql/providers/dq/common ydb/library/yql/providers/dq/counters ydb/library/yql/providers/dq/task_runner + ydb/library/yql/parser/pg_wrapper/interface + ydb/library/yql/parser/pg_catalog ) YQL_LAST_ABI_VERSION() |