aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation_preparer.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
committermax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
commitfac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
treeb8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/operation_preparer.cpp
parent7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
downloadydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/operation_preparer.cpp')
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp881
1 files changed, 881 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
new file mode 100644
index 0000000000..e06fac4061
--- /dev/null
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -0,0 +1,881 @@
+#include "operation_preparer.h"
+
+#include "init.h"
+#include "file_writer.h"
+#include "operation.h"
+#include "operation_helpers.h"
+#include "operation_tracker.h"
+#include "transaction.h"
+#include "transaction_pinger.h"
+#include "yt_poller.h"
+
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+
+#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
+#include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
+
+#include <yt/cpp/mapreduce/interface/error_codes.h>
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <library/cpp/digest/md5/md5.h>
+
+#include <util/folder/path.h>
+
+#include <util/string/builder.h>
+
+#include <util/system/execpath.h>
+
+namespace NYT::NDetail {
+
+using namespace NRawClient;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TWaitOperationStartPollerItem
+ : public IYtPollerItem
+{
+public:
+ TWaitOperationStartPollerItem(TOperationId operationId, THolder<TPingableTransaction> transaction)
+ : OperationId_(operationId)
+ , Transaction_(std::move(transaction))
+ { }
+
+ void PrepareRequest(TRawBatchRequest* batchRequest) override
+ {
+ Future_ = batchRequest->GetOperation(
+ OperationId_,
+ TGetOperationOptions().AttributeFilter(
+ TOperationAttributeFilter().Add(EOperationAttribute::State)));
+ }
+
+ EStatus OnRequestExecuted() override
+ {
+ try {
+ auto attributes = Future_.GetValue();
+ Y_ENSURE(attributes.State.Defined());
+ bool operationHasLockedFiles =
+ *attributes.State != "starting" &&
+ *attributes.State != "pending" &&
+ *attributes.State != "orphaned" &&
+ *attributes.State != "waiting_for_agent" &&
+ *attributes.State != "initializing";
+ return operationHasLockedFiles ? EStatus::PollBreak : EStatus::PollContinue;
+ } catch (const TErrorResponse& e) {
+ YT_LOG_ERROR("get_operation request failed: %v (RequestId: %v)",
+ e.GetError().GetMessage(),
+ e.GetRequestId());
+ return IsRetriable(e) ? PollContinue : PollBreak;
+ } catch (const std::exception& e) {
+ YT_LOG_ERROR("%v", e.what());
+ return PollBreak;
+ }
+ }
+
+ void OnItemDiscarded() override {
+ }
+
+private:
+ TOperationId OperationId_;
+ THolder<TPingableTransaction> Transaction_;
+ ::NThreading::TFuture<TOperationAttributes> Future_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TOperationForwardingRequestRetryPolicy
+ : public IRequestRetryPolicy
+{
+public:
+ TOperationForwardingRequestRetryPolicy(const IRequestRetryPolicyPtr& underlying, const TOperationPtr& operation)
+ : Underlying_(underlying)
+ , Operation_(operation)
+ { }
+
+ void NotifyNewAttempt() override
+ {
+ Underlying_->NotifyNewAttempt();
+ }
+
+ TMaybe<TDuration> OnGenericError(const std::exception& e) override
+ {
+ UpdateOperationStatus(e.what());
+ return Underlying_->OnGenericError(e);
+ }
+
+ TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
+ {
+ auto msg = e.GetError().ShortDescription();
+ UpdateOperationStatus(msg);
+ return Underlying_->OnRetriableError(e);
+ }
+
+ void OnIgnoredError(const TErrorResponse& e) override
+ {
+ Underlying_->OnIgnoredError(e);
+ }
+
+ TString GetAttemptDescription() const override
+ {
+ return Underlying_->GetAttemptDescription();
+ }
+
+private:
+ void UpdateOperationStatus(TStringBuf err)
+ {
+ Y_VERIFY(Operation_);
+ Operation_->OnStatusUpdated(
+ ::TStringBuilder() << "Retriable error during operation start: " << err);
+ }
+
+private:
+ IRequestRetryPolicyPtr Underlying_;
+ TOperationPtr Operation_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TOperationPreparer::TOperationPreparer(TClientPtr client, TTransactionId transactionId)
+ : Client_(std::move(client))
+ , TransactionId_(transactionId)
+ , FileTransaction_(MakeHolder<TPingableTransaction>(
+ Client_->GetRetryPolicy(),
+ Client_->GetContext(),
+ TransactionId_,
+ Client_->GetTransactionPinger()->GetChildTxPinger(),
+ TStartTransactionOptions()))
+ , ClientRetryPolicy_(Client_->GetRetryPolicy())
+ , PreparationId_(CreateGuidAsString())
+{ }
+
+const TClientContext& TOperationPreparer::GetContext() const
+{
+ return Client_->GetContext();
+}
+
+TTransactionId TOperationPreparer::GetTransactionId() const
+{
+ return TransactionId_;
+}
+
+TClientPtr TOperationPreparer::GetClient() const
+{
+ return Client_;
+}
+
+const TString& TOperationPreparer::GetPreparationId() const
+{
+ return PreparationId_;
+}
+
+const IClientRetryPolicyPtr& TOperationPreparer::GetClientRetryPolicy() const
+{
+ return ClientRetryPolicy_;
+}
+
+TOperationId TOperationPreparer::StartOperation(
+ TOperation* operation,
+ const TString& operationType,
+ const TNode& spec,
+ bool useStartOperationRequest)
+{
+ CheckValidity();
+
+ THttpHeader header("POST", (useStartOperationRequest ? "start_op" : operationType));
+ if (useStartOperationRequest) {
+ header.AddParameter("operation_type", operationType);
+ }
+ header.AddTransactionId(TransactionId_);
+ header.AddMutationId();
+
+ auto ysonSpec = NodeToYsonString(spec);
+ auto responseInfo = RetryRequestWithPolicy(
+ ::MakeIntrusive<TOperationForwardingRequestRetryPolicy>(
+ ClientRetryPolicy_->CreatePolicyForStartOperationRequest(),
+ TOperationPtr(operation)),
+ GetContext(),
+ header,
+ ysonSpec);
+ TOperationId operationId = ParseGuidFromResponse(responseInfo.Response);
+ YT_LOG_DEBUG("Operation started (OperationId: %v; PreparationId: %v)",
+ operationId,
+ GetPreparationId());
+
+ YT_LOG_INFO("Operation %v started (%v): %v",
+ operationId,
+ operationType,
+ GetOperationWebInterfaceUrl(GetContext().ServerName, operationId));
+
+ TOperationExecutionTimeTracker::Get()->Start(operationId);
+
+ Client_->GetYtPoller().Watch(
+ new TWaitOperationStartPollerItem(operationId, std::move(FileTransaction_)));
+
+ return operationId;
+}
+
+void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths)
+{
+ CheckValidity();
+
+ TVector<::NThreading::TFuture<TLockId>> lockIdFutures;
+ lockIdFutures.reserve(paths->size());
+ TRawBatchRequest lockRequest(GetContext().Config);
+ for (const auto& path : *paths) {
+ lockIdFutures.push_back(lockRequest.Lock(
+ FileTransaction_->GetId(),
+ path.Path_,
+ ELockMode::LM_SNAPSHOT,
+ TLockOptions().Waitable(true)));
+ }
+ ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), lockRequest);
+
+ TVector<::NThreading::TFuture<TNode>> nodeIdFutures;
+ nodeIdFutures.reserve(paths->size());
+ TRawBatchRequest getNodeIdRequest(GetContext().Config);
+ for (const auto& lockIdFuture : lockIdFutures) {
+ nodeIdFutures.push_back(getNodeIdRequest.Get(
+ FileTransaction_->GetId(),
+ ::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id",
+ TGetOptions()));
+ }
+ ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), getNodeIdRequest);
+
+ for (size_t i = 0; i != paths->size(); ++i) {
+ auto& richPath = (*paths)[i];
+ richPath.OriginalPath(richPath.Path_);
+ richPath.Path("#" + nodeIdFutures[i].GetValue().AsString());
+ YT_LOG_DEBUG("Locked file %v, new path is %v",
+ *richPath.OriginalPath_,
+ richPath.Path_);
+ }
+}
+
+void TOperationPreparer::CheckValidity() const
+{
+ Y_ENSURE(
+ FileTransaction_,
+ "File transaction is already moved, are you trying to use preparer for more than one operation?");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TRetryPolicyIgnoringLockConflicts
+ : public TAttemptLimitedRetryPolicy
+{
+public:
+ using TAttemptLimitedRetryPolicy::TAttemptLimitedRetryPolicy;
+ using TAttemptLimitedRetryPolicy::OnGenericError;
+
+ TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
+ {
+ if (IsAttemptLimitExceeded()) {
+ return Nothing();
+ }
+ if (e.IsConcurrentTransactionLockConflict()) {
+ return GetBackoffDuration(Config_);
+ }
+ return TAttemptLimitedRetryPolicy::OnRetriableError(e);
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TFileToUpload
+ : public IItemToUpload
+{
+public:
+ TFileToUpload(TString fileName, TMaybe<TString> md5)
+ : FileName_(std::move(fileName))
+ , MD5_(std::move(md5))
+ { }
+
+ TString CalculateMD5() const override
+ {
+ if (MD5_) {
+ return *MD5_;
+ }
+ constexpr size_t md5Size = 32;
+ TString result;
+ result.ReserveAndResize(md5Size);
+ MD5::File(FileName_.data(), result.Detach());
+ MD5_ = result;
+ return result;
+ }
+
+ THolder<IInputStream> CreateInputStream() const override
+ {
+ return MakeHolder<TFileInput>(FileName_);
+ }
+
+ TString GetDescription() const override
+ {
+ return FileName_;
+ }
+
+ ui64 GetDataSize() const override
+ {
+ return GetFileLength(FileName_);
+ }
+
+private:
+ TString FileName_;
+ mutable TMaybe<TString> MD5_;
+};
+
+class TDataToUpload
+ : public IItemToUpload
+{
+public:
+ TDataToUpload(TString data, TString description)
+ : Data_(std::move(data))
+ , Description_(std::move(description))
+ { }
+
+ TString CalculateMD5() const override
+ {
+ constexpr size_t md5Size = 32;
+ TString result;
+ result.ReserveAndResize(md5Size);
+ MD5::Data(reinterpret_cast<const unsigned char*>(Data_.data()), Data_.size(), result.Detach());
+ return result;
+ }
+
+ THolder<IInputStream> CreateInputStream() const override
+ {
+ return MakeHolder<TMemoryInput>(Data_.data(), Data_.size());
+ }
+
+ TString GetDescription() const override
+ {
+ return Description_;
+ }
+
+ ui64 GetDataSize() const override
+ {
+ return Data_.size();
+ }
+
+private:
+ TString Data_;
+ TString Description_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+static const TString& GetPersistentExecPathMd5()
+{
+ static TString md5 = MD5::File(GetPersistentExecPath());
+ return md5;
+}
+
+static TMaybe<TSmallJobFile> GetJobState(const IJob& job)
+{
+ TString result;
+ {
+ TStringOutput output(result);
+ job.Save(output);
+ output.Finish();
+ }
+ if (result.empty()) {
+ return Nothing();
+ } else {
+ return TSmallJobFile{"jobstate", result};
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TJobPreparer::TJobPreparer(
+ TOperationPreparer& operationPreparer,
+ const TUserJobSpec& spec,
+ const IJob& job,
+ size_t outputTableCount,
+ const TVector<TSmallJobFile>& smallFileList,
+ const TOperationOptions& options)
+ : OperationPreparer_(operationPreparer)
+ , Spec_(spec)
+ , Options_(options)
+{
+
+ CreateStorage();
+ auto cypressFileList = CanonizeYPaths(/* retryPolicy */ nullptr, OperationPreparer_.GetContext(), spec.Files_);
+
+ for (const auto& file : cypressFileList) {
+ UseFileInCypress(file);
+ }
+ for (const auto& localFile : spec.GetLocalFiles()) {
+ UploadLocalFile(std::get<0>(localFile), std::get<1>(localFile));
+ }
+ auto jobStateSmallFile = GetJobState(job);
+ if (jobStateSmallFile) {
+ UploadSmallFile(*jobStateSmallFile);
+ }
+ for (const auto& smallFile : smallFileList) {
+ UploadSmallFile(smallFile);
+ }
+
+ if (auto commandJob = dynamic_cast<const ICommandJob*>(&job)) {
+ ClassName_ = TJobFactory::Get()->GetJobName(&job);
+ Command_ = commandJob->GetCommand();
+ } else {
+ PrepareJobBinary(job, outputTableCount, jobStateSmallFile.Defined());
+ }
+
+ operationPreparer.LockFiles(&CachedFiles_);
+}
+
+TVector<TRichYPath> TJobPreparer::GetFiles() const
+{
+ TVector<TRichYPath> allFiles = CypressFiles_;
+ allFiles.insert(allFiles.end(), CachedFiles_.begin(), CachedFiles_.end());
+ return allFiles;
+}
+
+const TString& TJobPreparer::GetClassName() const
+{
+ return ClassName_;
+}
+
+const TString& TJobPreparer::GetCommand() const
+{
+ return Command_;
+}
+
+const TUserJobSpec& TJobPreparer::GetSpec() const
+{
+ return Spec_;
+}
+
+bool TJobPreparer::ShouldMountSandbox() const
+{
+ return OperationPreparer_.GetContext().Config->MountSandboxInTmpfs || Options_.MountSandboxInTmpfs_;
+}
+
+ui64 TJobPreparer::GetTotalFileSize() const
+{
+ return TotalFileSize_;
+}
+
+TString TJobPreparer::GetFileStorage() const
+{
+ return Options_.FileStorage_ ?
+ *Options_.FileStorage_ :
+ OperationPreparer_.GetContext().Config->RemoteTempFilesDirectory;
+}
+
+TYPath TJobPreparer::GetCachePath() const
+{
+ return AddPathPrefix(
+ ::TStringBuilder() << GetFileStorage() << "/new_cache",
+ OperationPreparer_.GetContext().Config->Prefix);
+}
+
+void TJobPreparer::CreateStorage() const
+{
+ Create(
+ OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
+ OperationPreparer_.GetContext(),
+ Options_.FileStorageTransactionId_,
+ GetCachePath(),
+ NT_MAP,
+ TCreateOptions()
+ .IgnoreExisting(true)
+ .Recursive(true));
+}
+
+int TJobPreparer::GetFileCacheReplicationFactor() const
+{
+ if (IsLocalMode()) {
+ return 1;
+ } else {
+ return OperationPreparer_.GetContext().Config->FileCacheReplicationFactor;
+ }
+}
+
+void TJobPreparer::CreateFileInCypress(const TString& path) const
+{
+ auto attributes = TNode()("replication_factor", GetFileCacheReplicationFactor());
+ if (Options_.FileExpirationTimeout_) {
+ attributes["expiration_timeout"] = Options_.FileExpirationTimeout_->MilliSeconds();
+ }
+
+ Create(
+ OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
+ OperationPreparer_.GetContext(),
+ Options_.FileStorageTransactionId_,
+ path,
+ NT_FILE,
+ TCreateOptions()
+ .IgnoreExisting(true)
+ .Recursive(true)
+ .Attributes(attributes)
+ );
+}
+
+TString TJobPreparer::PutFileToCypressCache(
+ const TString& path,
+ const TString& md5Signature,
+ TTransactionId transactionId) const
+{
+ constexpr ui32 LockConflictRetryCount = 30;
+ auto retryPolicy = MakeIntrusive<TRetryPolicyIgnoringLockConflicts>(
+ LockConflictRetryCount,
+ OperationPreparer_.GetContext().Config);
+
+ auto putFileToCacheOptions = TPutFileToCacheOptions();
+ if (Options_.FileExpirationTimeout_) {
+ putFileToCacheOptions.PreserveExpirationTimeout(true);
+ }
+
+ auto cachePath = PutFileToCache(
+ retryPolicy,
+ OperationPreparer_.GetContext(),
+ transactionId,
+ path,
+ md5Signature,
+ GetCachePath(),
+ putFileToCacheOptions);
+
+ Remove(
+ OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
+ OperationPreparer_.GetContext(),
+ transactionId,
+ path,
+ TRemoveOptions().Force(true));
+
+ return cachePath;
+}
+
+TMaybe<TString> TJobPreparer::GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const
+{
+ constexpr ui32 LockConflictRetryCount = 30;
+ auto retryPolicy = MakeIntrusive<TRetryPolicyIgnoringLockConflicts>(
+ LockConflictRetryCount,
+ OperationPreparer_.GetContext().Config);
+ auto maybePath = GetFileFromCache(
+ retryPolicy,
+ OperationPreparer_.GetContext(),
+ TTransactionId(),
+ md5Signature,
+ GetCachePath(),
+ TGetFileFromCacheOptions());
+ if (maybePath) {
+ YT_LOG_DEBUG("File is already in cache (FileName: %v)",
+ fileName,
+ *maybePath);
+ }
+ return maybePath;
+}
+
+TDuration TJobPreparer::GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const
+{
+ const TDuration extraTime = OperationPreparer_.GetContext().Config->WaitLockPollInterval +
+ TDuration::MilliSeconds(100);
+ const double dataSizeGb = static_cast<double>(itemToUpload.GetDataSize()) / 1_GB;
+ return extraTime + dataSizeGb * OperationPreparer_.GetContext().Config->CacheLockTimeoutPerGb;
+}
+
+TString TJobPreparer::UploadToRandomPath(const IItemToUpload& itemToUpload) const
+{
+ TString uniquePath = AddPathPrefix(
+ ::TStringBuilder() << GetFileStorage() << "/cpp_" << CreateGuidAsString(),
+ OperationPreparer_.GetContext().Config->Prefix);
+ YT_LOG_INFO("Uploading file to random cypress path (FileName: %v; CypressPath: %v; PreparationId: %v)",
+ itemToUpload.GetDescription(),
+ uniquePath,
+ OperationPreparer_.GetPreparationId());
+
+ CreateFileInCypress(uniquePath);
+
+ {
+ TFileWriter writer(
+ uniquePath,
+ OperationPreparer_.GetClientRetryPolicy(),
+ OperationPreparer_.GetClient()->GetTransactionPinger(),
+ OperationPreparer_.GetContext(),
+ Options_.FileStorageTransactionId_,
+ TFileWriterOptions().ComputeMD5(true));
+ itemToUpload.CreateInputStream()->ReadAll(writer);
+ writer.Finish();
+ }
+ return uniquePath;
+}
+
+TMaybe<TString> TJobPreparer::TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const
+{
+ const auto md5Signature = itemToUpload.CalculateMD5();
+
+ auto fileName = ::TStringBuilder() << GetFileStorage() << "/cpp_md5_" << md5Signature;
+ if (OperationPreparer_.GetContext().Config->CacheUploadDeduplicationMode == EUploadDeduplicationMode::Host) {
+ fileName << "_" << MD5::Data(TProcessState::Get()->FqdnHostName);
+ }
+ TString cypressPath = AddPathPrefix(fileName, OperationPreparer_.GetContext().Config->Prefix);
+
+ CreateFileInCypress(cypressPath);
+
+ auto uploadTx = MakeIntrusive<TTransaction>(
+ OperationPreparer_.GetClient(),
+ OperationPreparer_.GetContext(),
+ TTransactionId(),
+ TStartTransactionOptions());
+
+ ILockPtr lock;
+ try {
+ lock = uploadTx->Lock(cypressPath, ELockMode::LM_EXCLUSIVE, TLockOptions().Waitable(true));
+ } catch (const TErrorResponse& e) {
+ if (e.IsResolveError()) {
+ // If the node doesn't exist, it must be removed by concurrent uploading process.
+ // Let's try to find it in the cache.
+ return GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription());
+ }
+ throw;
+ }
+
+ auto waitTimeout = GetWaitForUploadTimeout(itemToUpload);
+ YT_LOG_DEBUG("Waiting for the lock on file (FileName: %v; CypressPath: %v; LockTimeout: %v)",
+ itemToUpload.GetDescription(),
+ cypressPath,
+ waitTimeout);
+
+ if (!TWaitProxy::Get()->WaitFuture(lock->GetAcquiredFuture(), waitTimeout)) {
+ YT_LOG_DEBUG("Waiting for the lock timed out. Fallback to random path uploading (FileName: %v; CypressPath: %v)",
+ itemToUpload.GetDescription(),
+ cypressPath);
+ return Nothing();
+ }
+
+ YT_LOG_DEBUG("Exclusive lock successfully acquired (FileName: %v; CypressPath: %v)",
+ itemToUpload.GetDescription(),
+ cypressPath);
+
+ // Ensure that this process is the first to take a lock.
+ if (auto cachedItemPath = GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription())) {
+ return *cachedItemPath;
+ }
+
+ YT_LOG_INFO("Uploading file to cypress (FileName: %v; CypressPath: %v; PreparationId: %v)",
+ itemToUpload.GetDescription(),
+ cypressPath,
+ OperationPreparer_.GetPreparationId());
+
+ {
+ auto writer = uploadTx->CreateFileWriter(cypressPath, TFileWriterOptions().ComputeMD5(true));
+ YT_VERIFY(writer);
+ itemToUpload.CreateInputStream()->ReadAll(*writer);
+ writer->Finish();
+ }
+
+ auto path = PutFileToCypressCache(cypressPath, md5Signature, uploadTx->GetId());
+
+ uploadTx->Commit();
+ return path;
+}
+
+TString TJobPreparer::UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const
+{
+ auto md5Signature = itemToUpload.CalculateMD5();
+ Y_VERIFY(md5Signature.size() == 32);
+
+ if (auto cachedItemPath = GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription())) {
+ return *cachedItemPath;
+ }
+
+ YT_LOG_INFO("File not found in cache; uploading to cypress (FileName: %v; PreparationId: %v)",
+ itemToUpload.GetDescription(),
+ OperationPreparer_.GetPreparationId());
+
+ if (OperationPreparer_.GetContext().Config->CacheUploadDeduplicationMode != EUploadDeduplicationMode::Disabled) {
+ if (auto path = TryUploadWithDeduplication(itemToUpload)) {
+ return *path;
+ }
+ }
+
+ auto path = UploadToRandomPath(itemToUpload);
+ return PutFileToCypressCache(path, md5Signature, Options_.FileStorageTransactionId_);
+}
+
+TString TJobPreparer::UploadToCache(const IItemToUpload& itemToUpload) const
+{
+ YT_LOG_INFO("Uploading file (FileName: %v; PreparationId: %v)",
+ itemToUpload.GetDescription(),
+ OperationPreparer_.GetPreparationId());
+
+ TString result;
+ switch (Options_.FileCacheMode_) {
+ case TOperationOptions::EFileCacheMode::ApiCommandBased:
+ Y_ENSURE_EX(Options_.FileStorageTransactionId_.IsEmpty(), TApiUsageError() <<
+ "Default cache mode (API command-based) doesn't allow non-default 'FileStorageTransactionId_'");
+ result = UploadToCacheUsingApi(itemToUpload);
+ break;
+ case TOperationOptions::EFileCacheMode::CachelessRandomPathUpload:
+ result = UploadToRandomPath(itemToUpload);
+ break;
+ default:
+ Y_FAIL("Unknown file cache mode: %d", static_cast<int>(Options_.FileCacheMode_));
+ }
+
+ YT_LOG_INFO("Complete uploading file (FileName: %v; PreparationId: %v)",
+ itemToUpload.GetDescription(),
+ OperationPreparer_.GetPreparationId());
+
+ return result;
+}
+
+void TJobPreparer::UseFileInCypress(const TRichYPath& file)
+{
+ if (!Exists(
+ OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
+ OperationPreparer_.GetContext(),
+ file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
+ file.Path_))
+ {
+ ythrow yexception() << "File " << file.Path_ << " does not exist";
+ }
+
+ if (ShouldMountSandbox()) {
+ auto size = Get(
+ OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
+ OperationPreparer_.GetContext(),
+ file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
+ file.Path_ + "/@uncompressed_data_size")
+ .AsInt64();
+
+ TotalFileSize_ += RoundUpFileSize(static_cast<ui64>(size));
+ }
+ CypressFiles_.push_back(file);
+}
+
+void TJobPreparer::UploadLocalFile(
+ const TLocalFilePath& localPath,
+ const TAddLocalFileOptions& options,
+ bool isApiFile)
+{
+ TFsPath fsPath(localPath);
+ fsPath.CheckExists();
+
+ TFileStat stat;
+ fsPath.Stat(stat);
+
+ bool isExecutable = stat.Mode & (S_IXUSR | S_IXGRP | S_IXOTH);
+ auto cachePath = UploadToCache(TFileToUpload(localPath, options.MD5CheckSum_));
+
+ TRichYPath cypressPath;
+ if (isApiFile) {
+ cypressPath = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
+ }
+ cypressPath.Path(cachePath).FileName(options.PathInJob_.GetOrElse(fsPath.Basename()));
+ if (isExecutable) {
+ cypressPath.Executable(true);
+ }
+ if (options.BypassArtifactCache_) {
+ cypressPath.BypassArtifactCache(*options.BypassArtifactCache_);
+ }
+
+ if (ShouldMountSandbox()) {
+ TotalFileSize_ += RoundUpFileSize(stat.Size);
+ }
+
+ CachedFiles_.push_back(cypressPath);
+}
+
+void TJobPreparer::UploadBinary(const TJobBinaryConfig& jobBinary)
+{
+ if (std::holds_alternative<TJobBinaryLocalPath>(jobBinary)) {
+ auto binaryLocalPath = std::get<TJobBinaryLocalPath>(jobBinary);
+ auto opts = TAddLocalFileOptions().PathInJob("cppbinary");
+ if (binaryLocalPath.MD5CheckSum) {
+ opts.MD5CheckSum(*binaryLocalPath.MD5CheckSum);
+ }
+ UploadLocalFile(binaryLocalPath.Path, opts, /* isApiFile */ true);
+ } else if (std::holds_alternative<TJobBinaryCypressPath>(jobBinary)) {
+ auto binaryCypressPath = std::get<TJobBinaryCypressPath>(jobBinary);
+ TRichYPath ytPath = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
+ ytPath.Path(binaryCypressPath.Path);
+ if (binaryCypressPath.TransactionId) {
+ ytPath.TransactionId(*binaryCypressPath.TransactionId);
+ }
+ UseFileInCypress(ytPath.FileName("cppbinary").Executable(true));
+ } else {
+ Y_FAIL("%s", (::TStringBuilder() << "Unexpected jobBinary tag: " << jobBinary.index()).data());
+ }
+}
+
+void TJobPreparer::UploadSmallFile(const TSmallJobFile& smallFile)
+{
+ auto cachePath = UploadToCache(TDataToUpload(smallFile.Data, smallFile.FileName + " [generated-file]"));
+ auto path = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
+ CachedFiles_.push_back(path.Path(cachePath).FileName(smallFile.FileName));
+ if (ShouldMountSandbox()) {
+ TotalFileSize_ += RoundUpFileSize(smallFile.Data.size());
+ }
+}
+
+bool TJobPreparer::IsLocalMode() const
+{
+ return UseLocalModeOptimization(OperationPreparer_.GetContext(), OperationPreparer_.GetClientRetryPolicy());
+}
+
+void TJobPreparer::PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState)
+{
+ auto jobBinary = TJobBinaryConfig();
+ if (!std::holds_alternative<TJobBinaryDefault>(Spec_.GetJobBinary())) {
+ jobBinary = Spec_.GetJobBinary();
+ }
+ TString binaryPathInsideJob;
+ if (std::holds_alternative<TJobBinaryDefault>(jobBinary)) {
+ if (GetInitStatus() != EInitStatus::FullInitialization) {
+ ythrow yexception() << "NYT::Initialize() must be called prior to any operation";
+ }
+
+ const bool isLocalMode = IsLocalMode();
+ const TMaybe<TString> md5 = !isLocalMode ? MakeMaybe(GetPersistentExecPathMd5()) : Nothing();
+ jobBinary = TJobBinaryLocalPath{GetPersistentExecPath(), md5};
+
+ if (isLocalMode) {
+ binaryPathInsideJob = GetExecPath();
+ }
+ } else if (std::holds_alternative<TJobBinaryLocalPath>(jobBinary)) {
+ const bool isLocalMode = IsLocalMode();
+ if (isLocalMode) {
+ binaryPathInsideJob = TFsPath(std::get<TJobBinaryLocalPath>(jobBinary).Path).RealPath();
+ }
+ }
+ Y_ASSERT(!std::holds_alternative<TJobBinaryDefault>(jobBinary));
+
+ // binaryPathInsideJob is only set when LocalModeOptimization option is on, so upload is not needed
+ if (!binaryPathInsideJob) {
+ binaryPathInsideJob = "./cppbinary";
+ UploadBinary(jobBinary);
+ }
+
+ TString jobCommandPrefix = Options_.JobCommandPrefix_;
+ if (!Spec_.JobCommandPrefix_.empty()) {
+ jobCommandPrefix = Spec_.JobCommandPrefix_;
+ }
+
+ TString jobCommandSuffix = Options_.JobCommandSuffix_;
+ if (!Spec_.JobCommandSuffix_.empty()) {
+ jobCommandSuffix = Spec_.JobCommandSuffix_;
+ }
+
+ ClassName_ = TJobFactory::Get()->GetJobName(&job);
+
+ auto jobArguments = TNode::CreateMap();
+ jobArguments["job_name"] = ClassName_;
+ jobArguments["output_table_count"] = static_cast<i64>(outputTableCount);
+ jobArguments["has_state"] = hasState;
+ Spec_.AddEnvironment("YT_JOB_ARGUMENTS", NodeToYsonString(jobArguments));
+
+ Command_ = ::TStringBuilder() <<
+ jobCommandPrefix <<
+ (OperationPreparer_.GetContext().Config->UseClientProtobuf ? "YT_USE_CLIENT_PROTOBUF=1" : "YT_USE_CLIENT_PROTOBUF=0") << " " <<
+ binaryPathInsideJob <<
+ jobCommandSuffix;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail