aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitaly Stoyan <vvvv@ydb.tech>2024-08-08 12:54:50 +0300
committerGitHub <noreply@github.com>2024-08-08 12:54:50 +0300
commit7ed98e4f1ed3a5045638cb4f4b762ef3db825918 (patch)
tree13878d3d06f00c73b34b2fe20b58694ffcb59b49
parent0a122bc17e22d15ebe4248c7244e433c3497b874 (diff)
downloadydb-7ed98e4f1ed3a5045638cb4f4b762ef3db825918.tar.gz
Pass pg extensions to dq TaskCommandExecutor (#7548)
-rw-r--r--ydb/library/yql/minikql/mkql_node.cpp3
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp8
-rw-r--r--ydb/library/yql/parser/pg_wrapper/ut/pack_ut.cpp34
-rw-r--r--ydb/library/yql/parser/pg_wrapper/ut/ya.make1
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp18
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp13
-rw-r--r--ydb/library/yql/providers/dq/runtime/ya.make3
7 files changed, 79 insertions, 1 deletions
diff --git a/ydb/library/yql/minikql/mkql_node.cpp b/ydb/library/yql/minikql/mkql_node.cpp
index f165774a12..bcf88518b0 100644
--- a/ydb/library/yql/minikql/mkql_node.cpp
+++ b/ydb/library/yql/minikql/mkql_node.cpp
@@ -658,7 +658,8 @@ static const THashSet<TStringBuf> PG_SUPPORTED_PRESORT = {
};
TPgType::TPgType(ui32 typeId, const TTypeEnvironment& env)
- : TType(EKind::Pg, env.GetTypeOfTypeLazy(), PG_SUPPORTED_PRESORT.contains(NYql::NPg::LookupType(typeId).Name))
+ : TType(EKind::Pg, env.GetTypeOfTypeLazy(),
+ NYql::NPg::HasType(typeId) && PG_SUPPORTED_PRESORT.contains(NYql::NPg::LookupType(typeId).Name))
, TypeId(typeId)
{
}
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index a98c82e357..4c5d519e86 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -3514,6 +3514,10 @@ TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId)
template <typename F>
void PgValueToNativeBinaryImpl(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId, bool needCanonizeFp, F f) {
YQL_ENSURE(value); // null could not be represented as binary
+ if (!NPg::HasType(pgTypeId)) {
+ f(TStringBuf(value.AsStringRef()));
+ return;
+ }
const bool oldNeedCanonizeFp = NeedCanonizeFp;
NeedCanonizeFp = needCanonizeFp;
@@ -3748,6 +3752,10 @@ NUdf::TUnboxedValue ReadYsonValueInTableFormatPg(TPgType* type, char cmd, TInput
}
NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgTypeId) {
+ if (!NPg::HasType(pgTypeId)) {
+ return MakeString(binary);
+ }
+
TPAllocScope call;
StringInfoData stringInfo;
stringInfo.data = (char*)binary.Data();
diff --git a/ydb/library/yql/parser/pg_wrapper/ut/pack_ut.cpp b/ydb/library/yql/parser/pg_wrapper/ut/pack_ut.cpp
new file mode 100644
index 0000000000..c2b70927c2
--- /dev/null
+++ b/ydb/library/yql/parser/pg_wrapper/ut/pack_ut.cpp
@@ -0,0 +1,34 @@
+#include <ydb/library/yql/minikql/mkql_alloc.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/mkql_string_util.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NYql {
+
+Y_UNIT_TEST_SUITE(TPGPackTests) {
+ Y_UNIT_TEST(UnknownTypeAsString) {
+ using namespace NKikimr::NMiniKQL;
+ TScopedAlloc alloc(__LOCATION__);
+ TTypeEnvironment env(alloc);
+ TIntrusivePtr<IFunctionRegistry> functionRegistry(CreateFunctionRegistry(CreateBuiltinRegistry()));
+ TProgramBuilder pgmBuilder(env, *functionRegistry);
+ TMemoryUsageInfo memInfo("Memory");
+ THolderFactory holderFactory(alloc.Ref(), memInfo, functionRegistry.Get());
+
+ auto pgType = pgmBuilder.NewPgType(0xffffffff);
+ TValuePacker pgPacker(false, pgType);
+
+ NUdf::TUnboxedValue s = MakeString(NUdf::TStringRef::Of("foo"));
+ auto p = pgPacker.Pack(s);
+ auto u = pgPacker.Unpack(p, holderFactory);
+ UNIT_ASSERT_VALUES_EQUAL(TStringBuf(u.AsStringRef()), "foo");
+ }
+}
+
+}
diff --git a/ydb/library/yql/parser/pg_wrapper/ut/ya.make b/ydb/library/yql/parser/pg_wrapper/ut/ya.make
index ad0718a524..f6820d19c1 100644
--- a/ydb/library/yql/parser/pg_wrapper/ut/ya.make
+++ b/ydb/library/yql/parser/pg_wrapper/ut/ya.make
@@ -10,6 +10,7 @@ SRCS(
codegen_ut.cpp
error_ut.cpp
memory_ut.cpp
+ pack_ut.cpp
parser_ut.cpp
proc_ut.cpp
sort_ut.cpp
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index c2db8bd683..6576342ff0 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -54,6 +54,7 @@
#include <util/generic/size_literals.h>
#include <util/stream/file.h>
#include <util/string/builder.h>
+#include <util/folder/dirut.h>
#include <memory>
#include <vector>
@@ -540,6 +541,23 @@ private:
TTypeEnvironment& typeEnv,
TUserDataTable& files) const
{
+ if (!localRun) {
+ for (const auto& file : files) {
+ const auto& fileName = file.first.Alias();
+ const auto& block = file.second;
+ if (fileName == NCommon::PgCatalogFileName || block.Usage.Test(EUserDataBlockUsage::PgExt)) {
+ auto f = IDqGateway::TFileResource();
+ auto filePath = block.FrozenFile->GetPath().GetPath();
+ f.SetLocalPath(RealPath(filePath));
+ f.SetName(fileName);
+ f.SetObjectId(block.FrozenFile->GetMd5());
+ f.SetObjectType(IDqGateway::TFileResource::EUSER_FILE);
+ f.SetSize(block.FrozenFile->GetSize());
+ uploadList->emplace(f);
+ }
+ }
+ }
+
if (!State->Settings->_SkipRevisionCheck.Get().GetOrElse(false)) {
if (State->VanillaJobPath.empty()) {
auto f = IDqGateway::TFileResource();
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index 036c1af555..8d8ee74c7c 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -3,6 +3,7 @@
#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h>
#include <ydb/library/yql/providers/dq/counters/task_counters.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
+#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h>
#include <ydb/library/yql/providers/dq/api/protos/task_command_executor.pb.h>
#include <ydb/library/yql/utils/backtrace/backtrace.h>
@@ -19,6 +20,9 @@
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/library/yql/parser/pg_wrapper/interface/context.h>
+#include <ydb/library/yql/parser/pg_catalog/catalog.h>
+
#include <util/system/thread.h>
#include <util/system/fs.h>
#include <util/system/env.h>
@@ -678,6 +682,15 @@ public:
Y_ABORT_UNLESS(workingDirectory);
NFs::SetCurrentWorkingDirectory(workingDirectory);
+ QueryStat.Measure<void>("LoadPgExtensions", [&]()
+ {
+ if (TFsPath(NCommon::PgCatalogFileName).Exists()) {
+ TFileInput file(TString{NCommon::PgCatalogFileName});
+ NPg::ImportExtensions(file.ReadAll(), false,
+ NKikimr::NMiniKQL::CreateExtensionLoader().get());
+ }
+ });
+
THashMap<TString, TString> modulesMapping;
QueryStat.Measure<void>("LoadUdfs", [&]()
diff --git a/ydb/library/yql/providers/dq/runtime/ya.make b/ydb/library/yql/providers/dq/runtime/ya.make
index b5d81acd97..eab49cd74b 100644
--- a/ydb/library/yql/providers/dq/runtime/ya.make
+++ b/ydb/library/yql/providers/dq/runtime/ya.make
@@ -10,10 +10,13 @@ PEERDIR(
ydb/library/yql/dq/integration/transform
ydb/library/yql/dq/runtime
ydb/library/yql/providers/common/proto
+ ydb/library/yql/providers/common/provider
ydb/library/yql/providers/dq/api/protos
ydb/library/yql/providers/dq/common
ydb/library/yql/providers/dq/counters
ydb/library/yql/providers/dq/task_runner
+ ydb/library/yql/parser/pg_wrapper/interface
+ ydb/library/yql/parser/pg_catalog
)
YQL_LAST_ABI_VERSION()