about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author udovichenko-r <rvu@ydb.tech> 2023-03-01 12:06:51 +0300
committer udovichenko-r <rvu@ydb.tech> 2023-03-01 12:06:51 +0300
commit c16c4f061fb29203701994213c06150d0b3c5b27 (patch)
tree d8bf14e54abe846f59079464329e36068aa7b484
parent 650df46f0db19d6b44ba44db848126db67c2aa90 (diff)
download ydb-c16c4f061fb29203701994213c06150d0b3c5b27.tar.gz
[yql] Move url preprocessing out of file storage
-rw-r--r-- ydb/library/yql/core/facade/yql_facade.cpp 18
-rw-r--r-- ydb/library/yql/core/facade/yql_facade.h 4
-rw-r--r-- ydb/library/yql/core/file_storage/file_storage.cpp 3
-rw-r--r-- ydb/library/yql/core/file_storage/file_storage.h 2
-rw-r--r-- ydb/library/yql/core/file_storage/file_storage_decorator.cpp 3
-rw-r--r-- ydb/library/yql/core/file_storage/file_storage_decorator.h 1
-rw-r--r-- ydb/library/yql/core/yql_type_annotation.cpp 1
-rw-r--r-- ydb/library/yql/core/yql_user_data.h 11
-rw-r--r-- ydb/library/yql/core/yql_user_data_storage.cpp 32
-rw-r--r-- ydb/library/yql/core/yql_user_data_storage.h 6
10 files changed, 62 insertions, 19 deletions
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp
index d8e58e61ab..078d67c652 100644
--- a/ydb/library/yql/core/facade/yql_facade.cpp
+++ b/ydb/library/yql/core/facade/yql_facade.cpp
@@ -178,7 +178,11 @@ void TProgramFactory::SetUdfIndex(TUdfIndex::TPtr udfIndex, TUdfIndexPackageSet:
}
void TProgramFactory::SetFileStorage(TFileStoragePtr fileStorage) {
- FileStorage_ = fileStorage;
+ FileStorage_ = std::move(fileStorage);
+}
+
+void TProgramFactory::SetUrlPreprocessing(IUrlPreprocessing::TPtr urlPreprocessing) {
+ UrlPreprocessing_ = std::move(urlPreprocessing);
}
void TProgramFactory::SetArrowResolver(IArrowResolver::TPtr arrowResolver) {
@@ -211,7 +215,7 @@ TProgramPtr TProgramFactory::Create(
// make UserDataTable_ copy here
return new TProgram(FunctionRegistry_, randomProvider, timeProvider, NextUniqueId_, DataProvidersInit_,
- UserDataTable_, Credentials_, moduleResolver, udfResolver, udfIndex, udfIndexPackageSet, FileStorage_,
+ UserDataTable_, Credentials_, moduleResolver, udfResolver, udfIndex, udfIndexPackageSet, FileStorage_, UrlPreprocessing_,
GatewaysConfig_, filename, sourceCode, sessionId, Runner_, EnableRangeComputeFor_, ArrowResolver_, hiddenMode);
}
@@ -231,6 +235,7 @@ TProgram::TProgram(
const TUdfIndex::TPtr& udfIndex,
const TUdfIndexPackageSet::TPtr& udfIndexPackageSet,
const TFileStoragePtr& fileStorage,
+ const IUrlPreprocessing::TPtr& urlPreprocessing,
const TGatewaysConfig* gatewaysConfig,
const TString& filename,
const TString& sourceCode,
@@ -276,15 +281,12 @@ TProgram::TProgram(
modules->SetCredentials(Credentials_);
}
OperationOptions_.Runner = runner;
+ UserDataStorage_->SetUrlPreprocessor(urlPreprocessing);
}
TProgram::~TProgram() {
try {
CloseLastSession();
- // Token resolver may keep some references to provider internal's. So reset it to release provider's data
- if (FileStorage_) {
- FileStorage_->SetTokenResolver({});
- }
// stop all non complete execution before deleting TExprCtx
DataProviders_.clear();
} catch (...) {
@@ -1406,9 +1408,7 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us
}
tokenResolvers.push_back(BuildDefaultTokenResolver(typeAnnotationContext->Credentials));
- if (FileStorage_) {
- FileStorage_->SetTokenResolver(BuildCompositeTokenResolver(std::move(tokenResolvers)));
- }
+ typeAnnotationContext->UserDataStorage->SetTokenResolver(BuildCompositeTokenResolver(std::move(tokenResolvers)));
return typeAnnotationContext;
}
diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h
index 1ab1deeed2..aead549e20 100644
--- a/ydb/library/yql/core/facade/yql_facade.h
+++ b/ydb/library/yql/core/facade/yql_facade.h
@@ -7,6 +7,7 @@
#include <ydb/library/yql/providers/config/yql_config_provider.h>
#include <ydb/library/yql/providers/result/provider/yql_result_provider.h>
#include <ydb/library/yql/core/yql_type_annotation.h>
+#include <ydb/library/yql/core/yql_user_data.h>
#include <ydb/library/yql/sql/sql.h>
#include <library/cpp/random_provider/random_provider.h>
@@ -52,6 +53,7 @@ public:
void SetUdfResolver(IUdfResolver::TPtr udfResolver);
void SetUdfIndex(TUdfIndex::TPtr udfIndex, TUdfIndexPackageSet::TPtr udfIndexPackageSet);
void SetFileStorage(TFileStoragePtr fileStorage);
+ void SetUrlPreprocessing(IUrlPreprocessing::TPtr urlPreprocessing);
void EnableRangeComputeFor();
void SetArrowResolver(IArrowResolver::TPtr arrowResolver);
@@ -80,6 +82,7 @@ private:
TUdfIndex::TPtr UdfIndex_;
TUdfIndexPackageSet::TPtr UdfIndexPackageSet_;
TFileStoragePtr FileStorage_;
+ IUrlPreprocessing::TPtr UrlPreprocessing_;
TString Runner_;
bool EnableRangeComputeFor_ = false;
IArrowResolver::TPtr ArrowResolver_;
@@ -313,6 +316,7 @@ private:
const TUdfIndex::TPtr& udfIndex,
const TUdfIndexPackageSet::TPtr& udfIndexPackageSet,
const TFileStoragePtr& fileStorage,
+ const IUrlPreprocessing::TPtr& urlPreprocessing,
const TGatewaysConfig* gatewaysConfig,
const TString& filename,
const TString& sourceCode,
diff --git a/ydb/library/yql/core/file_storage/file_storage.cpp b/ydb/library/yql/core/file_storage/file_storage.cpp
index 963c32aca7..5fbb183566 100644
--- a/ydb/library/yql/core/file_storage/file_storage.cpp
+++ b/ydb/library/yql/core/file_storage/file_storage.cpp
@@ -181,9 +181,6 @@ public:
return Config;
}
- void SetTokenResolver(std::function<TString(const TString&, const TString&)> /*tokenResolver*/) final {
- }
-
private:
TFileLinkPtr PutUrl(const THttpURL& url, const TString& token, const NFS::IDownloaderPtr& downloader) {
return WithRetry<TDownloadError>(Config.GetRetryCount(), [&, this]() {
diff --git a/ydb/library/yql/core/file_storage/file_storage.h b/ydb/library/yql/core/file_storage/file_storage.h
index 624badc8e1..b95ccabaaf 100644
--- a/ydb/library/yql/core/file_storage/file_storage.h
+++ b/ydb/library/yql/core/file_storage/file_storage.h
@@ -12,7 +12,6 @@
#include <util/generic/string.h>
#include <vector>
-#include <functional>
namespace NYql {
@@ -32,7 +31,6 @@ struct IFileStorage: public TThrRefBase {
virtual TFsPath GetRoot() const = 0;
virtual TFsPath GetTemp() const = 0;
virtual const TFileStorageConfig& GetConfig() const = 0;
- virtual void SetTokenResolver(std::function<TString(const TString&, const TString&)> tokenResolver) = 0;
};
using TFileStoragePtr = TIntrusivePtr<IFileStorage>;
diff --git a/ydb/library/yql/core/file_storage/file_storage_decorator.cpp b/ydb/library/yql/core/file_storage/file_storage_decorator.cpp
index f2b8e8db29..ec9e5bfac8 100644
--- a/ydb/library/yql/core/file_storage/file_storage_decorator.cpp
+++ b/ydb/library/yql/core/file_storage/file_storage_decorator.cpp
@@ -38,8 +38,5 @@ TFsPath TFileStorageDecorator::GetTemp() const {
const TFileStorageConfig& TFileStorageDecorator::GetConfig() const {
return Inner_->GetConfig();
}
-void TFileStorageDecorator::SetTokenResolver(std::function<TString(const TString&, const TString&)> tokenResolver) {
- Inner_->SetTokenResolver(std::move(tokenResolver));
-}
} // NYql
diff --git a/ydb/library/yql/core/file_storage/file_storage_decorator.h b/ydb/library/yql/core/file_storage/file_storage_decorator.h
index dcb8d323eb..bed4e1f67a 100644
--- a/ydb/library/yql/core/file_storage/file_storage_decorator.h
+++ b/ydb/library/yql/core/file_storage/file_storage_decorator.h
@@ -19,7 +19,6 @@ public:
TFsPath GetRoot() const override;
TFsPath GetTemp() const override;
const TFileStorageConfig& GetConfig() const override;
- void SetTokenResolver(std::function<TString(const TString&, const TString&)> tokenResolver) override;
protected:
TFileStoragePtr Inner_;
diff --git a/ydb/library/yql/core/yql_type_annotation.cpp b/ydb/library/yql/core/yql_type_annotation.cpp
index be8c8f2686..f9f4cbc660 100644
--- a/ydb/library/yql/core/yql_type_annotation.cpp
+++ b/ydb/library/yql/core/yql_type_annotation.cpp
@@ -38,6 +38,7 @@ bool TTypeAnnotationContext::DoInitialize(TExprContext& ctx) {
}
Y_ENSURE(UserDataStorage);
+ UserDataStorage->FillUserDataUrls();
// Disable "in progress" constraints
DisableConstraintCheck.emplace(TUniqueConstraintNode::Name());
diff --git a/ydb/library/yql/core/yql_user_data.h b/ydb/library/yql/core/yql_user_data.h
index f06872a9ab..97877b2990 100644
--- a/ydb/library/yql/core/yql_user_data.h
+++ b/ydb/library/yql/core/yql_user_data.h
@@ -7,6 +7,8 @@
#include <util/generic/hash.h>
#include <util/generic/hash_set.h>
+#include <functional>
+
namespace NYql {
// -- user files --
@@ -102,4 +104,13 @@ inline IOutputStream& operator<<(IOutputStream& os, const TUserDataKey& key) {
using TUserDataTable = THashMap<TUserDataKey, TUserDataBlock, TUserDataKey::THash, TUserDataKey::TEqualTo>;
+using TTokenResolver = std::function<TString(const TString&, const TString&)>;
+
+struct IUrlPreprocessing: public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<IUrlPreprocessing>;
+ // Returns pair of <new url>, <url alias>
+ virtual std::pair<TString, TString> Preprocess(const TString& url) = 0;
+};
+
} // namespace NYql
diff --git a/ydb/library/yql/core/yql_user_data_storage.cpp b/ydb/library/yql/core/yql_user_data_storage.cpp
index 90099d082c..1e98287876 100644
--- a/ydb/library/yql/core/yql_user_data_storage.cpp
+++ b/ydb/library/yql/core/yql_user_data_storage.cpp
@@ -45,15 +45,25 @@ TUserDataStorage::TUserDataStorage(TFileStoragePtr fileStorage, TUserDataTable d
{
}
+void TUserDataStorage::SetTokenResolver(TTokenResolver tokenResolver) {
+ TokenResolver_ = std::move(tokenResolver);
+}
+
+void TUserDataStorage::SetUrlPreprocessor(IUrlPreprocessing::TPtr urlPreprocessing) {
+ UrlPreprocessing_ = std::move(urlPreprocessing);
+}
+
void TUserDataStorage::AddUserDataBlock(const TStringBuf& name, const TUserDataBlock& block) {
const auto key = ComposeUserDataKey(name);
AddUserDataBlock(key, block);
}
void TUserDataStorage::AddUserDataBlock(const TUserDataKey& key, const TUserDataBlock& block) {
- if (!UserData_.emplace(key, block).second) {
+ auto res = UserData_.emplace(key, block);
+ if (!res.second) {
throw yexception() << "Failed to add user data block, key " << key << " already registered";
}
+ TryFillUserDataUrl(res.first->second);
}
bool TUserDataStorage::ContainsUserDataBlock(const TStringBuf& name) const {
@@ -152,6 +162,26 @@ TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> TUserDataStorage::FindUser
return res;
}
+void TUserDataStorage::FillUserDataUrls() {
+ for (auto& p : UserData_) {
+ TryFillUserDataUrl(p.second);
+ }
+}
+
+void TUserDataStorage::TryFillUserDataUrl(TUserDataBlock& block) const {
+ if (block.Type != EUserDataType::URL) {
+ return;
+ }
+
+ TString alias;
+ if (UrlPreprocessing_) {
+ std::tie(block.Data, alias) = UrlPreprocessing_->Preprocess(block.Data);
+ }
+ if (!block.UrlToken && TokenResolver_) {
+ block.UrlToken = TokenResolver_(block.Data, alias);
+ }
+}
+
std::map<TString, const TUserDataBlock*> TUserDataStorage::GetDirectoryContent(const TStringBuf& path, ui32 maxFileCount) const {
const auto fullPath = MakeFolderName(path);
diff --git a/ydb/library/yql/core/yql_user_data_storage.h b/ydb/library/yql/core/yql_user_data_storage.h
index bd98c6d1ac..aed668e4c1 100644
--- a/ydb/library/yql/core/yql_user_data_storage.h
+++ b/ydb/library/yql/core/yql_user_data_storage.h
@@ -17,6 +17,8 @@ public:
public:
TUserDataStorage(TFileStoragePtr fileStorage, TUserDataTable data, IUdfResolver::TPtr udfResolver, TUdfIndex::TPtr udfIndex);
+ void SetTokenResolver(TTokenResolver tokenResolver);
+ void SetUrlPreprocessor(IUrlPreprocessing::TPtr urlPreprocessing);
void AddUserDataBlock(const TStringBuf& name, const TUserDataBlock& block);
void AddUserDataBlock(const TUserDataKey& key, const TUserDataBlock& block);
@@ -38,6 +40,7 @@ public:
TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> FindUserDataFolder(const TStringBuf& name, ui32 maxFileCount = ~0u) const;
static TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> FindUserDataFolder(const TUserDataTable& userData, const TStringBuf& name, ui32 maxFileCount = ~0u);
+ void FillUserDataUrls();
std::map<TString, const TUserDataBlock*> GetDirectoryContent(const TStringBuf& path, ui32 maxFileCount = ~0u) const;
static TString MakeFullName(const TStringBuf& name);
static TString MakeFolderName(const TStringBuf& name);
@@ -62,6 +65,7 @@ public:
NThreading::TFuture<std::function<TUserDataBlock()>> FreezeAsync(const TUserDataKey& key);
private:
+ void TryFillUserDataUrl(TUserDataBlock& block) const;
TUserDataBlock& RegisterLink(const TUserDataKey& key, TFileLinkPtr link);
private:
@@ -69,6 +73,8 @@ private:
TUserDataTable UserData_;
IUdfResolver::TPtr UdfResolver;
TUdfIndex::TPtr UdfIndex;
+ TTokenResolver TokenResolver_;
+ IUrlPreprocessing::TPtr UrlPreprocessing_;
THashSet<TUserDataKey, TUserDataKey::THash, TUserDataKey::TEqualTo> ScannedUdfs;
std::function<void(const TUserDataBlock& block)> ScanUdfStrategy_;