1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
#pragma once
#include "client.h"
#include "structured_table_formats.h"
#include <yt/cpp/mapreduce/interface/operation.h>
namespace NYT::NDetail {
////////////////////////////////////////////////////////////////////////////////
class TOperation;

// Ref-counted (TThrRefBase, handled through TOperationPreparerPtr below)
// helper that bundles the client, a transaction id and a client retry
// policy for preparing and starting operations.
// NOTE(review): the purpose is inferred from member names and the signatures
// declared here; the definitions are not visible in this header.
class TOperationPreparer
    : public TThrRefBase
{
public:
    TOperationPreparer(TClientPtr client, TTransactionId transactionId);

    // ---- Accessors ---------------------------------------------------------

    TClientPtr GetClient() const;
    const TClientContext& GetContext() const;
    TTransactionId GetTransactionId() const;
    ITransactionPingerPtr GetTransactionPinger() const;
    const IClientRetryPolicyPtr& GetClientRetryPolicy() const;
    const TString& GetPreparationId() const;

    // ---- Actions -----------------------------------------------------------

    // Receives the paths through a non-const pointer, so it may rewrite them
    // in place (presumably to lock/pin the files — confirm in the .cpp).
    void LockFiles(TVector<TRichYPath>* paths);

    // Declared to start an operation of kind `operationType` described by
    // `spec`, yielding its TOperationId. `useStartOperationRequest` defaults
    // to false.
    TOperationId StartOperation(
        TOperation* operation,
        const TString& operationType,
        const TNode& spec,
        bool useStartOperationRequest = false);

private:
    void CheckValidity() const;

    // Members are initialized in declaration order — do not reorder.
    TClientPtr Client_;
    TTransactionId TransactionId_;
    // Owned pingable transaction; presumably used for file locks — confirm.
    THolder<TPingableTransaction> FileTransaction_;
    IClientRetryPolicyPtr ClientRetryPolicy_;
    const TString PreparationId_;
};

// Intrusive (ref-counted) handle to a TOperationPreparer.
using TOperationPreparerPtr = ::TIntrusivePtr<TOperationPreparer>;
////////////////////////////////////////////////////////////////////////////////
// Abstract description of a piece of data that can be uploaded.
// NOTE(review): the meaning of each method is inferred from its name;
// implementations are not visible in this header.
struct IItemToUpload
{
    // Virtual so that deleting through an IItemToUpload* is well-defined.
    virtual ~IItemToUpload() = default;

    // Content digest as a string (presumably hex-encoded MD5 — confirm).
    virtual TString CalculateMD5() const = 0;

    // Returns a fresh owned stream over the item's content.
    virtual THolder<IInputStream> CreateInputStream() const = 0;

    // Human-readable description of the item (e.g. for messages).
    virtual TString GetDescription() const = 0;

    // Size of the item's data (presumably in bytes — confirm).
    virtual ui64 GetDataSize() const = 0;
};
////////////////////////////////////////////////////////////////////////////////
// Prepares the user-job part of an operation: gathers the files the job
// needs and exposes the resulting spec, class name, command and file list.
// NOTE(review): roles are inferred from member/method names; definitions are
// in the corresponding .cpp, which is not visible here.
//
// Stores a TOperationPreparer by reference (OperationPreparer_), presumably
// bound to the `operationPreparer` constructor argument, so that object must
// outlive this one. Non-copyable via TNonCopyable.
class TJobPreparer
    : private TNonCopyable
{
public:
    TJobPreparer(
        TOperationPreparer& operationPreparer,
        const TUserJobSpec& spec,
        const IJob& job,
        size_t outputTableCount,
        const TVector<TSmallJobFile>& smallFileList,
        const TOperationOptions& options);

    // ---- Results of preparation -------------------------------------------

    const TUserJobSpec& GetSpec() const;
    const TString& GetClassName() const;
    const TString& GetCommand() const;
    // Returned by value (a fresh vector on each call).
    TVector<TRichYPath> GetFiles() const;
    ui64 GetTotalFileSize() const;
    bool ShouldMountSandbox() const;

private:
    // ---- Environment / configuration queries ------------------------------

    bool IsLocalMode() const;
    TString GetFileStorage() const;
    TYPath GetCachePath() const;
    int GetFileCacheReplicationFactor() const;

    // ---- Cypress storage and file cache -----------------------------------

    void CreateStorage() const;
    void CreateFileInCypress(const TString& path) const;
    TString PutFileToCypressCache(const TString& path, const TString& md5Signature, TTransactionId transactionId) const;
    TMaybe<TString> GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const;

    // ---- Upload strategies for a single item ------------------------------

    TDuration GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const;
    TString UploadToRandomPath(const IItemToUpload& itemToUpload) const;
    TString UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const;
    TMaybe<TString> TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const;
    TString UploadToCache(const IItemToUpload& itemToUpload) const;

    // ---- Registering job dependencies -------------------------------------
    // Non-const: these may update the collected file lists / totals below.

    void UseFileInCypress(const TRichYPath& file);
    void UploadLocalFile(
        const TLocalFilePath& localPath,
        const TAddLocalFileOptions& options,
        bool isApiFile = false);
    void UploadBinary(const TJobBinaryConfig& jobBinary);
    void UploadSmallFile(const TSmallJobFile& smallFile);
    // NOTE(review): takes `int outputTableCount` while the constructor takes
    // `size_t`; kept as-is to match the out-of-view definition.
    void PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState);

private:
    // Members are initialized in declaration order — do not reorder.
    TOperationPreparer& OperationPreparer_;
    TUserJobSpec Spec_;
    TOperationOptions Options_;
    TVector<TRichYPath> CypressFiles_;
    TVector<TRichYPath> CachedFiles_;
    TString ClassName_;
    TString Command_;
    ui64 TotalFileSize_ = 0;
};
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
|