about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author udovichenko-r <udovichenko-r@yandex-team.ru> 2022-02-09 21:23:30 +0300
committer Daniil Cherednik <dcherednik@yandex-team.ru> 2022-02-10 15:58:17 +0300
commit 0830448769cbfaf4d68b901433d786d0be99fe00 (patch)
tree 40920c6fcae0c265b916a86ef84e66b2f5c5b503
parent b78c7143329217632f2223ff8b3d36d0cdeadd41 (diff)
download ydb-0830448769cbfaf4d68b901433d786d0be99fe00.tar.gz
[yql] Extract downloaders from file storage
YQL-14297 ref:ae5dae36c3bad823f361c50432d5d7789f5b9352
-rw-r--r-- ydb/library/yql/core/file_storage/exporter/file_exporter.cpp 179
-rw-r--r-- ydb/library/yql/core/file_storage/exporter/ya.make 17
-rw-r--r-- ydb/library/yql/core/file_storage/exporter_dummy/file_exporter.cpp 37
-rw-r--r-- ydb/library/yql/core/file_storage/exporter_dummy/ya.make 16
-rw-r--r-- ydb/library/yql/core/file_storage/file_storage.cpp 334
-rw-r--r-- ydb/library/yql/core/file_storage/file_storage.h 19
-rw-r--r-- ydb/library/yql/core/file_storage/file_storage_ut.cpp 104
-rw-r--r-- ydb/library/yql/core/file_storage/http_download/http_download.cpp 172
-rw-r--r-- ydb/library/yql/core/file_storage/http_download/http_download.h 13
-rw-r--r-- ydb/library/yql/core/file_storage/http_download/pattern_group.cpp (renamed from ydb/library/yql/core/file_storage/pattern_group.cpp) 2
-rw-r--r-- ydb/library/yql/core/file_storage/http_download/pattern_group.h (renamed from ydb/library/yql/core/file_storage/pattern_group.h) 0
-rw-r--r-- ydb/library/yql/core/file_storage/http_download/pattern_group_ut.cpp (renamed from ydb/library/yql/core/file_storage/pattern_group_ut.cpp) 1
-rw-r--r-- ydb/library/yql/core/file_storage/http_download/ut/ya.make 9
-rw-r--r-- ydb/library/yql/core/file_storage/http_download/ya.make 25
-rw-r--r-- ydb/library/yql/core/file_storage/storage.cpp 13
-rw-r--r-- ydb/library/yql/core/file_storage/ut/ya.make 2
-rw-r--r-- ydb/library/yql/core/file_storage/ya.make 19
17 files changed, 377 insertions, 585 deletions
diff --git a/ydb/library/yql/core/file_storage/exporter/file_exporter.cpp b/ydb/library/yql/core/file_storage/exporter/file_exporter.cpp
deleted file mode 100644
index 913d6dbd52..0000000000
--- a/ydb/library/yql/core/file_storage/exporter/file_exporter.cpp
+++ /dev/null
@@ -1,179 +0,0 @@
-#include <ydb/library/yql/core/file_storage/file_exporter.h>
-#include <ydb/library/yql/core/file_storage/download_stream.h>
-#include <ydb/library/yql/core/file_storage/storage.h>
-#include <ydb/library/yql/utils/fetch/fetch.h>
-#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h>
-#include <ydb/library/yql/utils/multi_resource_lock.h>
-
-#include <library/cpp/digest/md5/md5.h>
-#include <library/cpp/cgiparam/cgiparam.h>
-
-#include <grpc++/grpc++.h>
-
-#include <arc/api/public/repo.grpc.pb.h>
-
-#include <util/system/shellcommand.h>
-#include <util/stream/file.h>
-#include <util/system/fs.h>
-#include <util/string/strip.h>
-#include <util/folder/dirut.h>
-
-namespace NYql {
-
-class TClient {
-public:
- TClient(
- std::shared_ptr<grpc::Channel> channel)
- : Stub(NVcs::NPublic::FileService::NewStub(channel))
- {
- }
-
- TString ReadFile(const NVcs::NPublic::ReadFileRequest& request, const TString& path) {
- grpc::ClientContext context;
- auto reader = Stub->ReadFile(&context, request);
- NVcs::NPublic::ReadFileResponse response;
- TString ret;
- for (;;) {
- if (!reader->Read(&response)) {
- grpc::Status status = reader->Finish();
- if (!status.ok()) {
- ythrow yexception() << "Failed to download url: " << path << ", status: " << status.error_message();
- }
-
- break;
- }
-
- if (response.HasData()) {
- ret += response.GetData();
- }
- }
-
- return ret;
- }
-private:
- std::unique_ptr<NVcs::NPublic::FileService::Stub> Stub;
-};
-
-class TFileExporter : public IFileExporter {
-public:
- std::pair<ui64, TString> ExportToFile(const TFileStorageConfig& config, const TString& convertedUrl, const THttpURL& url, const TString& dstFile) override {
- Y_UNUSED(convertedUrl);
- TCgiParameters params(url.GetField(NUri::TField::FieldQuery));
- auto hash = params.Get("hash");
- if (hash) {
- if (hash.length() != 40) {
- throw yexception() << "Only commit hash is expected, but got: " << hash;
- }
-
- if (!config.HasArcTokenPath()) {
- throw yexception() << "Missing Arc token";
- }
-
- auto path = config.GetArcTokenPath();
- if (path.StartsWith("~")) {
- path = GetHomeDir() + path.substr(1, path.Size() - 1);
- }
-
- TString authToken = Strip(TFileInput(path).ReadAll());
-
- std::shared_ptr<grpc::ChannelCredentials> credentials = grpc::CompositeChannelCredentials(
- grpc::SslCredentials(grpc::SslCredentialsOptions()),
- grpc::AccessTokenCredentials(authToken));
- TClient client(grpc::CreateChannel("api.arc-vcs.yandex-team.ru:6734", credentials));
-
- TString remoteFilePath = TStringBuilder() << url.GetField(NUri::TField::FieldHost) << url.GetField(NUri::TField::FieldPath);
- NVcs::NPublic::ReadFileRequest request;
- request.SetPath(remoteFilePath);
- request.SetRevision(hash);
- const auto content = client.ReadFile(request, remoteFilePath);
- TFileOutput dst(dstFile);
- dst.Write(content.Data(), content.Size());
- dst.Finish();
-
- i64 dstFileLen = GetFileLength(dstFile.c_str());
- if (dstFileLen == -1) {
- ythrow TSystemError() << "cannot get file length: " << dstFile;
- }
-
- YQL_ENSURE(static_cast<ui64>(dstFileLen) == content.Size());
- }
- else {
- auto branch = params.Get("branch");
- // support both operative and peg revision (see http://svnbook.red-bean.com/en/1.6/svn.advanced.pegrevs.html )
- // peg rev is enough usually
- auto pegRevStr = params.Get("rev");
- unsigned int pegRev = 0;
- if (!TryFromString(pegRevStr, pegRev) || pegRev == 0) {
- throw yexception() << "Revision for Arcadia file must be specified";
- }
-
- auto opRev = params.Get("op_rev");
- if (!branch) {
- branch = "trunk";
- }
-
- // in order to support @ in file names we have to add @ and optional peg revision
- // see https://stackoverflow.com/questions/757435/how-to-escape-characters-in-subversion-managed-file-names
- TStringBuilder fullUrlBuilder = TStringBuilder() << "svn+ssh://arcadia-ro.yandex.ru/arc/" << branch << "/arcadia/" <<
- url.GetField(NUri::TField::FieldHost) << url.GetField(NUri::TField::FieldPath) << "@" << pegRev;
-
- TStringBuilder configOptionBuilder = TStringBuilder() << "config:tunnels:ssh=ssh -F /dev/null -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=30 -o ForwardAgent=true";
-
- if (config.GetArcSshPkPath()) {
- configOptionBuilder << " -i " << config.GetArcSshPkPath();
- }
-
- if (config.GetArcSshUser()) {
- configOptionBuilder << " -l " << config.GetArcSshUser();
- }
-
- TList<TString> args = { "export", fullUrlBuilder, dstFile, "--non-interactive", "--depth=empty", "--config-option", configOptionBuilder };
- if (opRev) {
- // equals to peg rev if not specified
- args.push_back("-r");
- args.push_back(opRev);
- }
-
- TShellCommandOptions shellOptions;
- shellOptions
- .SetUseShell(false) // disable shell for security reasons due to possible injections!
- .SetDetachSession(false);
-
- TShellCommand shell("svn", args, shellOptions);
- switch (shell.Run().GetStatus()) {
- case TShellCommand::SHELL_INTERNAL_ERROR:
- ythrow yexception() << "Export url internal error: "
- << shell.GetInternalError();
- case TShellCommand::SHELL_ERROR:
- ythrow TDownloadError() << "Downloading url " << convertedUrl << " failed, reason: " << shell.GetError();
- case TShellCommand::SHELL_FINISHED:
- break;
- default:
- ythrow yexception() << "Unexpected state: " << int(shell.GetStatus());
- }
- }
-
- auto stat = TFileStat(dstFile, true); // do not follow symlinks
- if (stat.IsDir()) {
- RemoveDirWithContents(dstFile);
- ythrow yexception() << "Folders are not allowed";
- }
-
- if (stat.IsSymlink()) {
- NFs::Remove(dstFile);
- ythrow yexception() << "Symlinks are not allowed";
- }
-
- SetCacheFilePermissions(dstFile);
-
- const auto md5 = MD5::File(dstFile);
- return std::make_pair(stat.Size, md5);
- }
-};
-
-std::unique_ptr<IFileExporter> CreateFileExporter() {
- return std::make_unique<TFileExporter>();
-}
-
-}
-
diff --git a/ydb/library/yql/core/file_storage/exporter/ya.make b/ydb/library/yql/core/file_storage/exporter/ya.make
deleted file mode 100644
index c92ef2b7a5..0000000000
--- a/ydb/library/yql/core/file_storage/exporter/ya.make
+++ /dev/null
@@ -1,17 +0,0 @@
-LIBRARY()
-
-OWNER(g:yql)
-
-SRCS(
- file_exporter.cpp
-)
-
-PEERDIR(
- arc/api/public
- ydb/library/yql/core/file_storage/proto
- ydb/library/yql/utils
- ydb/library/yql/utils/fetch
- ydb/library/yql/utils/log
-)
-
-END()
diff --git a/ydb/library/yql/core/file_storage/exporter_dummy/file_exporter.cpp b/ydb/library/yql/core/file_storage/exporter_dummy/file_exporter.cpp
deleted file mode 100644
index 669a8071dc..0000000000
--- a/ydb/library/yql/core/file_storage/exporter_dummy/file_exporter.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-#include <ydb/library/yql/core/file_storage/file_exporter.h>
-#include <ydb/library/yql/core/file_storage/download_stream.h>
-#include <ydb/library/yql/core/file_storage/storage.h>
-#include <ydb/library/yql/utils/fetch/fetch.h>
-#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h>
-#include <ydb/library/yql/utils/multi_resource_lock.h>
-
-#include <library/cpp/digest/md5/md5.h>
-
-#include <util/system/shellcommand.h>
-#include <util/stream/file.h>
-#include <util/system/fs.h>
-#include <util/string/strip.h>
-#include <util/folder/dirut.h>
-
-namespace NYql {
-
-class TFileExporterDummy : public IFileExporter {
-public:
- virtual std::pair<ui64, TString> ExportToFile(const TFileStorageConfig& Config,
- const TString& convertedUrl,
- const THttpURL& url,
- const TString& dstFile) override
- {
- Y_UNUSED(Config);
- Y_UNUSED(url);
-
- ythrow yexception() << "VCS is unsupported in current implementation; convertedUrl: " << convertedUrl << ", dstFile: " << dstFile;
- }
-};
-
-std::unique_ptr<IFileExporter> CreateFileExporter() {
- return std::make_unique<TFileExporterDummy>();
-}
-
-}
-
diff --git a/ydb/library/yql/core/file_storage/exporter_dummy/ya.make b/ydb/library/yql/core/file_storage/exporter_dummy/ya.make
deleted file mode 100644
index ccfd256bff..0000000000
--- a/ydb/library/yql/core/file_storage/exporter_dummy/ya.make
+++ /dev/null
@@ -1,16 +0,0 @@
-LIBRARY()
-
-OWNER(g:yql)
-
-SRCS(
- file_exporter.cpp
-)
-
-PEERDIR(
- ydb/library/yql/core/file_storage/proto
- ydb/library/yql/utils
- ydb/library/yql/utils/fetch
- ydb/library/yql/utils/log
-)
-
-END()
diff --git a/ydb/library/yql/core/file_storage/file_storage.cpp b/ydb/library/yql/core/file_storage/file_storage.cpp
index e247ba113e..631adbc688 100644
--- a/ydb/library/yql/core/file_storage/file_storage.cpp
+++ b/ydb/library/yql/core/file_storage/file_storage.cpp
@@ -1,29 +1,28 @@
#include "file_storage.h"
-#include "file_exporter.h"
-#include "download_stream.h"
-#include "pattern_group.h"
#include "storage.h"
#include "url_mapper.h"
#include "url_meta.h"
+#include "download_stream.h"
+
+#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h>
#include <ydb/library/yql/utils/fetch/fetch.h>
#include <ydb/library/yql/utils/log/log.h>
-
+#include <ydb/library/yql/utils/log/context.h>
#include <ydb/library/yql/utils/multi_resource_lock.h>
#include <ydb/library/yql/utils/md5_stream.h>
#include <ydb/library/yql/utils/retry.h>
+#include <ydb/library/yql/utils/yql_panic.h>
#include <library/cpp/cache/cache.h>
#include <library/cpp/digest/md5/md5.h>
-#include <library/cpp/http/misc/httpcodes.h>
#include <library/cpp/threading/future/async.h>
-#include <library/cpp/cgiparam/cgiparam.h>
#include <util/generic/guid.h>
-#include <util/folder/dirut.h>
+#include <util/generic/yexception.h>
+
#include <util/stream/file.h>
#include <util/stream/null.h>
-#include <util/string/strip.h>
#include <util/system/fs.h>
#include <util/system/fstat.h>
#include <util/system/guard.h>
@@ -31,26 +30,19 @@
#include <util/system/sysstat.h>
#include <util/system/utime.h>
-#include <tuple>
-
namespace NYql {
+
class TFileStorageImpl: public IFileStorage {
public:
explicit TFileStorageImpl(const TFileStorageConfig& params)
: Storage(params.GetMaxFiles(), ui64(params.GetMaxSizeMb()) << 20ull, params.GetPath())
, Config(params)
, QueueStarted(0)
- , SupportArcLinks(true)
- , FileExporter(CreateFileExporter())
{
try {
for (const auto& sc : params.GetCustomSchemes()) {
Mapper.AddMapping(sc.GetPattern(), sc.GetTargetUrl());
}
-
- if (params.GetExternalAllowedUrlPatterns().empty()) {
- InitUrlPatterns(false);
- }
} catch (const yexception& e) {
ythrow yexception() << "FileStorage: " << e.what();
}
@@ -69,23 +61,8 @@ public:
MtpQueue->Stop();
}
- void AddAllowedUrlPattern(const TString& pattern) override {
- AllowedUrls.Add(pattern);
- }
-
- void SetExternalUser(bool isExternal) override {
- try {
- SupportArcLinks = !isExternal;
- InitUrlPatterns(isExternal);
- } catch (const yexception& e) {
- ythrow yexception() << "FileStorage: " << e.what();
- }
- }
-
- void InitUrlPatterns(bool isExternalUser) {
- for (const auto& p : isExternalUser ? Config.GetExternalAllowedUrlPatterns() : Config.GetAllowedUrlPatterns()) {
- AllowedUrls.Add(p);
- }
+ void AddDownloader(IDownloaderPtr downloader) override {
+ Downloaders.push_back(std::move(downloader));
}
TFileLinkPtr PutFile(const TString& file, const TString& outFileName = {}) override {
@@ -94,25 +71,18 @@ public:
const TString storageFileName = md5 + ".file";
auto lock = MultiResourceLock.Acquire(storageFileName);
return Storage.Put(storageFileName, outFileName, md5, [&file, &md5](const TFsPath& dstFile) {
- try {
- NFs::HardLinkOrCopy(file, dstFile);
- SetCacheFilePermissionsNoThrow(dstFile);
- i64 length = GetFileLength(dstFile.c_str());
- if (length == -1) {
- ythrow TSystemError() << "cannot get file length: " << dstFile;
- }
- i64 srcLength = GetFileLength(file.c_str());
- if (srcLength == -1) {
- ythrow TSystemError() << "cannot get file length: " << file;
- }
-
- YQL_ENSURE(srcLength == length);
- return std::make_pair(static_cast<ui64>(length), md5);
- } catch (...) {
- YQL_LOG(ERROR) << CurrentExceptionMessage();
- NFs::Remove(dstFile);
- throw;
+ NFs::HardLinkOrCopy(file, dstFile);
+ i64 length = GetFileLength(dstFile.c_str());
+ if (length == -1) {
+ ythrow TSystemError() << "cannot get file length: " << dstFile;
}
+ i64 srcLength = GetFileLength(file.c_str());
+ if (srcLength == -1) {
+ ythrow TSystemError() << "cannot get file length: " << file;
+ }
+
+ YQL_ENSURE(srcLength == length);
+ return std::make_pair(static_cast<ui64>(length), md5);
});
}
@@ -136,30 +106,23 @@ public:
strippedMeta = TUrlMeta();
const TString storageFileName = md5 + ".file.stripped";
TFileLinkPtr result = Storage.Put(storageFileName, "", "", [&file](const TFsPath& dstPath) {
- try {
- ui64 size;
- TString md5;
- TShellCommand cmd("strip", {file, "-o", dstPath.GetPath()});
- cmd.Run().Wait();
- if (*cmd.GetExitCode() != 0) {
- ythrow yexception() << cmd.GetError();
- }
- md5 = MD5::File(dstPath.GetPath());
- size = TFile(dstPath.GetPath(), OpenExisting | RdOnly).GetLength();
- YQL_LOG(DEBUG) << "Strip " << file << " to " << dstPath.GetPath();
- return std::make_pair(size, md5);
- } catch (...) {
- YQL_LOG(ERROR) << CurrentExceptionMessage();
- NFs::Remove(dstPath);
- throw;
+ ui64 size;
+ TString md5;
+ TShellCommand cmd("strip", {file, "-o", dstPath.GetPath()});
+ cmd.Run().Wait();
+ if (*cmd.GetExitCode() != 0) {
+ ythrow yexception() << cmd.GetError();
}
+ md5 = MD5::File(dstPath.GetPath());
+ size = TFile(dstPath.GetPath(), OpenExisting | RdOnly).GetLength();
+ YQL_LOG(DEBUG) << "Strip " << file << " to " << dstPath.GetPath();
+ return std::make_pair(size, md5);
});
strippedMeta.ContentFile = result->GetStorageFileName();
strippedMeta.Md5 = result->GetMd5();
auto metaTmpFile = Storage.GetTemp() / Storage.GetTempName();
strippedMeta.SaveTo(metaTmpFile);
- SetCacheFilePermissions(metaTmpFile);
Storage.MoveToStorage(metaTmpFile, strippedMetaFile);
return result;
}
@@ -170,26 +133,19 @@ public:
YQL_LOG(INFO) << "PutInline to cache. md5=" << md5;
auto lock = MultiResourceLock.Acquire(storageFileName);
return Storage.Put(storageFileName, TString(), md5, [&data, &md5](const TFsPath& dstFile) {
- try {
- TStringInput in(data);
- TFile outFile(dstFile, CreateAlways | ARW | AX);
- TUnbufferedFileOutput out(outFile);
- auto result = std::make_pair(TransferData(&in, &out), md5);
- outFile.Close();
- SetCacheFilePermissions(dstFile);
-
- i64 length = GetFileLength(dstFile.c_str());
- if (length == -1) {
- ythrow TSystemError() << "cannot get file length: " << dstFile;
- }
-
- YQL_ENSURE(data.size() == static_cast<ui64>(length));
- return result;
- } catch (...) {
- YQL_LOG(ERROR) << CurrentExceptionMessage();
- NFs::Remove(dstFile);
- throw;
+ TStringInput in(data);
+ TFile outFile(dstFile, CreateAlways | ARW | AX);
+ TUnbufferedFileOutput out(outFile);
+ auto result = std::make_pair(TransferData(&in, &out), md5);
+ outFile.Close();
+
+ i64 length = GetFileLength(dstFile.c_str());
+ if (length == -1) {
+ ythrow TSystemError() << "cannot get file length: " << dstFile;
}
+
+ YQL_ENSURE(data.size() == static_cast<ui64>(length));
+ return result;
});
}
@@ -202,18 +158,13 @@ public:
YQL_LOG(INFO) << "PutUrl to cache: " << convertedUrl;
THttpURL url = ParseURL(convertedUrl);
- auto rawScheme = url.GetField(NUri::TField::FieldScheme);
- if (NUri::EqualNoCase(rawScheme, "arc")) {
- return PutUrl(convertedUrl, url, oauthToken, true);
- } else {
- switch (url.GetSchemeInfo().Kind) {
- case NUri::TScheme::SchemeHTTP:
- case NUri::TScheme::SchemeHTTPS:
- return PutUrl(convertedUrl, url, oauthToken, false);
- default:
- ythrow yexception() << "Unsupported url scheme: " << convertedUrl;
+ for (const auto& d: Downloaders) {
+ if (d->Accept(url)) {
+ return PutUrl(url, oauthToken, d);
}
}
+
+ ythrow yexception() << "Unsupported url: " << convertedUrl;
} catch (const std::exception& e) {
YQL_LOG(ERROR) << "Failed to download file by URL \"" << urlStr << "\", details: " << e.what();
YQL_LOG_CTX_THROW yexception() << "FileStorage: Failed to download file by URL \"" << urlStr << "\", details: " << e.what();
@@ -249,6 +200,10 @@ public:
return Storage.GetTemp();
}
+ const TFileStorageConfig& GetConfig() const override {
+ return Config;
+ }
+
private:
void StartQueueOnce() {
// we shall call Start only once
@@ -257,26 +212,16 @@ private:
}
}
- TFileLinkPtr PutUrl(const TString& convertedUrl, const THttpURL& url, const TString& oauthToken, bool isArc) {
- if (isArc && !SupportArcLinks) {
- YQL_LOG(WARN) << "FileStorage: Arcadia support is disabled, reject downloading " << convertedUrl;
- throw yexception() << "It is not allowed to download Arcadia url " << convertedUrl;
- }
-
- if (!isArc && !AllowedUrls.IsEmpty() && !AllowedUrls.Match(convertedUrl)) {
- YQL_LOG(WARN) << "FileStorage: url " << convertedUrl << " is not in whitelist, reject downloading";
- throw yexception() << "It is not allowed to download url " << convertedUrl;
- }
-
+ TFileLinkPtr PutUrl(const THttpURL& url, const TString& oauthToken, const IDownloaderPtr& downloader) {
return WithRetry<TDownloadError>(Config.GetRetryCount(), [&, this]() {
- return this->DoPutUrl(convertedUrl, url, oauthToken, isArc);
+ return this->DoPutUrl(url, oauthToken, downloader);
}, [&](const auto& e, int attempt, int attemptCount) {
- YQL_LOG(WARN) << "Error while downloading url " << convertedUrl << ", attempt " << attempt << "/" << attemptCount << ", details: " << e.what();
+ YQL_LOG(WARN) << "Error while downloading url " << url.PrintS() << ", attempt " << attempt << "/" << attemptCount << ", details: " << e.what();
Sleep(TDuration::MilliSeconds(Config.GetRetryDelayMs()));
});
}
- TFileLinkPtr DoPutUrl(const TString& convertedUrl, const THttpURL& url, const TString& oauthToken, bool isArc) {
+ TFileLinkPtr DoPutUrl(const THttpURL& url, const TString& oauthToken, const IDownloaderPtr& downloader) {
const auto urlMetaFile = BuildUrlMetaFileName(url);
auto lock = MultiResourceLock.Acquire(urlMetaFile); // let's use meta file as lock name
@@ -296,61 +241,31 @@ private:
<< ", ContentFile=" << urlMeta.ContentFile << ", Md5=" << urlMeta.Md5
<< ", LastModified=" << urlMeta.LastModified;
- TFileLinkPtr result;
+ TStorage::TDataPuller puller;
TString etag;
TString lastModified;
- if (isArc) {
- const auto urlContentFile = BuildUrlContentFileName(url, etag, lastModified);
-
- result = Storage.Put(urlContentFile, TString(), TString(), [&convertedUrl, &url, this](const TFsPath& dstFile) {
- try {
- return this->FileExporter->ExportToFile(Config, convertedUrl, url, dstFile);
- } catch (...) {
- NFs::Remove(dstFile);
- throw;
- }
- });
-
- } else {
- TFetchResultPtr fr1 = FetchWithETagAndLastModified(convertedUrl, oauthToken, urlMeta.ETag, urlMeta.LastModified);
- switch (fr1->GetRetCode()) {
- case HTTP_NOT_MODIFIED:
- Y_ENSURE(oldContentLink); // should not fire
- return oldContentLink;
- case HTTP_OK:
- break;
- default:
- ythrow yexception() << "Url " << convertedUrl << " cannot be accessed, code: " << fr1->GetRetCode();
- }
-
- std::tie(etag, lastModified) = ExtractETagAndLastModified(*fr1);
- if (urlMeta.ETag && urlMeta.ETag == etag && oldContentLink) {
- // for non-empty etags server should reply with code 304 Not modified
- // but some servers like github.com do not support IfNoneMatch (but ETag is not empty)
- // no etag is supported (previously and now) or server responded with status code 200 and we have something in cache
- // access already checked by FetchWithETagAndLastModified
- return oldContentLink;
- }
-
- if (urlMeta.ETag && etag) {
- YQL_LOG(INFO) << "ETag for url " << convertedUrl << " has been changed from " << urlMeta.ETag << " to " << etag << ". We have to download new version";
- }
- else if (urlMeta.LastModified && lastModified) {
- YQL_LOG(INFO) << "LastModified for url " << convertedUrl << " has been changed from " << urlMeta.LastModified << " to " << lastModified << ". We have to download new version";
- }
-
- // todo: remove oldContentLink ?
- const auto urlContentFile = BuildUrlContentFileName(url, etag, lastModified);
-
- result = Storage.Put(urlContentFile, TString(), TString(), [&fr1, &convertedUrl](const TFsPath& dstFile) {
- try {
- return CopyToFile(convertedUrl, *fr1, dstFile);
- } catch (...) {
- NFs::Remove(dstFile);
- throw;
- }
- });
+ std::tie(puller, etag, lastModified) = downloader->Download(url, oauthToken, urlMeta.ETag, urlMeta.LastModified);
+ if (!puller) {
+ Y_ENSURE(oldContentLink); // should not fire
+ return oldContentLink;
+ }
+ if (urlMeta.ETag && urlMeta.ETag == etag && oldContentLink) {
+ // for non-empty etags server should reply with code 304 Not modified
+ // but some servers like github.com do not support IfNoneMatch (but ETag is not empty)
+ // no etag is supported (previously and now) or server responded with status code 200 and we have something in cache
+ // access already checked by FetchWithETagAndLastModified
+ return oldContentLink;
}
+ if (urlMeta.ETag && etag) {
+ YQL_LOG(INFO) << "ETag for url " << url.PrintS() << " has been changed from " << urlMeta.ETag << " to " << etag << ". We have to download new version";
+ }
+ else if (urlMeta.LastModified && lastModified) {
+ YQL_LOG(INFO) << "LastModified for url " << url.PrintS() << " has been changed from " << urlMeta.LastModified << " to " << lastModified << ". We have to download new version";
+ }
+
+ // todo: remove oldContentLink ?
+ const auto urlContentFile = BuildUrlContentFileName(url, etag, lastModified);
+ TFileLinkPtr result = Storage.Put(urlContentFile, TString(), TString(), puller);
// save meta using rename for atomicity
urlMeta.ETag = etag;
@@ -359,41 +274,10 @@ private:
urlMeta.Md5 = result->GetMd5();
auto metaTmpFile = Storage.GetTemp() / Storage.GetTempName();
urlMeta.SaveTo(metaTmpFile);
- SetCacheFilePermissions(metaTmpFile);
Storage.MoveToStorage(metaTmpFile, urlMetaFile);
return result;
-
- }
-
- TFetchResultPtr FetchWithETagAndLastModified(const TString& url, const TString& oauthToken, const TString& oldEtag, const TString& oldLastModified) {
- // more details about ETag and ModifiedSince: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.26
- THttpHeaders headers;
- if (!oauthToken.empty()) {
- headers.AddHeader(THttpInputHeader("Authorization", "OAuth " + oauthToken));
- }
-
- // ETag has priority over modification time
- if (!oldEtag.empty()) {
- headers.AddHeader(THttpInputHeader("If-None-Match", oldEtag));
- } else if (!oldLastModified.empty()) {
- headers.AddHeader(THttpInputHeader("If-Modified-Since", oldLastModified));
- }
- const auto parsedUrl = ParseURL(url);
- if (parsedUrl.GetHost() == "yt.yandex-team.ru" || parsedUrl.GetHost().EndsWith(".yt.yandex-team.ru")) {
- auto guid = CreateGuidAsString();
- headers.AddHeader(THttpInputHeader("X-YT-Correlation-Id", guid));
- YQL_LOG(INFO) << "Use Correlation-Id=" << guid << " for " << url;
- }
-
- const auto timeout = TDuration::MilliSeconds(Config.GetSocketTimeoutMs());
- try {
- return Fetch(parsedUrl, headers, timeout);
- } catch (const std::exception& e) {
- // remap exception type to leverage retry logic
- throw TDownloadError() << e.what();
- }
}
static TString BuildUrlMetaFileName(const THttpURL& url) {
@@ -405,74 +289,14 @@ private:
return MD5::Calc(needle + url.PrintS(THttpURL::FlagNoFrag | THttpURL::FlagHostAscii)) + ".url";
}
- static std::pair<ui64, TString> CopyToFile(const TString& url, IFetchResult& src, const TString& dstFile) {
- TFile outFile(dstFile, CreateAlways | ARW | AX);
- TUnbufferedFileOutput out(outFile);
- TMd5OutputStream md5Out(out);
-
- THttpInput& httpStream = src.GetStream();
- TDownloadStream input(httpStream);
- const ui64 size = TransferData(&input, &md5Out);
- auto result = std::make_pair(size, md5Out.Finalize());
- out.Finish();
- outFile.Close();
- SetCacheFilePermissions(dstFile);
-
- ui64 contentLength = 0;
- // additional check for not compressed data
- if (!httpStream.ContentEncoded() && httpStream.GetContentLength(contentLength) && contentLength != size) {
- // let's retry this error
- ythrow TDownloadError() << "Size mismatch while downloading url " << url << ", downloaded size: " << size << ", ContentLength: " << contentLength;
- }
-
- if (auto trailers = httpStream.Trailers()) {
- if (auto header = trailers->FindHeader("X-YT-Error")) {
- ythrow TDownloadError() << "X-YT-Error=" << header->Value();
- }
- }
-
- i64 dstFileLen = GetFileLength(dstFile.c_str());
- if (dstFileLen == -1) {
- ythrow TSystemError() << "cannot get file length: " << dstFile;
- }
-
- YQL_ENSURE(static_cast<ui64>(dstFileLen) == size);
- return result;
- }
-
- static TString WeakETag2Strong(const TString& etag) {
- // drop W/ at the beginning if any
- return etag.StartsWith("W/") ? etag.substr(2) : etag;
- }
-
- static std::pair<TString, TString> ExtractETagAndLastModified(IFetchResult& result) {
- const auto& headers = result.GetStream().Headers();
- TString etag;
- TString lastModified;
- // linear scan
- for (auto it = headers.Begin(); it != headers.End(); ++it) {
- if (TCIEqualTo<TString>()(it->Name(), TString(TStringBuf("ETag")))) {
- etag = WeakETag2Strong(it->Value());
- }
-
- if (TCIEqualTo<TString>()(it->Name(), TString(TStringBuf("Last-Modified")))) {
- lastModified = it->Value();
- }
- }
-
- return std::make_pair(etag, lastModified);
- }
-
private:
TStorage Storage;
const TFileStorageConfig Config;
+ std::vector<IDownloaderPtr> Downloaders;
TUrlMapper Mapper;
- TPatternGroup AllowedUrls;
TAtomic QueueStarted;
- bool SupportArcLinks;
THolder<IThreadPool> MtpQueue;
TMultiResourceLock MultiResourceLock;
- std::unique_ptr<IFileExporter> FileExporter;
};
TFileStoragePtr CreateFileStorage(const TFileStorageConfig& params) {
diff --git a/ydb/library/yql/core/file_storage/file_storage.h b/ydb/library/yql/core/file_storage/file_storage.h
index 95f7203ee1..9133d57b1b 100644
--- a/ydb/library/yql/core/file_storage/file_storage.h
+++ b/ydb/library/yql/core/file_storage/file_storage.h
@@ -3,21 +3,33 @@
#include "storage.h"
#include <library/cpp/threading/future/future.h>
+#include <library/cpp/uri/http_url.h>
#include <util/folder/path.h>
#include <util/generic/ptr.h>
#include <util/generic/string.h>
+#include <functional>
+#include <unordered_map>
+#include <tuple>
+
namespace NYql {
+class TFileStorageConfig;
+
struct IFileStorage: public TThrRefBase {
+ struct IDownloader : public TThrRefBase {
+ virtual bool Accept(const THttpURL& url) = 0;
+ virtual std::tuple<TStorage::TDataPuller, TString, TString> Download(const THttpURL& url, const TString& oauthToken, const TString& etag, const TString& lastModified) = 0;
+ };
+ using IDownloaderPtr = TIntrusivePtr<IDownloader>;
+
virtual ~IFileStorage() = default;
- virtual void AddAllowedUrlPattern(const TString& pattern) = 0;
+ virtual void AddDownloader(IDownloaderPtr downloader) = 0;
virtual TFileLinkPtr PutFile(const TString& file, const TString& outFileName = {}) = 0;
virtual TFileLinkPtr PutFileStripped(const TString& file, const TString& originalMd5 = {}) = 0;
virtual TFileLinkPtr PutInline(const TString& data) = 0;
virtual TFileLinkPtr PutUrl(const TString& url, const TString& oauthToken) = 0;
- virtual void SetExternalUser(bool isExternal) = 0;
// async versions
virtual NThreading::TFuture<TFileLinkPtr> PutFileAsync(const TString& file, const TString& outFileName = {}) = 0;
virtual NThreading::TFuture<TFileLinkPtr> PutInlineAsync(const TString& data) = 0;
@@ -25,12 +37,11 @@ struct IFileStorage: public TThrRefBase {
virtual TFsPath GetRoot() const = 0;
virtual TFsPath GetTemp() const = 0;
+ virtual const TFileStorageConfig& GetConfig() const = 0;
};
using TFileStoragePtr = TIntrusivePtr<IFileStorage>;
-class TFileStorageConfig;
-
// Will use auto-cleaned temporary directory if storagePath is empty
TFileStoragePtr CreateFileStorage(const TFileStorageConfig& params);
diff --git a/ydb/library/yql/core/file_storage/file_storage_ut.cpp b/ydb/library/yql/core/file_storage/file_storage_ut.cpp
index 1df31093e1..5103fdd31d 100644
--- a/ydb/library/yql/core/file_storage/file_storage_ut.cpp
+++ b/ydb/library/yql/core/file_storage/file_storage_ut.cpp
@@ -1,7 +1,8 @@
#include "file_storage.h"
-#include <ydb/library/yql/core/file_storage/ut/test_http_server.h>
+#include <ydb/library/yql/core/file_storage/ut/test_http_server.h>
#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h>
+#include <ydb/library/yql/core/file_storage/http_download/http_download.h>
#include <library/cpp/threading/future/future.h>
#include <library/cpp/threading/future/async.h>
@@ -21,6 +22,12 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
return TIFStream(path).ReadAll();
}
+ static TFileStoragePtr CreateTestFS(const TFileStorageConfig& params = {}, const std::vector<TString>& extraPatterns = {}) {
+ TFileStoragePtr fs = CreateFileStorage(params);
+ fs->AddDownloader(MakeHttpDownloader(false, params, extraPatterns));
+ return fs;
+ }
+
static std::unique_ptr<TTestHttpServer> CreateTestHttpServer() {
TPortManager pm;
const ui16 port = pm.GetPort();
@@ -57,8 +64,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
return TTestHttpServer::TReply::Ok(currentContent);
});
- TFileStorageConfig params;
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS();
auto url = server->GetUrl();
auto link1 = fs->PutUrl(url, {});
@@ -91,8 +97,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
return TTestHttpServer::TReply::Ok(content);
});
- TFileStorageConfig params;
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS();
auto url = server->GetUrl();
@@ -121,8 +126,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
return TTestHttpServer::TReply::OkETag(currentContent, currentETag);
});
- TFileStorageConfig params;
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS();
auto url = server->GetUrl();
@@ -165,8 +169,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
return TTestHttpServer::TReply::OkLastModified(currentContent, currentLastModified);
});
- TFileStorageConfig params;
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS();
auto url = server->GetUrl();
@@ -206,8 +209,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
return TTestHttpServer::TReply::OkETag(currentContent, currentETag);
});
- TFileStorageConfig params;
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS();
auto url = server->GetUrl();
@@ -250,8 +252,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
return TTestHttpServer::TReply::OkETag(currentContent, MakeWeakETag(currentETag));
});
- TFileStorageConfig params;
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS();
auto url = server->GetUrl();
@@ -287,8 +288,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
return TTestHttpServer::TReply::Ok(currentContent);
});
- TFileStorageConfig params;
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS();
auto url = server->GetUrl();
auto link1 = fs->PutUrl(url, {});
@@ -318,8 +318,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
return TTestHttpServer::TReply::OkETag(currentContent, currentETag);
});
- TFileStorageConfig params;
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS();
auto url = server->GetUrl();
auto link1 = fs->PutUrl(url, {});
@@ -349,8 +348,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
return TTestHttpServer::TReply::OkETag(currentContent, currentETag);
});
- TFileStorageConfig params;
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS();
auto url = server->GetUrl();
auto link1 = fs->PutUrl(url, {});
@@ -371,8 +369,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
Y_UNIT_TEST(Md5ForPutFiles) {
TString currentContent = "ABC";
- TFileStorageConfig params;
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS();
TTempFileHandle h1;
h1.Write("ABC", 3);
@@ -412,7 +409,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
TFileStorageConfig params;
params.SetRetryCount(3);
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS(params);
auto url = server->GetUrl();
@@ -432,8 +429,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
return TTestHttpServer::TReply::OkETag(currentContent, currentETag, 0);
});
- TFileStorageConfig params;
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS();
auto url = server->GetUrl();
@@ -462,7 +458,7 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
TFileStorageConfig params;
params.SetRetryCount(3);
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS(params);
auto url = server->GetUrl();
@@ -480,26 +476,36 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
auto url = server->GetUrl();
- // not in whitelist
- TFileStorageConfig params1;
- params1.AddAllowedUrlPatterns("^XXXX$");
- TFileStoragePtr fs1 = CreateFileStorage(params1);
+ {
+ // not in whitelist
+ TFileStorageConfig params;
+ params.AddAllowedUrlPatterns("^XXXX$");
+ TFileStoragePtr fs = CreateTestFS(params);
- UNIT_ASSERT_EXCEPTION_CONTAINS(fs1->PutUrl(url, {}), std::exception, "It is not allowed to download url http://localhost:");
- fs1->AddAllowedUrlPattern("^http://localhost:");
+ UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl(url, {}), std::exception, "It is not allowed to download url http://localhost:");
+ }
- auto link1 = fs1->PutUrl(url, {});
- UNIT_ASSERT_VALUES_EQUAL(currentContent, ReadFileContent(link1->GetPath()));
+ {
+ // have in whitelist
+ TFileStorageConfig params;
+ params.SetSocketTimeoutMs(4000);
+ params.AddAllowedUrlPatterns("^http://localhost:");
+ TFileStoragePtr fs = CreateTestFS(params);
- // have in whitelist
- TFileStorageConfig params2;
- params2.SetSocketTimeoutMs(4000);
- params2.AddAllowedUrlPatterns("^http://localhost:");
- TFileStoragePtr fs2 = CreateFileStorage(params2);
- fs2->AddAllowedUrlPattern("^XXXX$");
+ auto link = fs->PutUrl(url, {});
+ UNIT_ASSERT_VALUES_EQUAL(currentContent, ReadFileContent(link->GetPath()));
+ }
+
+ {
+ // have extra url in whitelist
+ TFileStorageConfig params;
+ params.SetSocketTimeoutMs(4000);
+ params.AddAllowedUrlPatterns("^XXXX$");
+ TFileStoragePtr fs = CreateTestFS(params, std::vector{TString{"^http://localhost:"}});
- auto link2 = fs1->PutUrl(url, {});
- UNIT_ASSERT_VALUES_EQUAL(currentContent, ReadFileContent(link2->GetPath()));
+ auto link = fs->PutUrl(url, {});
+ UNIT_ASSERT_VALUES_EQUAL(currentContent, ReadFileContent(link->GetPath()));
+ }
}
Y_UNIT_TEST(SocketTimeout) {
@@ -512,23 +518,9 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
TFileStorageConfig params;
params.SetSocketTimeoutMs(1000);
- TFileStoragePtr fs = CreateFileStorage(params);
+ TFileStoragePtr fs = CreateTestFS(params);
auto url = server->GetUrl();
UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl(url, {}), std::exception, "can not read from socket input stream");
}
-
-#ifndef OPENSOURCE
- Y_UNIT_TEST(ArcFileWithoutRev) {
- TFileStorageConfig params;
- TFileStoragePtr fs = CreateFileStorage(params);
-
- UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl("arc://yql/a.txt", {}), std::exception, "Revision for Arcadia file must be specified");
- UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl("arc://yql/a.txt?rev=", {}), std::exception, "Revision for Arcadia file must be specified");
- UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl("arc://yql/a.txt?rev=0", {}), std::exception, "Revision for Arcadia file must be specified");
- UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl("arc://yql/a.txt?rev=head", {}), std::exception, "Revision for Arcadia file must be specified");
- UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl("arc://yql/a.txt?rev=HEAD", {}), std::exception, "Revision for Arcadia file must be specified");
- UNIT_ASSERT_EXCEPTION_CONTAINS(fs->PutUrl("arc://yql/a.txt?op_rev=123", {}), std::exception, "Revision for Arcadia file must be specified");
- }
-#endif
}
diff --git a/ydb/library/yql/core/file_storage/http_download/http_download.cpp b/ydb/library/yql/core/file_storage/http_download/http_download.cpp
new file mode 100644
index 0000000000..0011d3b92a
--- /dev/null
+++ b/ydb/library/yql/core/file_storage/http_download/http_download.cpp
@@ -0,0 +1,172 @@
+#include "http_download.h"
+#include "pattern_group.h"
+
+#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h>
+#include <ydb/library/yql/core/file_storage/download_stream.h>
+
+#include <ydb/library/yql/utils/fetch/fetch.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/log/context.h>
+#include <ydb/library/yql/utils/multi_resource_lock.h>
+#include <ydb/library/yql/utils/md5_stream.h>
+#include <ydb/library/yql/utils/retry.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+#include <library/cpp/digest/md5/md5.h>
+#include <library/cpp/http/misc/httpcodes.h>
+
+#include <util/generic/guid.h>
+#include <util/generic/yexception.h>
+#include <util/stream/file.h>
+#include <util/system/file.h>
+
+
+namespace NYql {
+
+class THttpDownloader: public IFileStorage::IDownloader {
+public:
+ // Builds the URL allowlist used by Download().
+ //  - restictedUser: when true the allowlist is seeded from the config's
+ //    ExternalAllowedUrlPatterns, otherwise from its AllowedUrlPatterns.
+ //  - config: also supplies the socket timeout (milliseconds) used for fetches.
+ //  - extraAllowedUrls: patterns added on top of the ones taken from the config.
+ THttpDownloader(bool restictedUser, const TFileStorageConfig& config, const std::vector<TString>& extraAllowedUrls)
+     : SocketTimeoutMs(config.GetSocketTimeoutMs())
+ {
+     for (const auto& p : restictedUser ? config.GetExternalAllowedUrlPatterns() : config.GetAllowedUrlPatterns()) {
+         AllowedUrls.Add(p);
+     }
+     // Iterate by const reference: the patterns are only read here, so copying each element is unnecessary.
+     for (const auto& p : extraAllowedUrls) {
+         AllowedUrls.Add(p);
+     }
+ }
+ ~THttpDownloader() = default;
+
+ // Reports whether this downloader handles the given URL:
+ // only the http and https schemes are accepted.
+ bool Accept(const THttpURL& url) final {
+     const auto scheme = url.GetScheme();
+     return scheme == NUri::TScheme::SchemeHTTP || scheme == NUri::TScheme::SchemeHTTPS;
+ }
+
+ // Fetches the url and returns a (puller, etag, last-modified) tuple.
+ //  - If the allowlist is non-empty and the printed url does not match it, throws yexception.
+ //  - On HTTP_NOT_MODIFIED returns a default-constructed puller and two empty strings.
+ //  - On any other code except HTTP_OK throws yexception.
+ // The returned puller copies the response body into the destination path it is called with.
+ std::tuple<TStorage::TDataPuller, TString, TString> Download(const THttpURL& url, const TString& oauthToken, const TString& oldEtag, const TString& oldLastModified) final {
+     if (!AllowedUrls.IsEmpty()) {
+         const auto printedUrl = url.PrintS();
+         if (!AllowedUrls.Match(printedUrl)) {
+             YQL_LOG(WARN) << "FileStorage: url " << printedUrl << " is not in whitelist, reject downloading";
+             throw yexception() << "It is not allowed to download url " << printedUrl;
+         }
+     }
+
+     TFetchResultPtr response = FetchWithETagAndLastModified(url, oauthToken, oldEtag, oldLastModified, SocketTimeoutMs);
+     const auto retCode = response->GetRetCode();
+     if (retCode == HTTP_NOT_MODIFIED) {
+         // Nothing to download: signal it with an empty puller and empty validators.
+         return std::make_tuple(TStorage::TDataPuller{}, TString{}, TString{});
+     }
+     if (retCode != HTTP_OK) {
+         ythrow yexception() << "Url " << url.PrintS() << " cannot be accessed, code: " << retCode;
+     }
+
+     const auto validators = ExtractETagAndLastModified(*response);
+
+     // The puller keeps its own reference to the fetch result so the stream can be read later.
+     auto puller = [urlStr = url.PrintS(), response](const TFsPath& dstPath) -> std::pair<ui64, TString> {
+         return CopyToFile(urlStr, *response, dstPath);
+     };
+
+     return std::make_tuple(puller, validators.first, validators.second);
+ }
+
+private:
+ // Builds the request headers (optional OAuth authorization, conditional-request
+ // header, optional YT correlation id) and performs the fetch with the given
+ // socket timeout. Any std::exception thrown by Fetch is rethrown as
+ // TDownloadError carrying the original message.
+ static TFetchResultPtr FetchWithETagAndLastModified(const THttpURL& url, const TString& oauthToken, const TString& oldEtag, const TString& oldLastModified, ui32 socketTimeoutMs) {
+     // more details about ETag and ModifiedSince: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.26
+     THttpHeaders requestHeaders;
+     if (!oauthToken.empty()) {
+         requestHeaders.AddHeader(THttpInputHeader("Authorization", "OAuth " + oauthToken));
+     }
+
+     // ETag has priority over modification time
+     if (!oldEtag.empty()) {
+         requestHeaders.AddHeader(THttpInputHeader("If-None-Match", oldEtag));
+     } else if (!oldLastModified.empty()) {
+         requestHeaders.AddHeader(THttpInputHeader("If-Modified-Since", oldLastModified));
+     }
+
+     const bool isYtHost = url.GetHost() == "yt.yandex-team.ru" || url.GetHost().EndsWith(".yt.yandex-team.ru");
+     if (isYtHost) {
+         auto correlationId = CreateGuidAsString();
+         requestHeaders.AddHeader(THttpInputHeader("X-YT-Correlation-Id", correlationId));
+         YQL_LOG(INFO) << "Use Correlation-Id=" << correlationId << " for " << url.PrintS();
+     }
+
+     try {
+         return Fetch(url, requestHeaders, TDuration::MilliSeconds(socketTimeoutMs));
+     } catch (const std::exception& fetchError) {
+         // remap exception type to leverage retry logic
+         throw TDownloadError() << fetchError.what();
+     }
+ }
+
+ // Copies the HTTP response stream of src into dstFile and returns a pair of
+ // (number of bytes transferred, value produced by the MD5 stream's Finalize()).
+ // url is used only in error messages.
+ // Throws TDownloadError on a Content-Length mismatch or an X-YT-Error trailer,
+ // TSystemError if the length of the written file cannot be obtained, and fails
+ // YQL_ENSURE if the on-disk length differs from the transferred size.
+ static std::pair<ui64, TString> CopyToFile(const TString& url, IFetchResult& src, const TString& dstFile) {
+ TFile outFile(dstFile, CreateAlways | ARW | AX);
+ TUnbufferedFileOutput out(outFile);
+ // everything written through md5Out goes to the file via out
+ TMd5OutputStream md5Out(out);
+
+ THttpInput& httpStream = src.GetStream();
+ TDownloadStream input(httpStream);
+ const ui64 size = TransferData(&input, &md5Out);
+ // the result is captured before the output is finished and the file is closed
+ auto result = std::make_pair(size, md5Out.Finalize());
+ out.Finish();
+ outFile.Close();
+
+ ui64 contentLength = 0;
+ // additional check for not compressed data
+ if (!httpStream.ContentEncoded() && httpStream.GetContentLength(contentLength) && contentLength != size) {
+ // let's retry this error
+ ythrow TDownloadError() << "Size mismatch while downloading url " << url << ", downloaded size: " << size << ", ContentLength: " << contentLength;
+ }
+
+ // an X-YT-Error header in the trailers is reported as a download error
+ if (auto trailers = httpStream.Trailers()) {
+ if (auto header = trailers->FindHeader("X-YT-Error")) {
+ ythrow TDownloadError() << "X-YT-Error=" << header->Value();
+ }
+ }
+
+ // cross-check the size of the file on disk against the transferred byte count
+ i64 dstFileLen = GetFileLength(dstFile.c_str());
+ if (dstFileLen == -1) {
+ ythrow TSystemError() << "cannot get file length: " << dstFile;
+ }
+
+ YQL_ENSURE(static_cast<ui64>(dstFileLen) == size);
+ return result;
+ }
+
+ // Returns etag without a leading "W/" prefix; an etag that does not
+ // start with "W/" is returned unchanged.
+ static TString WeakETag2Strong(const TString& etag) {
+     if (etag.StartsWith("W/")) {
+         return etag.substr(2);
+     }
+     return etag;
+ }
+
+ // Scans the response headers of result and returns (etag, last-modified).
+ // Header names are compared case-insensitively; the ETag value is passed through
+ // WeakETag2Strong. If a header occurs several times the last occurrence wins;
+ // a missing header yields an empty string.
+ static std::pair<TString, TString> ExtractETagAndLastModified(IFetchResult& result) {
+     const auto& responseHeaders = result.GetStream().Headers();
+
+     // Build the comparator and the header names once instead of on every iteration.
+     TCIEqualTo<TString> equalsIgnoreCase;
+     const TString etagName = TString(TStringBuf("ETag"));
+     const TString lastModifiedName = TString(TStringBuf("Last-Modified"));
+
+     std::pair<TString, TString> found; // first: ETag, second: Last-Modified
+     // linear scan
+     for (auto header = responseHeaders.Begin(); header != responseHeaders.End(); ++header) {
+         if (equalsIgnoreCase(header->Name(), etagName)) {
+             found.first = WeakETag2Strong(header->Value());
+         }
+
+         if (equalsIgnoreCase(header->Name(), lastModifiedName)) {
+             found.second = header->Value();
+         }
+     }
+
+     return found;
+ }
+
+private:
+ ui32 SocketTimeoutMs;
+ TPatternGroup AllowedUrls;
+};
+
+// Factory for the HTTP(S) downloader: forwards all arguments to THttpDownloader
+// and returns it through the generic downloader pointer type.
+IFileStorage::IDownloaderPtr MakeHttpDownloader(bool restictedUser, const TFileStorageConfig& config, const std::vector<TString>& extraAllowedUrls) {
+    IFileStorage::IDownloaderPtr downloader = MakeIntrusive<THttpDownloader>(restictedUser, config, extraAllowedUrls);
+    return downloader;
+}
+
+} // NYql
diff --git a/ydb/library/yql/core/file_storage/http_download/http_download.h b/ydb/library/yql/core/file_storage/http_download/http_download.h
new file mode 100644
index 0000000000..a0abc1c111
--- /dev/null
+++ b/ydb/library/yql/core/file_storage/http_download/http_download.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <ydb/library/yql/core/file_storage/file_storage.h>
+
+#include <vector>
+
+namespace NYql {
+
+class TFileStorageConfig;
+
+// Creates a downloader for http and https URLs.
+//  - restictedUser: when true the URL allowlist is taken from the config's
+//    ExternalAllowedUrlPatterns, otherwise from its AllowedUrlPatterns.
+//  - config: source of the allowlist patterns and of the socket timeout.
+//  - extraAllowedUrls: additional patterns appended to the allowlist.
+// If the resulting allowlist is empty, no URL filtering is applied.
+IFileStorage::IDownloaderPtr MakeHttpDownloader(bool restictedUser, const TFileStorageConfig& config, const std::vector<TString>& extraAllowedUrls);
+
+} // NYql
diff --git a/ydb/library/yql/core/file_storage/pattern_group.cpp b/ydb/library/yql/core/file_storage/http_download/pattern_group.cpp
index 689c97d322..f60ec107be 100644
--- a/ydb/library/yql/core/file_storage/pattern_group.cpp
+++ b/ydb/library/yql/core/file_storage/http_download/pattern_group.cpp
@@ -1,4 +1,4 @@
-#include "pattern_group.h"
+#include <ydb/library/yql/core/file_storage/http_download/pattern_group.h>
namespace NYql {
diff --git a/ydb/library/yql/core/file_storage/pattern_group.h b/ydb/library/yql/core/file_storage/http_download/pattern_group.h
index 53d0d64a7f..53d0d64a7f 100644
--- a/ydb/library/yql/core/file_storage/pattern_group.h
+++ b/ydb/library/yql/core/file_storage/http_download/pattern_group.h
diff --git a/ydb/library/yql/core/file_storage/pattern_group_ut.cpp b/ydb/library/yql/core/file_storage/http_download/pattern_group_ut.cpp
index 02ee126745..1b917252c1 100644
--- a/ydb/library/yql/core/file_storage/pattern_group_ut.cpp
+++ b/ydb/library/yql/core/file_storage/http_download/pattern_group_ut.cpp
@@ -1,4 +1,5 @@
#include "pattern_group.h"
+
#include <library/cpp/testing/unittest/registar.h>
using namespace NYql;
diff --git a/ydb/library/yql/core/file_storage/http_download/ut/ya.make b/ydb/library/yql/core/file_storage/http_download/ut/ya.make
new file mode 100644
index 0000000000..fec8e3e8c5
--- /dev/null
+++ b/ydb/library/yql/core/file_storage/http_download/ut/ya.make
@@ -0,0 +1,9 @@
+UNITTEST_FOR(ydb/library/yql/core/file_storage/http_download)
+
+OWNER(g:yql)
+
+SRCS(
+ pattern_group_ut.cpp
+)
+
+END()
diff --git a/ydb/library/yql/core/file_storage/http_download/ya.make b/ydb/library/yql/core/file_storage/http_download/ya.make
new file mode 100644
index 0000000000..344836558d
--- /dev/null
+++ b/ydb/library/yql/core/file_storage/http_download/ya.make
@@ -0,0 +1,25 @@
+LIBRARY()
+
+OWNER(g:yql)
+
+SRCS(
+ http_download.cpp
+ pattern_group.cpp
+)
+
+PEERDIR(
+ ydb/library/yql/core/file_storage
+ ydb/library/yql/core/file_storage/proto
+ ydb/library/yql/utils/fetch
+ ydb/library/yql/utils/log
+ ydb/library/yql/utils
+ library/cpp/regex/pcre
+ library/cpp/digest/md5
+ library/cpp/http/misc
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/ydb/library/yql/core/file_storage/storage.cpp b/ydb/library/yql/core/file_storage/storage.cpp
index cae4682904..247bff5242 100644
--- a/ydb/library/yql/core/file_storage/storage.cpp
+++ b/ydb/library/yql/core/file_storage/storage.cpp
@@ -208,10 +208,18 @@ public:
ui64 fileSize = 0;
TString pullerMd5; // overrides input arg 'md5'
- std::tie(fileSize, pullerMd5) = puller(hardlinkFile);
+ try {
+ std::tie(fileSize, pullerMd5) = puller(hardlinkFile);
+ } catch (...) {
+ YQL_LOG(ERROR) << CurrentExceptionMessage();
+ NFs::Remove(hardlinkFile);
+ throw;
+ }
Y_ENSURE(hardlinkFile.Exists(), "FileStorage: cannot put not existing temporary path");
Y_ENSURE(hardlinkFile.IsFile(), "FileStorage: cannot put non-file temporary path");
+ SetCacheFilePermissionsNoThrow(hardlinkFile);
+
if (NFs::HardLink(hardlinkFile, storageFile)) {
AtomicIncrement(CurrentFiles);
AtomicAdd(CurrentSize, fileSize);
@@ -223,8 +231,6 @@ public:
result = MakeIntrusive<TFileLink>(hardlinkFile, storageFileName, fileSize, pullerMd5);
}
- SetCacheFilePermissionsNoThrow(result->GetPath().c_str());
-
YQL_LOG(INFO) << "Using " << (newFileAdded ? "new" : "existing") << " storage file " << result->GetStorageFileName().Quote()
<< ", temp path: " << result->GetPath().GetPath().Quote()
<< ", size: " << result->GetSize();
@@ -269,6 +275,7 @@ public:
if (!NFs::Rename(src, dstStorageFile)) {
ythrow yexception() << "Failed to rename file from " << src << " to " << dstStorageFile;
}
+ SetCacheFilePermissionsNoThrow(dstStorageFile);
const i64 newFileSize = Max<i64>(0, GetFileLength(dstStorageFile.c_str()));
diff --git a/ydb/library/yql/core/file_storage/ut/ya.make b/ydb/library/yql/core/file_storage/ut/ya.make
index 7c67ef85b9..c19f58203b 100644
--- a/ydb/library/yql/core/file_storage/ut/ya.make
+++ b/ydb/library/yql/core/file_storage/ut/ya.make
@@ -11,7 +11,6 @@ ENDIF()
SRCS(
file_storage_ut.cpp
- pattern_group_ut.cpp
sized_cache_ut.cpp
storage_ut.cpp
test_http_server.cpp
@@ -21,6 +20,7 @@ SRCS(
PEERDIR(
library/cpp/http/server
library/cpp/threading/future
+ ydb/library/yql/core/file_storage/http_download
)
END()
diff --git a/ydb/library/yql/core/file_storage/ya.make b/ydb/library/yql/core/file_storage/ya.make
index c1cd0f7a4c..f25592bfb7 100644
--- a/ydb/library/yql/core/file_storage/ya.make
+++ b/ydb/library/yql/core/file_storage/ya.make
@@ -7,8 +7,6 @@ SRCS(
download_stream.h
file_storage.cpp
file_storage.h
- pattern_group.cpp
- pattern_group.h
sized_cache.cpp
sized_cache.h
storage.cpp
@@ -20,29 +18,18 @@ SRCS(
)
PEERDIR(
- contrib/libs/grpc
library/cpp/cache
- library/cpp/cgiparam
library/cpp/digest/md5
library/cpp/logger/global
- library/cpp/regex/pcre
library/cpp/threading/future
+ library/cpp/regex/pcre
+ library/cpp/uri
ydb/library/yql/core/file_storage/proto
ydb/library/yql/utils
- ydb/library/yql/utils/fetch
ydb/library/yql/utils/log
+ ydb/library/yql/utils/fetch
)
-IF (NOT OPENSOURCE)
-PEERDIR(
- ydb/library/yql/core/file_storage/exporter
-)
-ELSE()
-PEERDIR(
- ydb/library/yql/core/file_storage/exporter_dummy
-)
-ENDIF()
-
END()
RECURSE_FOR_TESTS(