aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Vitaly Stoyan <vvvv@ydb.tech> 2024-04-27 13:02:26 +0300
committer GitHub <noreply@github.com> 2024-04-27 13:02:26 +0300
commit 9d1cd2747dc601ea8280beac1c48df1a232c353d (patch)
tree 9a0e5a9c66e9517903a04f599c4884b52107b94c
parent d8c3404d9994513de8d9a8602c0d08ebb5669112 (diff)
download ydb-9d1cd2747dc601ea8280beac1c48df1a232c353d.tar.gz
Initial support of replaying udf_resolver (#4166)
-rw-r--r-- ydb/library/yql/core/facade/ya.make 1
-rw-r--r-- ydb/library/yql/core/facade/yql_facade.cpp 4
-rw-r--r-- ydb/library/yql/core/facade/yql_facade.h 2
-rw-r--r-- ydb/library/yql/core/qplayer/udf_resolver/ya.make 16
-rw-r--r-- ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp 152
-rw-r--r-- ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.h 9
-rw-r--r-- ydb/library/yql/core/qplayer/ya.make 1
-rw-r--r-- ydb/library/yql/core/ut/ya.make 2
-rw-r--r-- ydb/library/yql/core/ut/yql_qplayer_ut.cpp 19
-rw-r--r-- ydb/library/yql/tools/dqrun/dqrun.cpp 26
10 files changed, 217 insertions, 15 deletions
diff --git a/ydb/library/yql/core/facade/ya.make b/ydb/library/yql/core/facade/ya.make
index 8a09cfd941e..0b4ec691f1b 100644
--- a/ydb/library/yql/core/facade/ya.make
+++ b/ydb/library/yql/core/facade/ya.make
@@ -19,6 +19,7 @@ PEERDIR(
ydb/library/yql/core/url_preprocessing/interface
ydb/library/yql/core/credentials
ydb/library/yql/core/qplayer/storage/interface
+ ydb/library/yql/core/qplayer/udf_resolver
ydb/library/yql/sql
ydb/library/yql/utils/log
ydb/library/yql/core
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp
index 23ec8486c66..13f65d82981 100644
--- a/ydb/library/yql/core/facade/yql_facade.cpp
+++ b/ydb/library/yql/core/facade/yql_facade.cpp
@@ -22,6 +22,7 @@
#include <ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/library/yql/providers/common/config/yql_setting.h>
+#include <ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.h>
#include <library/cpp/yson/node/node_io.h>
#include <library/cpp/deprecated/split/split_iterator.h>
@@ -316,7 +317,10 @@ TProgram::~TProgram() {
void TProgram::SetQContext(const TQContext& qContext) {
YQL_PROFILE_FUNC(TRACE);
YQL_ENSURE(SourceSyntax_ == ESourceSyntax::Unknown);
+ YQL_ENSURE(!QContext_.CanRead() && !QContext_.CanWrite()); // the currently stored context must be neither readable nor writable
+ YQL_ENSURE(qContext.CanRead() || qContext.CanWrite()); // the incoming context must be readable or writable
QContext_ = qContext;
+ UdfResolver_ = NCommon::WrapUdfResolverWithQContext(UdfResolver_, qContext); // replace the resolver with a wrapper bound to the same context
}
void TProgram::ConfigureYsonResultFormat(NYson::EYsonFormat format) {
diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h
index e794f805252..c47a4a6fa85 100644
--- a/ydb/library/yql/core/facade/yql_facade.h
+++ b/ydb/library/yql/core/facade/yql_facade.h
@@ -401,7 +401,7 @@ private:
TYqlOperationOptions OperationOptions_;
TCredentials::TPtr Credentials_;
const IUrlListerManagerPtr UrlListerManager_;
- const IUdfResolver::TPtr UdfResolver_;
+ IUdfResolver::TPtr UdfResolver_;
const TUdfIndex::TPtr UdfIndex_;
const TUdfIndexPackageSet::TPtr UdfIndexPackageSet_;
const TFileStoragePtr FileStorage_;
diff --git a/ydb/library/yql/core/qplayer/udf_resolver/ya.make b/ydb/library/yql/core/qplayer/udf_resolver/ya.make
new file mode 100644
index 00000000000..1a4160496cf
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/udf_resolver/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ yql_qplayer_udf_resolver.cpp
+)
+
+PEERDIR(
+ ydb/library/yql/core/qplayer/storage/interface
+ ydb/library/yql/providers/common/schema/expr
+ ydb/library/yql/core
+ library/cpp/yson/node
+ contrib/libs/openssl
+)
+
+END()
+
diff --git a/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp b/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp
new file mode 100644
index 00000000000..cccdec5781b
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp
@@ -0,0 +1,152 @@
+#include "yql_qplayer_udf_resolver.h"
+
+#include <ydb/library/yql/ast/yql_expr.h>
+#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h>
+#include <library/cpp/yson/node/node_io.h>
+
+#include <openssl/sha.h>
+
+namespace NYql::NCommon {
+
+namespace {
+
+const TString UdfResolver_LoadMetadata = "UdfResolver_LoadMetadata";
+
+// Returns the raw SHA-256 digest of `str`: SHA256_DIGEST_LENGTH binary bytes,
+// not hex-encoded. Throws yexception if OpenSSL reports a failure.
+TString MakeHash(const TString& str) {
+    unsigned char digest[SHA256_DIGEST_LENGTH];
+    // The one-shot SHA256() replaces the SHA256_Init/Update/Final sequence:
+    // that low-level trio is deprecated since OpenSSL 3.0, and its status
+    // codes were ignored here, so a failed digest would have been returned
+    // as uninitialised bytes.
+    const auto* input = reinterpret_cast<const unsigned char*>(str.Data());
+    if (SHA256(input, str.Size(), digest) == nullptr) {
+        ythrow yexception() << "SHA256 digest computation failed";
+    }
+
+    return TString(reinterpret_cast<const char*>(digest), sizeof(digest));
+}
+
+// IUdfResolver decorator bound to a TQContext.
+//
+// With a readable context, LoadMetadata() is answered from the context's
+// reader and the wrapped resolver is not called. With a writable context,
+// successful answers of the wrapped resolver are additionally stored through
+// the context's writer. All other methods throw when the context is readable
+// and forward to the wrapped resolver otherwise.
+class TResolver : public IUdfResolver {
+public:
+    TResolver(IUdfResolver::TPtr inner, const TQContext& qContext)
+        : Inner_(inner)
+        , QContext_(qContext)
+    {}
+
+    // Forwards to the wrapped resolver; throws when the context is readable.
+    TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const final {
+        if (!QContext_.CanRead()) {
+            return Inner_->GetSystemModulePath(moduleName);
+        }
+
+        ythrow yexception() << "can't replay GetSystemModulePath";
+    }
+
+    // Readable context: fills every function from the record stored under
+    // MakeKey(function) and returns true; throws if a record is absent.
+    // Otherwise: forwards to the wrapped resolver and, when that succeeded and
+    // the context is writable, stores one record per function.
+    bool LoadMetadata(const TVector<TImport*>& imports,
+        const TVector<TFunction*>& functions, TExprContext& ctx) const final {
+        if (!QContext_.CanRead()) {
+            const bool loaded = Inner_->LoadMetadata(imports, functions, ctx);
+            if (loaded && QContext_.CanWrite()) {
+                for (const TFunction* func : functions) {
+                    const auto recordKey = MakeKey(func);
+                    const auto recordValue = SaveValue(func);
+                    QContext_.GetWriter()->Put({UdfResolver_LoadMetadata, recordKey}, recordValue).GetValueSync();
+                }
+            }
+
+            return loaded;
+        }
+
+        for (TFunction* func : functions) {
+            const auto recordKey = MakeKey(func);
+            const auto stored = QContext_.GetReader()->Get({UdfResolver_LoadMetadata, recordKey}).GetValueSync();
+            if (!stored) {
+                ythrow yexception() << "Missing replay data";
+            }
+
+            LoadValue(func, stored->Value, ctx);
+        }
+
+        return true;
+    }
+
+    // Forwards to the wrapped resolver; throws when the context is readable.
+    TResolveResult LoadRichMetadata(const TVector<TImport>& imports) const final {
+        if (!QContext_.CanRead()) {
+            return Inner_->LoadRichMetadata(imports);
+        }
+
+        ythrow yexception() << "can't replay LoadRichMetadata";
+    }
+
+    // Forwards to the wrapped resolver; throws when the context is readable.
+    bool ContainsModule(const TStringBuf& moduleName) const final {
+        if (!QContext_.CanRead()) {
+            return Inner_->ContainsModule(moduleName);
+        }
+
+        ythrow yexception() << "can't replay ContainsModule";
+    }
+
+private:
+    // Storage key of a function: MakeHash() of the canonical binary YSON of a
+    // map holding Name, plus TypeConfig and UserType when they are set.
+    TString MakeKey(const TFunction* f) const {
+        NYT::TNode desc;
+        desc("Name", NYT::TNode(f->Name));
+        if (f->TypeConfig) {
+            desc("TypeConfig", NYT::TNode(f->TypeConfig));
+        }
+
+        if (f->UserType) {
+            desc("UserType", TypeToYsonNode(f->UserType));
+        }
+
+        const auto canonical = NYT::NodeToCanonicalYsonString(desc, NYT::NYson::EYsonFormat::Binary);
+        return MakeHash(canonical);
+    }
+
+    // Serializes the resolved fields of a function to binary YSON.
+    // CallableType is always written; the two optional types are written only
+    // when set and not of kind Void; the two flags only when true.
+    TString SaveValue(const TFunction* f) const {
+        NYT::TNode payload;
+        payload("CallableType", TypeToYsonNode(f->CallableType));
+
+        const auto isMeaningful = [](const auto* type) {
+            return type && type->GetKind() != ETypeAnnotationKind::Void;
+        };
+
+        if (isMeaningful(f->NormalizedUserType)) {
+            payload("NormalizedUserType", TypeToYsonNode(f->NormalizedUserType));
+        }
+
+        if (isMeaningful(f->RunConfigType)) {
+            payload("RunConfigType", TypeToYsonNode(f->RunConfigType));
+        }
+
+        if (f->SupportsBlocks) {
+            payload("SupportsBlocks", NYT::TNode(true));
+        }
+
+        if (f->IsStrict) {
+            payload("IsStrict", NYT::TNode(true));
+        }
+
+        return NYT::NodeToYsonString(payload, NYT::NYson::EYsonFormat::Binary);
+    }
+
+    // Inverse of SaveValue(): parses the YSON record and assigns the fields
+    // it contains; fields whose keys are absent are left untouched.
+    void LoadValue(TFunction* f, const TString& value, TExprContext& ctx) const {
+        auto stored = NYT::NodeFromYsonString(value);
+        f->CallableType = ParseTypeFromYson(stored["CallableType"], ctx);
+
+        const auto readType = [&stored, &ctx](const char* key, auto& slot) {
+            if (stored.HasKey(key)) {
+                slot = ParseTypeFromYson(stored[key], ctx);
+            }
+        };
+
+        const auto readFlag = [&stored](const char* key, auto& flag) {
+            if (stored.HasKey(key)) {
+                flag = stored[key].AsBool();
+            }
+        };
+
+        readType("NormalizedUserType", f->NormalizedUserType);
+        readType("RunConfigType", f->RunConfigType);
+        readFlag("SupportsBlocks", f->SupportsBlocks);
+        readFlag("IsStrict", f->IsStrict);
+    }
+
+private:
+    const IUdfResolver::TPtr Inner_;
+    const TQContext QContext_;
+};
+
+}
+
+// Returns a TResolver that wraps `inner` and is bound to `qContext`.
+IUdfResolver::TPtr WrapUdfResolverWithQContext(IUdfResolver::TPtr inner, const TQContext& qContext) {
+    IUdfResolver::TPtr wrapped(new TResolver(inner, qContext));
+    return wrapped;
+}
+
+}
diff --git a/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.h b/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.h
new file mode 100644
index 00000000000..17790f2510d
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.h
@@ -0,0 +1,9 @@
+#pragma once
+#include <ydb/library/yql/core/yql_udf_resolver.h>
+#include <ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h>
+
+namespace NYql::NCommon {
+
+IUdfResolver::TPtr WrapUdfResolverWithQContext(IUdfResolver::TPtr inner, const TQContext& qContext);
+
+}
diff --git a/ydb/library/yql/core/qplayer/ya.make b/ydb/library/yql/core/qplayer/ya.make
index 25965070b63..d8708620eff 100644
--- a/ydb/library/yql/core/qplayer/ya.make
+++ b/ydb/library/yql/core/qplayer/ya.make
@@ -1,4 +1,5 @@
RECURSE(
storage
+ udf_resolver
)
diff --git a/ydb/library/yql/core/ut/ya.make b/ydb/library/yql/core/ut/ya.make
index b786e7ae08e..39813be01eb 100644
--- a/ydb/library/yql/core/ut/ya.make
+++ b/ydb/library/yql/core/ut/ya.make
@@ -21,6 +21,7 @@ PEERDIR(
ydb/library/yql/core/facade
ydb/library/yql/core/services
ydb/library/yql/core/qplayer/storage/memory
+ ydb/library/yql/providers/common/udf_resolve
ydb/library/yql/public/udf
ydb/library/yql/public/udf/service/exception_policy
ydb/library/yql/core/type_ann
@@ -35,6 +36,7 @@ PEERDIR(
ydb/library/yql/minikql/comp_nodes/llvm14
ydb/library/yql/minikql/invoke_builtins/llvm14
ydb/library/yql/sql/pg
+ ydb/library/yql/udfs/common/string
)
IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND)
diff --git a/ydb/library/yql/core/ut/yql_qplayer_ut.cpp b/ydb/library/yql/core/ut/yql_qplayer_ut.cpp
index 9dfe1ad86f7..c618f662685 100644
--- a/ydb/library/yql/core/ut/yql_qplayer_ut.cpp
+++ b/ydb/library/yql/core/ut/yql_qplayer_ut.cpp
@@ -4,6 +4,7 @@
#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h>
#include <ydb/library/yql/core/facade/yql_facade.h>
#include <ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.h>
+#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
#include <library/cpp/testing/unittest/registar.h>
@@ -11,14 +12,21 @@
using namespace NYql;
-bool RunProgram(const TString& sourceCode, const TQContext& qContext, bool isSql) {
+bool RunProgram(const TString& sourceCode, const TQContext& qContext, bool isSql, bool withUdfs) {
auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry());
+ if (withUdfs) {
+ auto cloned = functionRegistry->Clone();
+ NKikimr::NMiniKQL::FillStaticModules(*cloned);
+ functionRegistry = cloned;
+ }
+
auto yqlNativeServices = NFile::TYtFileServices::Make(functionRegistry.Get(), {}, {}, "");
auto ytGateway = CreateYtFileGateway(yqlNativeServices);
TVector<TDataProviderInitializer> dataProvidersInit;
dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytGateway));
TProgramFactory factory(true, functionRegistry.Get(), 0ULL, dataProvidersInit, "ut");
+ factory.SetUdfResolver(NCommon::CreateSimpleUdfResolver(functionRegistry.Get()));
TProgramPtr program = factory.Create("-stdin-", sourceCode);
program->SetQContext(qContext);
@@ -48,10 +56,10 @@ bool RunProgram(const TString& sourceCode, const TQContext& qContext, bool isSql
void CheckProgram(const TString& sql, bool isSql = true) {
auto qStorage = MakeMemoryQStorage();
TQContext savingCtx(qStorage->MakeWriter("foo"));
- UNIT_ASSERT(RunProgram(sql, savingCtx, isSql));
+ UNIT_ASSERT(RunProgram(sql, savingCtx, isSql, true)); // first run: writer context, withUdfs = true
savingCtx.GetWriter()->Commit().GetValueSync();
TQContext loadingCtx(qStorage->MakeReader("foo"));
- UNIT_ASSERT(RunProgram("", loadingCtx, isSql));
+ UNIT_ASSERT(RunProgram("", loadingCtx, isSql, false)); // second run: reader context, empty source text, withUdfs = false
}
Y_UNIT_TEST_SUITE(QPlayerTests) {
@@ -97,4 +105,9 @@ Y_UNIT_TEST_SUITE(QPlayerTests) {
auto s = "select 1";
CheckProgram(s);
}
+
+    // Passes a query that calls String::AsciiToUpper to CheckProgram().
+    Y_UNIT_TEST(Udf) {
+        const auto query = "select String::AsciiToUpper('a')";
+        CheckProgram(query);
+    }
}
diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp
index 9f5133e6b0e..ed1f9f8a43e 100644
--- a/ydb/library/yql/tools/dqrun/dqrun.cpp
+++ b/ydb/library/yql/tools/dqrun/dqrun.cpp
@@ -639,6 +639,14 @@ int RunMain(int argc, const char* argv[])
return 1;
}
+ if (res.Has("replay")) {
+ qStorage = MakeFileQStorage(qStorageDir);
+ qContext = TQContext(qStorage->MakeReader(opId));
+ } else if (res.Has("capture")) {
+ qStorage = MakeFileQStorage(qStorageDir);
+ qContext = TQContext(qStorage->MakeWriter(opId));
+ }
+
if (res.Has("dq-host")) {
dqHost = res.Get<TString>("dq-host");
}
@@ -937,12 +945,15 @@ int RunMain(int argc, const char* argv[])
TProgramFactory progFactory(emulateYt, funcRegistry.Get(), ctx.NextUniqueId, dataProvidersInit, "dqrun");
progFactory.AddUserDataTable(std::move(dataTable));
progFactory.SetModules(moduleResolver);
+ IUdfResolver::TPtr udfResolverImpl;
if (udfResolver) {
- progFactory.SetUdfResolver(NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), storage,
- udfResolver, {}, {}, udfResolverFilterSyscalls, {}));
+ udfResolverImpl = NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), storage,
+ udfResolver, {}, {}, udfResolverFilterSyscalls, {});
} else {
- progFactory.SetUdfResolver(NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), storage, true));
+ udfResolverImpl = NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), storage, true);
}
+
+ progFactory.SetUdfResolver(udfResolverImpl);
progFactory.SetFileStorage(storage);
progFactory.SetUrlPreprocessing(new TUrlPreprocessing(gatewaysConfig));
progFactory.SetGatewaysConfig(&gatewaysConfig);
@@ -985,14 +996,7 @@ int RunMain(int argc, const char* argv[])
program->SetQueryName(progFile);
}
- if (res.Has("replay")) {
- qStorage = MakeFileQStorage(qStorageDir);
- qContext = TQContext(qStorage->MakeReader(opId));
- program->SetQContext(qContext);
- } else if (res.Has("capture")) {
- Y_ENSURE(opId);
- qStorage = MakeFileQStorage(qStorageDir);
- qContext = TQContext(qStorage->MakeWriter(opId));
+ if (qStorage) {
program->SetQContext(qContext);
}