diff options
author | udovichenko-r <udovichenko-r@yandex-team.ru> | 2022-02-09 21:23:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 15:58:17 +0300 |
commit | 0830448769cbfaf4d68b901433d786d0be99fe00 (patch) | |
tree | 40920c6fcae0c265b916a86ef84e66b2f5c5b503 | |
parent | b78c7143329217632f2223ff8b3d36d0cdeadd41 (diff) | |
download | ydb-0830448769cbfaf4d68b901433d786d0be99fe00.tar.gz |
[yql] Extract downloaders from file storage
YQL-14297
ref:ae5dae36c3bad823f361c50432d5d7789f5b9352
17 files changed, 377 insertions, 585 deletions
diff --git a/ydb/library/yql/core/file_storage/exporter/file_exporter.cpp b/ydb/library/yql/core/file_storage/exporter/file_exporter.cpp deleted file mode 100644 index 913d6dbd52..0000000000 --- a/ydb/library/yql/core/file_storage/exporter/file_exporter.cpp +++ /dev/null @@ -1,179 +0,0 @@ -#include <ydb/library/yql/core/file_storage/file_exporter.h> -#include <ydb/library/yql/core/file_storage/download_stream.h> -#include <ydb/library/yql/core/file_storage/storage.h> -#include <ydb/library/yql/utils/fetch/fetch.h> -#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h> -#include <ydb/library/yql/utils/multi_resource_lock.h> - -#include <library/cpp/digest/md5/md5.h> -#include <library/cpp/cgiparam/cgiparam.h> - -#include <grpc++/grpc++.h> - -#include <arc/api/public/repo.grpc.pb.h> - -#include <util/system/shellcommand.h> -#include <util/stream/file.h> -#include <util/system/fs.h> -#include <util/string/strip.h> -#include <util/folder/dirut.h> - -namespace NYql { - -class TClient { -public: - TClient( - std::shared_ptr<grpc::Channel> channel) - : Stub(NVcs::NPublic::FileService::NewStub(channel)) - { - } - - TString ReadFile(const NVcs::NPublic::ReadFileRequest& request, const TString& path) { - grpc::ClientContext context; - auto reader = Stub->ReadFile(&context, request); - NVcs::NPublic::ReadFileResponse response; - TString ret; - for (;;) { - if (!reader->Read(&response)) { - grpc::Status status = reader->Finish(); - if (!status.ok()) { - ythrow yexception() << "Failed to download url: " << path << ", status: " << status.error_message(); - } - - break; - } - - if (response.HasData()) { - ret += response.GetData(); - } - } - - return ret; - } -private: - std::unique_ptr<NVcs::NPublic::FileService::Stub> Stub; -}; - -class TFileExporter : public IFileExporter { -public: - std::pair<ui64, TString> ExportToFile(const TFileStorageConfig& config, const TString& convertedUrl, const THttpURL& url, const TString& dstFile) override { - 
Y_UNUSED(convertedUrl); - TCgiParameters params(url.GetField(NUri::TField::FieldQuery)); - auto hash = params.Get("hash"); - if (hash) { - if (hash.length() != 40) { - throw yexception() << "Only commit hash is expected, but got: " << hash; - } - - if (!config.HasArcTokenPath()) { - throw yexception() << "Missing Arc token"; - } - - auto path = config.GetArcTokenPath(); - if (path.StartsWith("~")) { - path = GetHomeDir() + path.substr(1, path.Size() - 1); - } - - TString authToken = Strip(TFileInput(path).ReadAll()); - - std::shared_ptr<grpc::ChannelCredentials> credentials = grpc::CompositeChannelCredentials( - grpc::SslCredentials(grpc::SslCredentialsOptions()), - grpc::AccessTokenCredentials(authToken)); - TClient client(grpc::CreateChannel("api.arc-vcs.yandex-team.ru:6734", credentials)); - - TString remoteFilePath = TStringBuilder() << url.GetField(NUri::TField::FieldHost) << url.GetField(NUri::TField::FieldPath); - NVcs::NPublic::ReadFileRequest request; - request.SetPath(remoteFilePath); - request.SetRevision(hash); - const auto content = client.ReadFile(request, remoteFilePath); - TFileOutput dst(dstFile); - dst.Write(content.Data(), content.Size()); - dst.Finish(); - - i64 dstFileLen = GetFileLength(dstFile.c_str()); - if (dstFileLen == -1) { - ythrow TSystemError() << "cannot get file length: " << dstFile; - } - - YQL_ENSURE(static_cast<ui64>(dstFileLen) == content.Size()); - } - else { - auto branch = params.Get("branch"); - // support both operative and peg revision (see http://svnbook.red-bean.com/en/1.6/svn.advanced.pegrevs.html ) - // peg rev is enough usually - auto pegRevStr = params.Get("rev"); - unsigned int pegRev = 0; - if (!TryFromString(pegRevStr, pegRev) || pegRev == 0) { - throw yexception() << "Revision for Arcadia file must be specified"; - } - - auto opRev = params.Get("op_rev"); - if (!branch) { - branch = "trunk"; - } - - // in order to support @ in file names we have to add @ and optional peg revision - // see 
https://stackoverflow.com/questions/757435/how-to-escape-characters-in-subversion-managed-file-names - TStringBuilder fullUrlBuilder = TStringBuilder() << "svn+ssh://arcadia-ro.yandex.ru/arc/" << branch << "/arcadia/" << - url.GetField(NUri::TField::FieldHost) << url.GetField(NUri::TField::FieldPath) << "@" << pegRev; - - TStringBuilder configOptionBuilder = TStringBuilder() << "config:tunnels:ssh=ssh -F /dev/null -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=30 -o ForwardAgent=true"; - - if (config.GetArcSshPkPath()) { - configOptionBuilder << " -i " << config.GetArcSshPkPath(); - } - - if (config.GetArcSshUser()) { - configOptionBuilder << " -l " << config.GetArcSshUser(); - } - - TList<TString> args = { "export", fullUrlBuilder, dstFile, "--non-interactive", "--depth=empty", "--config-option", configOptionBuilder }; - if (opRev) { - // equals to peg rev if not specified - args.push_back("-r"); - args.push_back(opRev); - } - - TShellCommandOptions shellOptions; - shellOptions - .SetUseShell(false) // disable shell for security reasons due to possible injections! 
- .SetDetachSession(false); - - TShellCommand shell("svn", args, shellOptions); - switch (shell.Run().GetStatus()) { - case TShellCommand::SHELL_INTERNAL_ERROR: - ythrow yexception() << "Export url internal error: " - << shell.GetInternalError(); - case TShellCommand::SHELL_ERROR: - ythrow TDownloadError() << "Downloading url " << convertedUrl << " failed, reason: " << shell.GetError(); - case TShellCommand::SHELL_FINISHED: - break; - default: - ythrow yexception() << "Unexpected state: " << int(shell.GetStatus()); - } - } - - auto stat = TFileStat(dstFile, true); // do not follow symlinks - if (stat.IsDir()) { - RemoveDirWithContents(dstFile); - ythrow yexception() << "Folders are not allowed"; - } - - if (stat.IsSymlink()) { - NFs::Remove(dstFile); - ythrow yexception() << "Symlinks are not allowed"; - } - - SetCacheFilePermissions(dstFile); - - const auto md5 = MD5::File(dstFile); - return std::make_pair(stat.Size, md5); - } -}; - -std::unique_ptr<IFileExporter> CreateFileExporter() { - return std::make_unique<TFileExporter>(); -} - -} - diff --git a/ydb/library/yql/core/file_storage/exporter/ya.make b/ydb/library/yql/core/file_storage/exporter/ya.make deleted file mode 100644 index c92ef2b7a5..0000000000 --- a/ydb/library/yql/core/file_storage/exporter/ya.make +++ /dev/null @@ -1,17 +0,0 @@ -LIBRARY() - -OWNER(g:yql) - -SRCS( - file_exporter.cpp -) - -PEERDIR( - arc/api/public - ydb/library/yql/core/file_storage/proto - ydb/library/yql/utils - ydb/library/yql/utils/fetch - ydb/library/yql/utils/log -) - -END() diff --git a/ydb/library/yql/core/file_storage/exporter_dummy/file_exporter.cpp b/ydb/library/yql/core/file_storage/exporter_dummy/file_exporter.cpp deleted file mode 100644 index 669a8071dc..0000000000 --- a/ydb/library/yql/core/file_storage/exporter_dummy/file_exporter.cpp +++ /dev/null @@ -1,37 +0,0 @@ -#include <ydb/library/yql/core/file_storage/file_exporter.h> -#include <ydb/library/yql/core/file_storage/download_stream.h> -#include 
<ydb/library/yql/core/file_storage/storage.h> -#include <ydb/library/yql/utils/fetch/fetch.h> -#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h> -#include <ydb/library/yql/utils/multi_resource_lock.h> - -#include <library/cpp/digest/md5/md5.h> - -#include <util/system/shellcommand.h> -#include <util/stream/file.h> -#include <util/system/fs.h> -#include <util/string/strip.h> -#include <util/folder/dirut.h> - -namespace NYql { - -class TFileExporterDummy : public IFileExporter { -public: - virtual std::pair<ui64, TString> ExportToFile(const TFileStorageConfig& Config, - const TString& convertedUrl, - const THttpURL& url, - const TString& dstFile) override - { - Y_UNUSED(Config); - Y_UNUSED(url); - - ythrow yexception() << "VCS is unsupported in current implementation; convertedUrl: " << convertedUrl << ", dstFile: " << dstFile; - } -}; - -std::unique_ptr<IFileExporter> CreateFileExporter() { - return std::make_unique<TFileExporterDummy>(); -} - -} - diff --git a/ydb/library/yql/core/file_storage/exporter_dummy/ya.make b/ydb/library/yql/core/file_storage/exporter_dummy/ya.make deleted file mode 100644 index ccfd256bff..0000000000 --- a/ydb/library/yql/core/file_storage/exporter_dummy/ya.make +++ /dev/null @@ -1,16 +0,0 @@ -LIBRARY() - -OWNER(g:yql) - -SRCS( - file_exporter.cpp -) - -PEERDIR( - ydb/library/yql/core/file_storage/proto - ydb/library/yql/utils - ydb/library/yql/utils/fetch - ydb/library/yql/utils/log -) - -END() diff --git a/ydb/library/yql/core/file_storage/file_storage.cpp b/ydb/library/yql/core/file_storage/file_storage.cpp index e247ba113e..631adbc688 100644 --- a/ydb/library/yql/core/file_storage/file_storage.cpp +++ b/ydb/library/yql/core/file_storage/file_storage.cpp @@ -1,29 +1,28 @@ #include "file_storage.h" -#include "file_exporter.h" -#include "download_stream.h" -#include "pattern_group.h" #include "storage.h" #include "url_mapper.h" #include "url_meta.h" +#include "download_stream.h" + +#include 
<ydb/library/yql/core/file_storage/proto/file_storage.pb.h> #include <ydb/library/yql/utils/fetch/fetch.h> #include <ydb/library/yql/utils/log/log.h> - +#include <ydb/library/yql/utils/log/context.h> #include <ydb/library/yql/utils/multi_resource_lock.h> #include <ydb/library/yql/utils/md5_stream.h> #include <ydb/library/yql/utils/retry.h> +#include <ydb/library/yql/utils/yql_panic.h> #include <library/cpp/cache/cache.h> #include <library/cpp/digest/md5/md5.h> -#include <library/cpp/http/misc/httpcodes.h> #include <library/cpp/threading/future/async.h> -#include <library/cpp/cgiparam/cgiparam.h> #include <util/generic/guid.h> -#include <util/folder/dirut.h> +#include <util/generic/yexception.h> + #include <util/stream/file.h> #include <util/stream/null.h> -#include <util/string/strip.h> #include <util/system/fs.h> #include <util/system/fstat.h> #include <util/system/guard.h> @@ -31,26 +30,19 @@ #include <util/system/sysstat.h> #include <util/system/utime.h> -#include <tuple> - namespace NYql { + class TFileStorageImpl: public IFileStorage { public: explicit TFileStorageImpl(const TFileStorageConfig& params) : Storage(params.GetMaxFiles(), ui64(params.GetMaxSizeMb()) << 20ull, params.GetPath()) , Config(params) , QueueStarted(0) - , SupportArcLinks(true) - , FileExporter(CreateFileExporter()) { try { for (const auto& sc : params.GetCustomSchemes()) { Mapper.AddMapping(sc.GetPattern(), sc.GetTargetUrl()); } - - if (params.GetExternalAllowedUrlPatterns().empty()) { - InitUrlPatterns(false); - } } catch (const yexception& e) { ythrow yexception() << "FileStorage: " << e.what(); } @@ -69,23 +61,8 @@ public: MtpQueue->Stop(); } - void AddAllowedUrlPattern(const TString& pattern) override { - AllowedUrls.Add(pattern); - } - - void SetExternalUser(bool isExternal) override { - try { - SupportArcLinks = !isExternal; - InitUrlPatterns(isExternal); - } catch (const yexception& e) { - ythrow yexception() << "FileStorage: " << e.what(); - } - } - - void InitUrlPatterns(bool 
isExternalUser) { - for (const auto& p : isExternalUser ? Config.GetExternalAllowedUrlPatterns() : Config.GetAllowedUrlPatterns()) { - AllowedUrls.Add(p); - } + void AddDownloader(IDownloaderPtr downloader) override { + Downloaders.push_back(std::move(downloader)); } TFileLinkPtr PutFile(const TString& file, const TString& outFileName = {}) override { @@ -94,25 +71,18 @@ public: const TString storageFileName = md5 + ".file"; auto lock = MultiResourceLock.Acquire(storageFileName); return Storage.Put(storageFileName, outFileName, md5, [&file, &md5](const TFsPath& dstFile) { - try { - NFs::HardLinkOrCopy(file, dstFile); - SetCacheFilePermissionsNoThrow(dstFile); - i64 length = GetFileLength(dstFile.c_str()); - if (length == -1) { - ythrow TSystemError() << "cannot get file length: " << dstFile; - } - i64 srcLength = GetFileLength(file.c_str()); - if (srcLength == -1) { - ythrow TSystemError() << "cannot get file length: " << file; - } - - YQL_ENSURE(srcLength == length); - return std::make_pair(static_cast<ui64>(length), md5); - } catch (...) 
{ - YQL_LOG(ERROR) << CurrentExceptionMessage(); - NFs::Remove(dstFile); - throw; + NFs::HardLinkOrCopy(file, dstFile); + i64 length = GetFileLength(dstFile.c_str()); + if (length == -1) { + ythrow TSystemError() << "cannot get file length: " << dstFile; } + i64 srcLength = GetFileLength(file.c_str()); + if (srcLength == -1) { + ythrow TSystemError() << "cannot get file length: " << file; + } + + YQL_ENSURE(srcLength == length); + return std::make_pair(static_cast<ui64>(length), md5); }); } @@ -136,30 +106,23 @@ public: strippedMeta = TUrlMeta(); const TString storageFileName = md5 + ".file.stripped"; TFileLinkPtr result = Storage.Put(storageFileName, "", "", [&file](const TFsPath& dstPath) { - try { - ui64 size; - TString md5; - TShellCommand cmd("strip", {file, "-o", dstPath.GetPath()}); - cmd.Run().Wait(); - if (*cmd.GetExitCode() != 0) { - ythrow yexception() << cmd.GetError(); - } - md5 = MD5::File(dstPath.GetPath()); - size = TFile(dstPath.GetPath(), OpenExisting | RdOnly).GetLength(); - YQL_LOG(DEBUG) << "Strip " << file << " to " << dstPath.GetPath(); - return std::make_pair(size, md5); - } catch (...) { - YQL_LOG(ERROR) << CurrentExceptionMessage(); - NFs::Remove(dstPath); - throw; + ui64 size; + TString md5; + TShellCommand cmd("strip", {file, "-o", dstPath.GetPath()}); + cmd.Run().Wait(); + if (*cmd.GetExitCode() != 0) { + ythrow yexception() << cmd.GetError(); } + md5 = MD5::File(dstPath.GetPath()); + size = TFile(dstPath.GetPath(), OpenExisting | RdOnly).GetLength(); + YQL_LOG(DEBUG) << "Strip " << file << " to " << dstPath.GetPath(); + return std::make_pair(size, md5); }); strippedMeta.ContentFile = result->GetStorageFileName(); strippedMeta.Md5 = result->GetMd5(); auto metaTmpFile = Storage.GetTemp() / Storage.GetTempName(); strippedMeta.SaveTo(metaTmpFile); - SetCacheFilePermissions(metaTmpFile); Storage.MoveToStorage(metaTmpFile, strippedMetaFile); return result; } @@ -170,26 +133,19 @@ public: YQL_LOG(INFO) << "PutInline to cache. 
md5=" << md5; auto lock = MultiResourceLock.Acquire(storageFileName); return Storage.Put(storageFileName, TString(), md5, [&data, &md5](const TFsPath& dstFile) { - try { - TStringInput in(data); - TFile outFile(dstFile, CreateAlways | ARW | AX); - TUnbufferedFileOutput out(outFile); - auto result = std::make_pair(TransferData(&in, &out), md5); - outFile.Close(); - SetCacheFilePermissions(dstFile); - - i64 length = GetFileLength(dstFile.c_str()); - if (length == -1) { - ythrow TSystemError() << "cannot get file length: " << dstFile; - } - - YQL_ENSURE(data.size() == static_cast<ui64>(length)); - return result; - } catch (...) { - YQL_LOG(ERROR) << CurrentExceptionMessage(); - NFs::Remove(dstFile); - throw; + TStringInput in(data); + TFile outFile(dstFile, CreateAlways | ARW | AX); + TUnbufferedFileOutput out(outFile); + auto result = std::make_pair(TransferData(&in, &out), md5); + outFile.Close(); + + i64 length = GetFileLength(dstFile.c_str()); + if (length == -1) { + ythrow TSystemError() << "cannot get file length: " << dstFile; } + + YQL_ENSURE(data.size() == static_cast<ui64>(length)); + return result; }); } @@ -202,18 +158,13 @@ public: YQL_LOG(INFO) << "PutUrl to cache: " << convertedUrl; THttpURL url = ParseURL(convertedUrl); - auto rawScheme = url.GetField(NUri::TField::FieldScheme); - if (NUri::EqualNoCase(rawScheme, "arc")) { - return PutUrl(convertedUrl, url, oauthToken, true); - } else { - switch (url.GetSchemeInfo().Kind) { - case NUri::TScheme::SchemeHTTP: - case NUri::TScheme::SchemeHTTPS: - return PutUrl(convertedUrl, url, oauthToken, false); - default: - ythrow yexception() << "Unsupported url scheme: " << convertedUrl; + for (const auto& d: Downloaders) { + if (d->Accept(url)) { + return PutUrl(url, oauthToken, d); } } + + ythrow yexception() << "Unsupported url: " << convertedUrl; } catch (const std::exception& e) { YQL_LOG(ERROR) << "Failed to download file by URL \"" << urlStr << "\", details: " << e.what(); YQL_LOG_CTX_THROW yexception() << 
"FileStorage: Failed to download file by URL \"" << urlStr << "\", details: " << e.what(); @@ -249,6 +200,10 @@ public: return Storage.GetTemp(); } + const TFileStorageConfig& GetConfig() const override { + return Config; + } + private: void StartQueueOnce() { // we shall call Start only once @@ -257,26 +212,16 @@ private: } } - TFileLinkPtr PutUrl(const TString& convertedUrl, const THttpURL& url, const TString& oauthToken, bool isArc) { - if (isArc && !SupportArcLinks) { - YQL_LOG(WARN) << "FileStorage: Arcadia support is disabled, reject downloading " << convertedUrl; - throw yexception() << "It is not allowed to download Arcadia url " << convertedUrl; - } - - if (!isArc && !AllowedUrls.IsEmpty() && !AllowedUrls.Match(convertedUrl)) { - YQL_LOG(WARN) << "FileStorage: url " << convertedUrl << " is not in whitelist, reject downloading"; - throw yexception() << "It is not allowed to download url " << convertedUrl; - } - + TFileLinkPtr PutUrl(const THttpURL& url, const TString& oauthToken, const IDownloaderPtr& downloader) { return WithRetry<TDownloadError>(Config.GetRetryCount(), [&, this]() { - return this->DoPutUrl(convertedUrl, url, oauthToken, isArc); + return this->DoPutUrl(url, oauthToken, downloader); }, [&](const auto& e, int attempt, int attemptCount) { - YQL_LOG(WARN) << "Error while downloading url " << convertedUrl << ", attempt " << attempt << "/" << attemptCount << ", details: " << e.what(); + YQL_LOG(WARN) << "Error while downloading url " << url.PrintS() << ", attempt " << attempt << "/" << attemptCount << ", details: " << e.what(); Sleep(TDuration::MilliSeconds(Config.GetRetryDelayMs())); }); } - TFileLinkPtr DoPutUrl(const TString& convertedUrl, const THttpURL& url, const TString& oauthToken, bool isArc) { + TFileLinkPtr DoPutUrl(const THttpURL& url, const TString& oauthToken, const IDownloaderPtr& downloader) { const auto urlMetaFile = BuildUrlMetaFileName(url); auto lock = MultiResourceLock.Acquire(urlMetaFile); // let's use meta file as lock 
name @@ -296,61 +241,31 @@ private: << ", ContentFile=" << urlMeta.ContentFile << ", Md5=" << urlMeta.Md5 << ", LastModified=" << urlMeta.LastModified; - TFileLinkPtr result; + TStorage::TDataPuller puller; TString etag; TString lastModified; - if (isArc) { - const auto urlContentFile = BuildUrlContentFileName(url, etag, lastModified); - - result = Storage.Put(urlContentFile, TString(), TString(), [&convertedUrl, &url, this](const TFsPath& dstFile) { - try { - return this->FileExporter->ExportToFile(Config, convertedUrl, url, dstFile); - } catch (...) { - NFs::Remove(dstFile); - throw; - } - }); - - } else { - TFetchResultPtr fr1 = FetchWithETagAndLastModified(convertedUrl, oauthToken, urlMeta.ETag, urlMeta.LastModified); - switch (fr1->GetRetCode()) { - case HTTP_NOT_MODIFIED: - Y_ENSURE(oldContentLink); // should not fire - return oldContentLink; - case HTTP_OK: - break; - default: - ythrow yexception() << "Url " << convertedUrl << " cannot be accessed, code: " << fr1->GetRetCode(); - } - - std::tie(etag, lastModified) = ExtractETagAndLastModified(*fr1); - if (urlMeta.ETag && urlMeta.ETag == etag && oldContentLink) { - // for non-empty etags server should reply with code 304 Not modified - // but some servers like github.com do not support IfNoneMatch (but ETag is not empty) - // no etag is supported (previously and now) or server responded with status code 200 and we have something in cache - // access already checked by FetchWithETagAndLastModified - return oldContentLink; - } - - if (urlMeta.ETag && etag) { - YQL_LOG(INFO) << "ETag for url " << convertedUrl << " has been changed from " << urlMeta.ETag << " to " << etag << ". We have to download new version"; - } - else if (urlMeta.LastModified && lastModified) { - YQL_LOG(INFO) << "LastModified for url " << convertedUrl << " has been changed from " << urlMeta.LastModified << " to " << lastModified << ". We have to download new version"; - } - - // todo: remove oldContentLink ? 
- const auto urlContentFile = BuildUrlContentFileName(url, etag, lastModified); - - result = Storage.Put(urlContentFile, TString(), TString(), [&fr1, &convertedUrl](const TFsPath& dstFile) { - try { - return CopyToFile(convertedUrl, *fr1, dstFile); - } catch (...) { - NFs::Remove(dstFile); - throw; - } - }); + std::tie(puller, etag, lastModified) = downloader->Download(url, oauthToken, urlMeta.ETag, urlMeta.LastModified); + if (!puller) { + Y_ENSURE(oldContentLink); // should not fire + return oldContentLink; + } + if (urlMeta.ETag && urlMeta.ETag == etag && oldContentLink) { + // for non-empty etags server should reply with code 304 Not modified + // but some servers like github.com do not support IfNoneMatch (but ETag is not empty) + // no etag is supported (previously and now) or server responded with status code 200 and we have something in cache + // access already checked by FetchWithETagAndLastModified + return oldContentLink; } + if (urlMeta.ETag && etag) { + YQL_LOG(INFO) << "ETag for url " << url.PrintS() << " has been changed from " << urlMeta.ETag << " to " << etag << ". We have to download new version"; + } + else if (urlMeta.LastModified && lastModified) { + YQL_LOG(INFO) << "LastModified for url " << url.PrintS() << " has been changed from " << urlMeta.LastModified << " to " << lastModified << ". We have to download new version"; + } + + // todo: remove oldContentLink ? 
+ const auto urlContentFile = BuildUrlContentFileName(url, etag, lastModified); + TFileLinkPtr result = Storage.Put(urlContentFile, TString(), TString(), puller); // save meta using rename for atomicity urlMeta.ETag = etag; @@ -359,41 +274,10 @@ private: urlMeta.Md5 = result->GetMd5(); auto metaTmpFile = Storage.GetTemp() / Storage.GetTempName(); urlMeta.SaveTo(metaTmpFile); - SetCacheFilePermissions(metaTmpFile); Storage.MoveToStorage(metaTmpFile, urlMetaFile); return result; - - } - - TFetchResultPtr FetchWithETagAndLastModified(const TString& url, const TString& oauthToken, const TString& oldEtag, const TString& oldLastModified) { - // more details about ETag and ModifiedSince: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.26 - THttpHeaders headers; - if (!oauthToken.empty()) { - headers.AddHeader(THttpInputHeader("Authorization", "OAuth " + oauthToken)); - } - - // ETag has priority over modification time - if (!oldEtag.empty()) { - headers.AddHeader(THttpInputHeader("If-None-Match", oldEtag)); - } else if (!oldLastModified.empty()) { - headers.AddHeader(THttpInputHeader("If-Modified-Since", oldLastModified)); - } - const auto parsedUrl = ParseURL(url); - if (parsedUrl.GetHost() == "yt.yandex-team.ru" || parsedUrl.GetHost().EndsWith(".yt.yandex-team.ru")) { - auto guid = CreateGuidAsString(); - headers.AddHeader(THttpInputHeader("X-YT-Correlation-Id", guid)); - YQL_LOG(INFO) << "Use Correlation-Id=" << guid << " for " << url; - } - - const auto timeout = TDuration::MilliSeconds(Config.GetSocketTimeoutMs()); - try { - return Fetch(parsedUrl, headers, timeout); - } catch (const std::exception& e) { - // remap exception type to leverage retry logic - throw TDownloadError() << e.what(); - } } static TString BuildUrlMetaFileName(const THttpURL& url) { @@ -405,74 +289,14 @@ private: return MD5::Calc(needle + url.PrintS(THttpURL::FlagNoFrag | THttpURL::FlagHostAscii)) + ".url"; } - static std::pair<ui64, TString> CopyToFile(const TString& url, 
IFetchResult& src, const TString& dstFile) { - TFile outFile(dstFile, CreateAlways | ARW | AX); - TUnbufferedFileOutput out(outFile); - TMd5OutputStream md5Out(out); - - THttpInput& httpStream = src.GetStream(); - TDownloadStream input(httpStream); - const ui64 size = TransferData(&input, &md5Out); - auto result = std::make_pair(size, md5Out.Finalize()); - out.Finish(); - outFile.Close(); - SetCacheFilePermissions(dstFile); - - ui64 contentLength = 0; - // additional check for not compressed data - if (!httpStream.ContentEncoded() && httpStream.GetContentLength(contentLength) && contentLength != size) { - // let's retry this error - ythrow TDownloadError() << "Size mismatch while downloading url " << url << ", downloaded size: " << size << ", ContentLength: " << contentLength; - } - - if (auto trailers = httpStream.Trailers()) { - if (auto header = trailers->FindHeader("X-YT-Error")) { - ythrow TDownloadError() << "X-YT-Error=" << header->Value(); - } - } - - i64 dstFileLen = GetFileLength(dstFile.c_str()); - if (dstFileLen == -1) { - ythrow TSystemError() << "cannot get file length: " << dstFile; - } - - YQL_ENSURE(static_cast<ui64>(dstFileLen) == size); - return result; - } - - static TString WeakETag2Strong(const TString& etag) { - // drop W/ at the beginning if any - return etag.StartsWith("W/") ? 
etag.substr(2) : etag; - } - - static std::pair<TString, TString> ExtractETagAndLastModified(IFetchResult& result) { - const auto& headers = result.GetStream().Headers(); - TString etag; - TString lastModified; - // linear scan - for (auto it = headers.Begin(); it != headers.End(); ++it) { - if (TCIEqualTo<TString>()(it->Name(), TString(TStringBuf("ETag")))) { - etag = WeakETag2Strong(it->Value()); - } - - if (TCIEqualTo<TString>()(it->Name(), TString(TStringBuf("Last-Modified")))) { - lastModified = it->Value(); - } - } - - return std::make_pair(etag, lastModified); - } - private: TStorage Storage; const TFileStorageConfig Config; + std::vector<IDownloaderPtr> Downloaders; TUrlMapper Mapper; - TPatternGroup AllowedUrls; TAtomic QueueStarted; - bool SupportArcLinks; THolder<IThreadPool> MtpQueue; TMultiResourceLock MultiResourceLock; - std::unique_ptr<IFileExporter> FileExporter; }; TFileStoragePtr CreateFileStorage(const TFileStorageConfig& params) { diff --git a/ydb/library/yql/core/file_storage/file_storage.h b/ydb/library/yql/core/file_storage/file_storage.h index 95f7203ee1..9133d57b1b 100644 --- a/ydb/library/yql/core/file_storage/file_storage.h +++ b/ydb/library/yql/core/file_storage/file_storage.h @@ -3,21 +3,33 @@ #include "storage.h" #include <library/cpp/threading/future/future.h> +#include <library/cpp/uri/http_url.h> #include <util/folder/path.h> #include <util/generic/ptr.h> #include <util/generic/string.h> +#include <functional> +#include <unordered_map> +#include <tuple> + namespace NYql { +class TFileStorageConfig; + struct IFileStorage: public TThrRefBase { + struct IDownloader : public TThrRefBase { + virtual bool Accept(const THttpURL& url) = 0; + virtual std::tuple<TStorage::TDataPuller, TString, TString> Download(const THttpURL& url, const TString& oauthToken, const TString& etag, const TString& lastModified) = 0; + }; + using IDownloaderPtr = TIntrusivePtr<IDownloader>; + virtual ~IFileStorage() = default; - virtual void 
AddAllowedUrlPattern(const TString& pattern) = 0; + virtual void AddDownloader(IDownloaderPtr downloader) = 0; virtual TFileLinkPtr PutFile(const TString& file, const TString& outFileName = {}) = 0; virtual TFileLinkPtr PutFileStripped(const TString& file, const TString& originalMd5 = {}) = 0; virtual TFileLinkPtr PutInline(const TString& data) = 0; virtual TFileLinkPtr PutUrl(const TString& url, const TString& oauthToken) = 0; - virtual void SetExternalUser(bool isExternal) = 0; // async versions virtual NThreading::TFuture<TFileLinkPtr> PutFileAsync(const TString& file, const TString& outFileName = {}) = 0; virtual NThreading::TFuture<TFileLinkPtr> PutInlineAsync(const TString& data) = 0; @@ -25,12 +37,11 @@ struct IFileStorage: public TThrRefBase { virtual TFsPath GetRoot() const = 0; virtual TFsPath GetTemp() const = 0; + virtual const TFileStorageConfig& GetConfig() const = 0; }; using TFileStoragePtr = TIntrusivePtr<IFileStorage>; -class TFileStorageConfig; - // Will use auto-cleaned temporary directory if storagePath is empty TFileStoragePtr CreateFileStorage(const TFileStorageConfig& params); diff --git a/ydb/library/yql/core/file_storage/file_storage_ut.cpp b/ydb/library/yql/core/file_storage/file_storage_ut.cpp index 1df31093e1..5103fdd31d 100644 --- a/ydb/library/yql/core/file_storage/file_storage_ut.cpp +++ b/ydb/library/yql/core/file_storage/file_storage_ut.cpp @@ -1,7 +1,8 @@ #include "file_storage.h" -#include <ydb/library/yql/core/file_storage/ut/test_http_server.h> +#include <ydb/library/yql/core/file_storage/ut/test_http_server.h> #include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h> +#include <ydb/library/yql/core/file_storage/http_download/http_download.h> #include <library/cpp/threading/future/future.h> #include <library/cpp/threading/future/async.h> @@ -21,6 +22,12 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { return TIFStream(path).ReadAll(); } + static TFileStoragePtr CreateTestFS(const TFileStorageConfig& params = {}, const 
std::vector<TString>& extraPatterns = {}) { + TFileStoragePtr fs = CreateFileStorage(params); + fs->AddDownloader(MakeHttpDownloader(false, params, extraPatterns)); + return fs; + } + static std::unique_ptr<TTestHttpServer> CreateTestHttpServer() { TPortManager pm; const ui16 port = pm.GetPort(); @@ -57,8 +64,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { return TTestHttpServer::TReply::Ok(currentContent); }); - TFileStorageConfig params; - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(); auto url = server->GetUrl(); auto link1 = fs->PutUrl(url, {}); @@ -91,8 +97,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { return TTestHttpServer::TReply::Ok(content); }); - TFileStorageConfig params; - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(); auto url = server->GetUrl(); @@ -121,8 +126,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { return TTestHttpServer::TReply::OkETag(currentContent, currentETag); }); - TFileStorageConfig params; - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(); auto url = server->GetUrl(); @@ -165,8 +169,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { return TTestHttpServer::TReply::OkLastModified(currentContent, currentLastModified); }); - TFileStorageConfig params; - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(); auto url = server->GetUrl(); @@ -206,8 +209,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { return TTestHttpServer::TReply::OkETag(currentContent, currentETag); }); - TFileStorageConfig params; - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(); auto url = server->GetUrl(); @@ -250,8 +252,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { return TTestHttpServer::TReply::OkETag(currentContent, MakeWeakETag(currentETag)); }); - TFileStorageConfig params; - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(); auto url = server->GetUrl(); @@ 
-287,8 +288,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { return TTestHttpServer::TReply::Ok(currentContent); }); - TFileStorageConfig params; - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(); auto url = server->GetUrl(); auto link1 = fs->PutUrl(url, {}); @@ -318,8 +318,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { return TTestHttpServer::TReply::OkETag(currentContent, currentETag); }); - TFileStorageConfig params; - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(); auto url = server->GetUrl(); auto link1 = fs->PutUrl(url, {}); @@ -349,8 +348,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { return TTestHttpServer::TReply::OkETag(currentContent, currentETag); }); - TFileStorageConfig params; - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(); auto url = server->GetUrl(); auto link1 = fs->PutUrl(url, {}); @@ -371,8 +369,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { Y_UNIT_TEST(Md5ForPutFiles) { TString currentContent = "ABC"; - TFileStorageConfig params; - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(); TTempFileHandle h1; h1.Write("ABC", 3); @@ -412,7 +409,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { TFileStorageConfig params; params.SetRetryCount(3); - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(params); auto url = server->GetUrl(); @@ -432,8 +429,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { return TTestHttpServer::TReply::OkETag(currentContent, currentETag, 0); }); - TFileStorageConfig params; - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(); auto url = server->GetUrl(); @@ -462,7 +458,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { TFileStorageConfig params; params.SetRetryCount(3); - TFileStoragePtr fs = CreateFileStorage(params); + TFileStoragePtr fs = CreateTestFS(params); auto url = server->GetUrl(); @@ -480,26 +476,36 @@ 
Y_UNIT_TEST_SUITE(TFileStorageTests) { auto url = server->GetUrl(); - // not in whitelist - TFileStorageConfig params1; - params1.AddAllowedUrlPatterns("^XXXX$"); - TFileStoragePtr fs1 = CreateFileStorage(params1); + { + // not in whitelist + TFileStorageConfig params; + params.AddAllowedUrlPatterns("^XXXX$"); + TFileStoragePtr fs = CreateTestFS(params); - UNIT_ASSERT_EXCEPTION_CONTAINS(fs1->PutUrl(url, {}), std::exception, "It is not allowed to download url http://localhost:"); - fs1->AddAllowedUrlPattern("^http://localhost:"); + UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl(url, {}), std::exception, "It is not allowed to download url http://localhost:"); + } - auto link1 = fs1->PutUrl(url, {}); - UNIT_ASSERT_VALUES_EQUAL(currentContent, ReadFileContent(link1->GetPath())); + { + // have in whitelist + TFileStorageConfig params; + params.SetSocketTimeoutMs(4000); + params.AddAllowedUrlPatterns("^http://localhost:"); + TFileStoragePtr fs = CreateTestFS(params); - // have in whitelist - TFileStorageConfig params2; - params2.SetSocketTimeoutMs(4000); - params2.AddAllowedUrlPatterns("^http://localhost:"); - TFileStoragePtr fs2 = CreateFileStorage(params2); - fs2->AddAllowedUrlPattern("^XXXX$"); + auto link = fs->PutUrl(url, {}); + UNIT_ASSERT_VALUES_EQUAL(currentContent, ReadFileContent(link->GetPath())); + } + + { + // have extra url in whitelist + TFileStorageConfig params; + params.SetSocketTimeoutMs(4000); + params.AddAllowedUrlPatterns("^XXXX$"); + TFileStoragePtr fs = CreateTestFS(params, std::vector{TString{"^http://localhost:"}}); - auto link2 = fs1->PutUrl(url, {}); - UNIT_ASSERT_VALUES_EQUAL(currentContent, ReadFileContent(link2->GetPath())); + auto link = fs->PutUrl(url, {}); + UNIT_ASSERT_VALUES_EQUAL(currentContent, ReadFileContent(link->GetPath())); + } } Y_UNIT_TEST(SocketTimeout) { @@ -512,23 +518,9 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { TFileStorageConfig params; params.SetSocketTimeoutMs(1000); - TFileStoragePtr fs = CreateFileStorage(params); + 
TFileStoragePtr fs = CreateTestFS(params); auto url = server->GetUrl(); UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl(url, {}), std::exception, "can not read from socket input stream"); } - -#ifndef OPENSOURCE - Y_UNIT_TEST(ArcFileWithoutRev) { - TFileStorageConfig params; - TFileStoragePtr fs = CreateFileStorage(params); - - UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl("arc://yql/a.txt", {}), std::exception, "Revision for Arcadia file must be specified"); - UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl("arc://yql/a.txt?rev=", {}), std::exception, "Revision for Arcadia file must be specified"); - UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl("arc://yql/a.txt?rev=0", {}), std::exception, "Revision for Arcadia file must be specified"); - UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl("arc://yql/a.txt?rev=head", {}), std::exception, "Revision for Arcadia file must be specified"); - UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl("arc://yql/a.txt?rev=HEAD", {}), std::exception, "Revision for Arcadia file must be specified"); - UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl("arc://yql/a.txt?op_rev=123", {}), std::exception, "Revision for Arcadia file must be specified"); - } -#endif } diff --git a/ydb/library/yql/core/file_storage/http_download/http_download.cpp b/ydb/library/yql/core/file_storage/http_download/http_download.cpp new file mode 100644 index 0000000000..0011d3b92a --- /dev/null +++ b/ydb/library/yql/core/file_storage/http_download/http_download.cpp @@ -0,0 +1,172 @@ +#include "http_download.h" +#include "pattern_group.h" + +#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h> +#include <ydb/library/yql/core/file_storage/download_stream.h> + +#include <ydb/library/yql/utils/fetch/fetch.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/log/context.h> +#include <ydb/library/yql/utils/multi_resource_lock.h> +#include <ydb/library/yql/utils/md5_stream.h> +#include <ydb/library/yql/utils/retry.h> +#include <ydb/library/yql/utils/yql_panic.h> + 
+#include <library/cpp/digest/md5/md5.h> +#include <library/cpp/http/misc/httpcodes.h> + +#include <util/generic/guid.h> +#include <util/generic/yexception.h> +#include <util/stream/file.h> +#include <util/system/file.h> + + +namespace NYql { + +class THttpDownloader: public IFileStorage::IDownloader { +public: + THttpDownloader(bool restictedUser, const TFileStorageConfig& config, const std::vector<TString>& extraAllowedUrls) + : SocketTimeoutMs(config.GetSocketTimeoutMs()) + { + for (const auto& p : restictedUser ? config.GetExternalAllowedUrlPatterns() : config.GetAllowedUrlPatterns()) { + AllowedUrls.Add(p); + } + for (auto p: extraAllowedUrls) { + AllowedUrls.Add(p); + } + } + ~THttpDownloader() = default; + + bool Accept(const THttpURL& url) final { + switch (url.GetScheme()) { + case NUri::TScheme::SchemeHTTP: + case NUri::TScheme::SchemeHTTPS: + return true; + default: + break; + } + return false; + } + + std::tuple<TStorage::TDataPuller, TString, TString> Download(const THttpURL& url, const TString& oauthToken, const TString& oldEtag, const TString& oldLastModified) final { + if (!AllowedUrls.IsEmpty()) { + if (auto strUrl = url.PrintS(); !AllowedUrls.Match(strUrl)) { + YQL_LOG(WARN) << "FileStorage: url " << strUrl << " is not in whitelist, reject downloading"; + throw yexception() << "It is not allowed to download url " << strUrl; + } + } + + TFetchResultPtr fr1 = FetchWithETagAndLastModified(url, oauthToken, oldEtag, oldLastModified, SocketTimeoutMs); + switch (fr1->GetRetCode()) { + case HTTP_NOT_MODIFIED: + return std::make_tuple(TStorage::TDataPuller{}, TString{}, TString{}); + case HTTP_OK: + break; + default: + ythrow yexception() << "Url " << url.PrintS() << " cannot be accessed, code: " << fr1->GetRetCode(); + } + + auto pair = ExtractETagAndLastModified(*fr1); + + auto puller = [urlStr = url.PrintS(), fr1](const TFsPath& dstPath) -> std::pair<ui64, TString> { + return CopyToFile(urlStr, *fr1, dstPath); + }; + + return std::make_tuple(puller, 
pair.first, pair.second); + } + +private: + static TFetchResultPtr FetchWithETagAndLastModified(const THttpURL& url, const TString& oauthToken, const TString& oldEtag, const TString& oldLastModified, ui32 socketTimeoutMs) { + // more details about ETag and ModifiedSince: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.26 + THttpHeaders headers; + if (!oauthToken.empty()) { + headers.AddHeader(THttpInputHeader("Authorization", "OAuth " + oauthToken)); + } + + // ETag has priority over modification time + if (!oldEtag.empty()) { + headers.AddHeader(THttpInputHeader("If-None-Match", oldEtag)); + } else if (!oldLastModified.empty()) { + headers.AddHeader(THttpInputHeader("If-Modified-Since", oldLastModified)); + } + if (url.GetHost() == "yt.yandex-team.ru" || url.GetHost().EndsWith(".yt.yandex-team.ru")) { + auto guid = CreateGuidAsString(); + headers.AddHeader(THttpInputHeader("X-YT-Correlation-Id", guid)); + YQL_LOG(INFO) << "Use Correlation-Id=" << guid << " for " << url.PrintS(); + } + + try { + return Fetch(url, headers, TDuration::MilliSeconds(socketTimeoutMs)); + } catch (const std::exception& e) { + // remap exception type to leverage retry logic + throw TDownloadError() << e.what(); + } + } + + static std::pair<ui64, TString> CopyToFile(const TString& url, IFetchResult& src, const TString& dstFile) { + TFile outFile(dstFile, CreateAlways | ARW | AX); + TUnbufferedFileOutput out(outFile); + TMd5OutputStream md5Out(out); + + THttpInput& httpStream = src.GetStream(); + TDownloadStream input(httpStream); + const ui64 size = TransferData(&input, &md5Out); + auto result = std::make_pair(size, md5Out.Finalize()); + out.Finish(); + outFile.Close(); + + ui64 contentLength = 0; + // additional check for not compressed data + if (!httpStream.ContentEncoded() && httpStream.GetContentLength(contentLength) && contentLength != size) { + // let's retry this error + ythrow TDownloadError() << "Size mismatch while downloading url " << url << ", downloaded size: " 
<< size << ", ContentLength: " << contentLength; + } + + if (auto trailers = httpStream.Trailers()) { + if (auto header = trailers->FindHeader("X-YT-Error")) { + ythrow TDownloadError() << "X-YT-Error=" << header->Value(); + } + } + + i64 dstFileLen = GetFileLength(dstFile.c_str()); + if (dstFileLen == -1) { + ythrow TSystemError() << "cannot get file length: " << dstFile; + } + + YQL_ENSURE(static_cast<ui64>(dstFileLen) == size); + return result; + } + + static TString WeakETag2Strong(const TString& etag) { + // drop W/ at the beginning if any + return etag.StartsWith("W/") ? etag.substr(2) : etag; + } + + static std::pair<TString, TString> ExtractETagAndLastModified(IFetchResult& result) { + const auto& headers = result.GetStream().Headers(); + TString etag; + TString lastModified; + // linear scan + for (auto it = headers.Begin(); it != headers.End(); ++it) { + if (TCIEqualTo<TString>()(it->Name(), TString(TStringBuf("ETag")))) { + etag = WeakETag2Strong(it->Value()); + } + + if (TCIEqualTo<TString>()(it->Name(), TString(TStringBuf("Last-Modified")))) { + lastModified = it->Value(); + } + } + + return std::make_pair(etag, lastModified); + } + +private: + ui32 SocketTimeoutMs; + TPatternGroup AllowedUrls; +}; + +IFileStorage::IDownloaderPtr MakeHttpDownloader(bool restictedUser, const TFileStorageConfig& config, const std::vector<TString>& extraAllowedUrls) { + return MakeIntrusive<THttpDownloader>(restictedUser, config, extraAllowedUrls); +} + +} // NYql diff --git a/ydb/library/yql/core/file_storage/http_download/http_download.h b/ydb/library/yql/core/file_storage/http_download/http_download.h new file mode 100644 index 0000000000..a0abc1c111 --- /dev/null +++ b/ydb/library/yql/core/file_storage/http_download/http_download.h @@ -0,0 +1,13 @@ +#pragma once + +#include <ydb/library/yql/core/file_storage/file_storage.h> + +#include <vector> + +namespace NYql { + +class TFileStorageConfig; + +IFileStorage::IDownloaderPtr MakeHttpDownloader(bool restictedUser, const 
TFileStorageConfig& config, const std::vector<TString>& extraAllowedUrls); + +} // NYql diff --git a/ydb/library/yql/core/file_storage/pattern_group.cpp b/ydb/library/yql/core/file_storage/http_download/pattern_group.cpp index 689c97d322..f60ec107be 100644 --- a/ydb/library/yql/core/file_storage/pattern_group.cpp +++ b/ydb/library/yql/core/file_storage/http_download/pattern_group.cpp @@ -1,4 +1,4 @@ -#include "pattern_group.h" +#include <ydb/library/yql/core/file_storage/http_download/pattern_group.h> namespace NYql { diff --git a/ydb/library/yql/core/file_storage/pattern_group.h b/ydb/library/yql/core/file_storage/http_download/pattern_group.h index 53d0d64a7f..53d0d64a7f 100644 --- a/ydb/library/yql/core/file_storage/pattern_group.h +++ b/ydb/library/yql/core/file_storage/http_download/pattern_group.h diff --git a/ydb/library/yql/core/file_storage/pattern_group_ut.cpp b/ydb/library/yql/core/file_storage/http_download/pattern_group_ut.cpp index 02ee126745..1b917252c1 100644 --- a/ydb/library/yql/core/file_storage/pattern_group_ut.cpp +++ b/ydb/library/yql/core/file_storage/http_download/pattern_group_ut.cpp @@ -1,4 +1,5 @@ #include "pattern_group.h" + #include <library/cpp/testing/unittest/registar.h> using namespace NYql; diff --git a/ydb/library/yql/core/file_storage/http_download/ut/ya.make b/ydb/library/yql/core/file_storage/http_download/ut/ya.make new file mode 100644 index 0000000000..fec8e3e8c5 --- /dev/null +++ b/ydb/library/yql/core/file_storage/http_download/ut/ya.make @@ -0,0 +1,9 @@ +UNITTEST_FOR(ydb/library/yql/core/file_storage/http_download) + +OWNER(g:yql) + +SRCS( + pattern_group_ut.cpp +) + +END() diff --git a/ydb/library/yql/core/file_storage/http_download/ya.make b/ydb/library/yql/core/file_storage/http_download/ya.make new file mode 100644 index 0000000000..344836558d --- /dev/null +++ b/ydb/library/yql/core/file_storage/http_download/ya.make @@ -0,0 +1,25 @@ +LIBRARY() + +OWNER(g:yql) + +SRCS( + http_download.cpp + pattern_group.cpp +) + 
+PEERDIR( + ydb/library/yql/core/file_storage + ydb/library/yql/core/file_storage/proto + ydb/library/yql/utils/fetch + ydb/library/yql/utils/log + ydb/library/yql/utils + library/cpp/regex/pcre + library/cpp/digest/md5 + library/cpp/http/misc +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/library/yql/core/file_storage/storage.cpp b/ydb/library/yql/core/file_storage/storage.cpp index cae4682904..247bff5242 100644 --- a/ydb/library/yql/core/file_storage/storage.cpp +++ b/ydb/library/yql/core/file_storage/storage.cpp @@ -208,10 +208,18 @@ public: ui64 fileSize = 0; TString pullerMd5; // overrides input arg 'md5' - std::tie(fileSize, pullerMd5) = puller(hardlinkFile); + try { + std::tie(fileSize, pullerMd5) = puller(hardlinkFile); + } catch (...) { + YQL_LOG(ERROR) << CurrentExceptionMessage(); + NFs::Remove(hardlinkFile); + throw; + } Y_ENSURE(hardlinkFile.Exists(), "FileStorage: cannot put not existing temporary path"); Y_ENSURE(hardlinkFile.IsFile(), "FileStorage: cannot put non-file temporary path"); + SetCacheFilePermissionsNoThrow(hardlinkFile); + if (NFs::HardLink(hardlinkFile, storageFile)) { AtomicIncrement(CurrentFiles); AtomicAdd(CurrentSize, fileSize); @@ -223,8 +231,6 @@ public: result = MakeIntrusive<TFileLink>(hardlinkFile, storageFileName, fileSize, pullerMd5); } - SetCacheFilePermissionsNoThrow(result->GetPath().c_str()); - YQL_LOG(INFO) << "Using " << (newFileAdded ? 
"new" : "existing") << " storage file " << result->GetStorageFileName().Quote() << ", temp path: " << result->GetPath().GetPath().Quote() << ", size: " << result->GetSize(); @@ -269,6 +275,7 @@ public: if (!NFs::Rename(src, dstStorageFile)) { ythrow yexception() << "Failed to rename file from " << src << " to " << dstStorageFile; } + SetCacheFilePermissionsNoThrow(dstStorageFile); const i64 newFileSize = Max<i64>(0, GetFileLength(dstStorageFile.c_str())); diff --git a/ydb/library/yql/core/file_storage/ut/ya.make b/ydb/library/yql/core/file_storage/ut/ya.make index 7c67ef85b9..c19f58203b 100644 --- a/ydb/library/yql/core/file_storage/ut/ya.make +++ b/ydb/library/yql/core/file_storage/ut/ya.make @@ -11,7 +11,6 @@ ENDIF() SRCS( file_storage_ut.cpp - pattern_group_ut.cpp sized_cache_ut.cpp storage_ut.cpp test_http_server.cpp @@ -21,6 +20,7 @@ SRCS( PEERDIR( library/cpp/http/server library/cpp/threading/future + ydb/library/yql/core/file_storage/http_download ) END() diff --git a/ydb/library/yql/core/file_storage/ya.make b/ydb/library/yql/core/file_storage/ya.make index c1cd0f7a4c..f25592bfb7 100644 --- a/ydb/library/yql/core/file_storage/ya.make +++ b/ydb/library/yql/core/file_storage/ya.make @@ -7,8 +7,6 @@ SRCS( download_stream.h file_storage.cpp file_storage.h - pattern_group.cpp - pattern_group.h sized_cache.cpp sized_cache.h storage.cpp @@ -20,29 +18,18 @@ SRCS( ) PEERDIR( - contrib/libs/grpc library/cpp/cache - library/cpp/cgiparam library/cpp/digest/md5 library/cpp/logger/global - library/cpp/regex/pcre library/cpp/threading/future + library/cpp/regex/pcre + library/cpp/uri ydb/library/yql/core/file_storage/proto ydb/library/yql/utils - ydb/library/yql/utils/fetch ydb/library/yql/utils/log + ydb/library/yql/utils/fetch ) -IF (NOT OPENSOURCE) -PEERDIR( - ydb/library/yql/core/file_storage/exporter -) -ELSE() -PEERDIR( - ydb/library/yql/core/file_storage/exporter_dummy -) -ENDIF() - END() RECURSE_FOR_TESTS( |