diff options
author | udovichenko-r <rvu@ydb.tech> | 2023-03-01 12:06:51 +0300 |
---|---|---|
committer | udovichenko-r <rvu@ydb.tech> | 2023-03-01 12:06:51 +0300 |
commit | c16c4f061fb29203701994213c06150d0b3c5b27 (patch) | |
tree | d8bf14e54abe846f59079464329e36068aa7b484 | |
parent | 650df46f0db19d6b44ba44db848126db67c2aa90 (diff) | |
download | ydb-c16c4f061fb29203701994213c06150d0b3c5b27.tar.gz |
[yql] Move url preprocessing out of file storage
-rw-r--r-- | ydb/library/yql/core/facade/yql_facade.cpp | 18 | ||||
-rw-r--r-- | ydb/library/yql/core/facade/yql_facade.h | 4 | ||||
-rw-r--r-- | ydb/library/yql/core/file_storage/file_storage.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/core/file_storage/file_storage.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/core/file_storage/file_storage_decorator.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/core/file_storage/file_storage_decorator.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_type_annotation.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_user_data.h | 11 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_user_data_storage.cpp | 32 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_user_data_storage.h | 6 |
10 files changed, 62 insertions, 19 deletions
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp index d8e58e61ab..078d67c652 100644 --- a/ydb/library/yql/core/facade/yql_facade.cpp +++ b/ydb/library/yql/core/facade/yql_facade.cpp @@ -178,7 +178,11 @@ void TProgramFactory::SetUdfIndex(TUdfIndex::TPtr udfIndex, TUdfIndexPackageSet: } void TProgramFactory::SetFileStorage(TFileStoragePtr fileStorage) { - FileStorage_ = fileStorage; + FileStorage_ = std::move(fileStorage); +} + +void TProgramFactory::SetUrlPreprocessing(IUrlPreprocessing::TPtr urlPreprocessing) { + UrlPreprocessing_ = std::move(urlPreprocessing); } void TProgramFactory::SetArrowResolver(IArrowResolver::TPtr arrowResolver) { @@ -211,7 +215,7 @@ TProgramPtr TProgramFactory::Create( // make UserDataTable_ copy here return new TProgram(FunctionRegistry_, randomProvider, timeProvider, NextUniqueId_, DataProvidersInit_, - UserDataTable_, Credentials_, moduleResolver, udfResolver, udfIndex, udfIndexPackageSet, FileStorage_, + UserDataTable_, Credentials_, moduleResolver, udfResolver, udfIndex, udfIndexPackageSet, FileStorage_, UrlPreprocessing_, GatewaysConfig_, filename, sourceCode, sessionId, Runner_, EnableRangeComputeFor_, ArrowResolver_, hiddenMode); } @@ -231,6 +235,7 @@ TProgram::TProgram( const TUdfIndex::TPtr& udfIndex, const TUdfIndexPackageSet::TPtr& udfIndexPackageSet, const TFileStoragePtr& fileStorage, + const IUrlPreprocessing::TPtr& urlPreprocessing, const TGatewaysConfig* gatewaysConfig, const TString& filename, const TString& sourceCode, @@ -276,15 +281,12 @@ TProgram::TProgram( modules->SetCredentials(Credentials_); } OperationOptions_.Runner = runner; + UserDataStorage_->SetUrlPreprocessor(urlPreprocessing); } TProgram::~TProgram() { try { CloseLastSession(); - // Token resolver may keep some references to provider internal's. So reset it to release provider's data - if (FileStorage_) { - FileStorage_->SetTokenResolver({}); - } // stop all non complete execution before deleting TExprCtx DataProviders_.clear(); } catch (...) { @@ -1406,9 +1408,7 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us } tokenResolvers.push_back(BuildDefaultTokenResolver(typeAnnotationContext->Credentials)); - if (FileStorage_) { - FileStorage_->SetTokenResolver(BuildCompositeTokenResolver(std::move(tokenResolvers))); - } + typeAnnotationContext->UserDataStorage->SetTokenResolver(BuildCompositeTokenResolver(std::move(tokenResolvers))); return typeAnnotationContext; } diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h index 1ab1deeed2..aead549e20 100644 --- a/ydb/library/yql/core/facade/yql_facade.h +++ b/ydb/library/yql/core/facade/yql_facade.h @@ -7,6 +7,7 @@ #include <ydb/library/yql/providers/config/yql_config_provider.h> #include <ydb/library/yql/providers/result/provider/yql_result_provider.h> #include <ydb/library/yql/core/yql_type_annotation.h> +#include <ydb/library/yql/core/yql_user_data.h> #include <ydb/library/yql/sql/sql.h> #include <library/cpp/random_provider/random_provider.h> @@ -52,6 +53,7 @@ public: void SetUdfResolver(IUdfResolver::TPtr udfResolver); void SetUdfIndex(TUdfIndex::TPtr udfIndex, TUdfIndexPackageSet::TPtr udfIndexPackageSet); void SetFileStorage(TFileStoragePtr fileStorage); + void SetUrlPreprocessing(IUrlPreprocessing::TPtr urlPreprocessing); void EnableRangeComputeFor(); void SetArrowResolver(IArrowResolver::TPtr arrowResolver); @@ -80,6 +82,7 @@ private: TUdfIndex::TPtr UdfIndex_; TUdfIndexPackageSet::TPtr UdfIndexPackageSet_; TFileStoragePtr FileStorage_; + IUrlPreprocessing::TPtr UrlPreprocessing_; TString Runner_; bool EnableRangeComputeFor_ = false; IArrowResolver::TPtr ArrowResolver_; @@ -313,6 +316,7 @@ private: const TUdfIndex::TPtr& udfIndex, const TUdfIndexPackageSet::TPtr& udfIndexPackageSet, const TFileStoragePtr& fileStorage, + const IUrlPreprocessing::TPtr& urlPreprocessing, const TGatewaysConfig* gatewaysConfig, const TString& filename, const TString& sourceCode, diff --git a/ydb/library/yql/core/file_storage/file_storage.cpp b/ydb/library/yql/core/file_storage/file_storage.cpp index 963c32aca7..5fbb183566 100644 --- a/ydb/library/yql/core/file_storage/file_storage.cpp +++ b/ydb/library/yql/core/file_storage/file_storage.cpp @@ -181,9 +181,6 @@ public: return Config; } - void SetTokenResolver(std::function<TString(const TString&, const TString&)> /*tokenResolver*/) final { - } - private: TFileLinkPtr PutUrl(const THttpURL& url, const TString& token, const NFS::IDownloaderPtr& downloader) { return WithRetry<TDownloadError>(Config.GetRetryCount(), [&, this]() { diff --git a/ydb/library/yql/core/file_storage/file_storage.h b/ydb/library/yql/core/file_storage/file_storage.h index 624badc8e1..b95ccabaaf 100644 --- a/ydb/library/yql/core/file_storage/file_storage.h +++ b/ydb/library/yql/core/file_storage/file_storage.h @@ -12,7 +12,6 @@ #include <util/generic/string.h> #include <vector> -#include <functional> namespace NYql { @@ -32,7 +31,6 @@ struct IFileStorage: public TThrRefBase { virtual TFsPath GetRoot() const = 0; virtual TFsPath GetTemp() const = 0; virtual const TFileStorageConfig& GetConfig() const = 0; - virtual void SetTokenResolver(std::function<TString(const TString&, const TString&)> tokenResolver) = 0; }; using TFileStoragePtr = TIntrusivePtr<IFileStorage>; diff --git a/ydb/library/yql/core/file_storage/file_storage_decorator.cpp b/ydb/library/yql/core/file_storage/file_storage_decorator.cpp index f2b8e8db29..ec9e5bfac8 100644 --- a/ydb/library/yql/core/file_storage/file_storage_decorator.cpp +++ b/ydb/library/yql/core/file_storage/file_storage_decorator.cpp @@ -38,8 +38,5 @@ TFsPath TFileStorageDecorator::GetTemp() const { const TFileStorageConfig& TFileStorageDecorator::GetConfig() const { return Inner_->GetConfig(); } -void TFileStorageDecorator::SetTokenResolver(std::function<TString(const TString&, const TString&)> tokenResolver) { - Inner_->SetTokenResolver(std::move(tokenResolver)); -} } // NYql diff --git a/ydb/library/yql/core/file_storage/file_storage_decorator.h b/ydb/library/yql/core/file_storage/file_storage_decorator.h index dcb8d323eb..bed4e1f67a 100644 --- a/ydb/library/yql/core/file_storage/file_storage_decorator.h +++ b/ydb/library/yql/core/file_storage/file_storage_decorator.h @@ -19,7 +19,6 @@ public: TFsPath GetRoot() const override; TFsPath GetTemp() const override; const TFileStorageConfig& GetConfig() const override; - void SetTokenResolver(std::function<TString(const TString&, const TString&)> tokenResolver) override; protected: TFileStoragePtr Inner_; diff --git a/ydb/library/yql/core/yql_type_annotation.cpp b/ydb/library/yql/core/yql_type_annotation.cpp index be8c8f2686..f9f4cbc660 100644 --- a/ydb/library/yql/core/yql_type_annotation.cpp +++ b/ydb/library/yql/core/yql_type_annotation.cpp @@ -38,6 +38,7 @@ bool TTypeAnnotationContext::DoInitialize(TExprContext& ctx) { } Y_ENSURE(UserDataStorage); + UserDataStorage->FillUserDataUrls(); // Disable "in progress" constraints DisableConstraintCheck.emplace(TUniqueConstraintNode::Name()); diff --git a/ydb/library/yql/core/yql_user_data.h b/ydb/library/yql/core/yql_user_data.h index f06872a9ab..97877b2990 100644 --- a/ydb/library/yql/core/yql_user_data.h +++ b/ydb/library/yql/core/yql_user_data.h @@ -7,6 +7,8 @@ #include <util/generic/hash.h> #include <util/generic/hash_set.h> +#include <functional> + namespace NYql { // -- user files -- @@ -102,4 +104,13 @@ inline IOutputStream& operator<<(IOutputStream& os, const TUserDataKey& key) { using TUserDataTable = THashMap<TUserDataKey, TUserDataBlock, TUserDataKey::THash, TUserDataKey::TEqualTo>; +using TTokenResolver = std::function<TString(const TString&, const TString&)>; + +struct IUrlPreprocessing: public TThrRefBase { +public: + using TPtr = TIntrusivePtr<IUrlPreprocessing>; + // Returns pair of <new url>, <url alias> + virtual std::pair<TString, TString> Preprocess(const TString& url) = 0; +}; + } // namespace NYql diff --git a/ydb/library/yql/core/yql_user_data_storage.cpp b/ydb/library/yql/core/yql_user_data_storage.cpp index 90099d082c..1e98287876 100644 --- a/ydb/library/yql/core/yql_user_data_storage.cpp +++ b/ydb/library/yql/core/yql_user_data_storage.cpp @@ -45,15 +45,25 @@ TUserDataStorage::TUserDataStorage(TFileStoragePtr fileStorage, TUserDataTable d { } +void TUserDataStorage::SetTokenResolver(TTokenResolver tokenResolver) { + TokenResolver_ = std::move(tokenResolver); +} + +void TUserDataStorage::SetUrlPreprocessor(IUrlPreprocessing::TPtr urlPreprocessing) { + UrlPreprocessing_ = std::move(urlPreprocessing); +} + void TUserDataStorage::AddUserDataBlock(const TStringBuf& name, const TUserDataBlock& block) { const auto key = ComposeUserDataKey(name); AddUserDataBlock(key, block); } void TUserDataStorage::AddUserDataBlock(const TUserDataKey& key, const TUserDataBlock& block) { - if (!UserData_.emplace(key, block).second) { + auto res = UserData_.emplace(key, block); + if (!res.second) { throw yexception() << "Failed to add user data block, key " << key << " already registered"; } + TryFillUserDataUrl(res.first->second); } bool TUserDataStorage::ContainsUserDataBlock(const TStringBuf& name) const { @@ -152,6 +162,26 @@ TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> TUserDataStorage::FindUser return res; } +void TUserDataStorage::FillUserDataUrls() { + for (auto& p : UserData_) { + TryFillUserDataUrl(p.second); + } +} + +void TUserDataStorage::TryFillUserDataUrl(TUserDataBlock& block) const { + if (block.Type != EUserDataType::URL) { + return; + } + + TString alias; + if (UrlPreprocessing_) { + std::tie(block.Data, alias) = UrlPreprocessing_->Preprocess(block.Data); + } + if (!block.UrlToken && TokenResolver_) { + block.UrlToken = TokenResolver_(block.Data, alias); + } +} + std::map<TString, const TUserDataBlock*> TUserDataStorage::GetDirectoryContent(const TStringBuf& path, ui32 maxFileCount) const { const auto fullPath = MakeFolderName(path); diff --git a/ydb/library/yql/core/yql_user_data_storage.h b/ydb/library/yql/core/yql_user_data_storage.h index bd98c6d1ac..aed668e4c1 100644 --- a/ydb/library/yql/core/yql_user_data_storage.h +++ b/ydb/library/yql/core/yql_user_data_storage.h @@ -17,6 +17,8 @@ public: public: TUserDataStorage(TFileStoragePtr fileStorage, TUserDataTable data, IUdfResolver::TPtr udfResolver, TUdfIndex::TPtr udfIndex); + void SetTokenResolver(TTokenResolver tokenResolver); + void SetUrlPreprocessor(IUrlPreprocessing::TPtr urlPreprocessing); void AddUserDataBlock(const TStringBuf& name, const TUserDataBlock& block); void AddUserDataBlock(const TUserDataKey& key, const TUserDataBlock& block); @@ -38,6 +40,7 @@ public: TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> FindUserDataFolder(const TStringBuf& name, ui32 maxFileCount = ~0u) const; static TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> FindUserDataFolder(const TUserDataTable& userData, const TStringBuf& name, ui32 maxFileCount = ~0u); + void FillUserDataUrls(); std::map<TString, const TUserDataBlock*> GetDirectoryContent(const TStringBuf& path, ui32 maxFileCount = ~0u) const; static TString MakeFullName(const TStringBuf& name); static TString MakeFolderName(const TStringBuf& name); @@ -62,6 +65,7 @@ public: NThreading::TFuture<std::function<TUserDataBlock()>> FreezeAsync(const TUserDataKey& key); private: + void TryFillUserDataUrl(TUserDataBlock& block) const; TUserDataBlock& RegisterLink(const TUserDataKey& key, TFileLinkPtr link); private: @@ -69,6 +73,8 @@ private: TUserDataTable UserData_; IUdfResolver::TPtr UdfResolver; TUdfIndex::TPtr UdfIndex; + TTokenResolver TokenResolver_; + IUrlPreprocessing::TPtr UrlPreprocessing_; THashSet<TUserDataKey, TUserDataKey::THash, TUserDataKey::TEqualTo> ScannedUdfs; std::function<void(const TUserDataBlock& block)> ScanUdfStrategy_; |