aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation_preparer.h
blob: 7ced54e3b54003940d06e41179e00083ee3a6c83 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#pragma once

#include "client.h"
#include "structured_table_formats.h"

#include <yt/cpp/mapreduce/interface/operation.h>

namespace NYT::NDetail {

////////////////////////////////////////////////////////////////////////////////

// Forward declaration: TOperation is only referenced by pointer in this header
// (see StartOperation), so its full definition is not needed here.
class TOperation;

///
/// @brief Operation-level state shared while preparing and starting an operation.
///
/// Reference-counted: derives from TThrRefBase and is normally held through
/// TOperationPreparerPtr (an intrusive pointer, declared below). Bundles the client,
/// the transaction id the operation is prepared under, an optional file transaction,
/// a client retry policy and an identifier of this preparation.
///
class TOperationPreparer
    : public TThrRefBase
{
public:
    /// @param client         Client this preparer works with (presumably what GetClient() returns).
    /// @param transactionId  Transaction id (presumably what GetTransactionId() returns).
    TOperationPreparer(TClientPtr client, TTransactionId transactionId);

    // Accessors for the state this preparer was created with.
    // NOTE(review): presumably thin getters over Client_ / TransactionId_ — confirm in the .cpp.
    const TClientContext& GetContext() const;
    TTransactionId GetTransactionId() const;
    ITransactionPingerPtr GetTransactionPinger() const;
    TClientPtr GetClient() const;

    /// Identifier of this preparation.
    /// NOTE(review): presumably returns the const PreparationId_ member and is used to
    /// tag log messages belonging to one preparation — confirm.
    const TString& GetPreparationId() const;

    /// Processes @p paths in place: the vector is taken by non-const pointer, so its
    /// entries may be rewritten by the call.
    /// NOTE(review): judging by the name and the FileTransaction_ member, presumably
    /// locks the files inside a dedicated file transaction — confirm in the .cpp.
    void LockFiles(TVector<TRichYPath>* paths);

    /// Starts an operation of type @p operationType with the given @p spec.
    /// @param operation                 Operation object the start is performed for (not owned).
    /// @param operationType             Operation type name.
    /// @param spec                      Operation spec as a node tree.
    /// @param useStartOperationRequest  Defaults to false. NOTE(review): presumably selects an
    ///                                  alternative request for starting the operation — confirm.
    /// @return Id of the started operation.
    TOperationId StartOperation(
        TOperation* operation,
        const TString& operationType,
        const TNode& spec,
        bool useStartOperationRequest = false);

    /// Retry policy for client requests, returned by const reference.
    /// NOTE(review): presumably refers to ClientRetryPolicy_, so the reference is only
    /// valid while this preparer is alive — copy the pointer if you need it longer.
    const IClientRetryPolicyPtr& GetClientRetryPolicy() const;

private:
    // Keep this declaration order: non-static members are initialized in declaration
    // order, regardless of the order written in the constructor's init list.
    TClientPtr Client_;
    TTransactionId TransactionId_;
    // Exclusively owned (THolder). NOTE(review): presumably null until a file
    // transaction is actually needed (e.g. by LockFiles) — confirm.
    THolder<TPingableTransaction> FileTransaction_;
    IClientRetryPolicyPtr ClientRetryPolicy_;
    // const: has no in-class initializer, so it must be set in the constructor's
    // init list; it cannot change afterwards.
    const TString PreparationId_;

private:
    // NOTE(review): presumably checks that the preparer is still usable before an
    // operation is started — confirm in the .cpp.
    void CheckValidity() const;
};

/// Intrusive (ref-counted) owning pointer to a TOperationPreparer.
using TOperationPreparerPtr = ::TIntrusivePtr<TOperationPreparer>;

////////////////////////////////////////////////////////////////////////////////

///
/// @brief Abstract source of data to be uploaded (consumed by TJobPreparer's upload helpers,
/// which all take a `const IItemToUpload&`).
///
/// Has a virtual destructor, so implementations may be owned and deleted through an
/// IItemToUpload pointer. All accessors are const.
///
struct IItemToUpload
{
    virtual ~IItemToUpload() = default;

    /// MD5 of the item's content, as a string.
    /// NOTE(review): presumably the value matched against `md5Signature` in
    /// TJobPreparer's Cypress cache helpers — confirm the format (hex digest?).
    virtual TString CalculateMD5() const = 0;
    /// Creates an input stream over the item's content; the caller owns the result (THolder).
    /// NOTE(review): presumably every call yields a fresh stream positioned at the start — confirm.
    virtual THolder<IInputStream> CreateInputStream() const = 0;
    /// Human-readable description of the item (presumably for logs and error messages).
    virtual TString GetDescription() const = 0;
    /// Size of the item's content.
    /// NOTE(review): presumably in bytes, and presumably what
    /// TJobPreparer::GetWaitForUploadTimeout scales with — confirm.
    virtual ui64 GetDataSize() const = 0;
};

////////////////////////////////////////////////////////////////////////////////

///
/// @brief Collects what a single user job needs: its files (GetFiles), the job class
/// name and command, and the user job spec.
///
/// Non-copyable (derives from TNonCopyable). Stores a reference to the
/// TOperationPreparer passed to the constructor, so that preparer must outlive this object.
///
class TJobPreparer
    : private TNonCopyable
{
public:
    /// @param operationPreparer  Operation-level preparer; referenced, not copied — must outlive this object.
    /// @param spec               User job spec (presumably copied into Spec_).
    /// @param job                Job instance. NOTE(review): presumably used to derive the class name / job binary.
    /// @param outputTableCount   Number of output tables of the job.
    /// @param smallFileList      Small files to attach. NOTE(review): presumably uploaded via UploadSmallFile — confirm.
    /// @param options            Operation options (presumably copied into Options_).
    TJobPreparer(
        TOperationPreparer& operationPreparer,
        const TUserJobSpec& spec,
        const IJob& job,
        size_t outputTableCount,
        const TVector<TSmallJobFile>& smallFileList,
        const TOperationOptions& options);

    /// Files the job needs, returned by value (a fresh vector per call).
    /// NOTE(review): presumably CypressFiles_ combined with CachedFiles_ — confirm.
    TVector<TRichYPath> GetFiles() const;
    // Results of the preparation, returned by const reference into this object.
    const TString& GetClassName() const;
    const TString& GetCommand() const;
    const TUserJobSpec& GetSpec() const;
    bool ShouldMountSandbox() const;
    /// NOTE(review): presumably the accumulated TotalFileSize_ of all attached files — confirm units (bytes?).
    ui64 GetTotalFileSize() const;

private:
    // Keep this declaration order: non-static members are initialized in declaration
    // order, regardless of the order written in the constructor's init list.

    // Non-owning reference: the referenced preparer must outlive this object.
    TOperationPreparer& OperationPreparer_;
    TUserJobSpec Spec_;
    TOperationOptions Options_;

    // NOTE(review): presumably files already present in Cypress vs. files uploaded
    // to the file cache — confirm in the .cpp.
    TVector<TRichYPath> CypressFiles_;
    TVector<TRichYPath> CachedFiles_;

    TString ClassName_;
    TString Command_;
    ui64 TotalFileSize_ = 0;

private:
    // Storage / cache locations and cluster-dependent settings.
    TString GetFileStorage() const;
    TYPath GetCachePath() const;

    bool IsLocalMode() const;
    int GetFileCacheReplicationFactor() const;

    void CreateStorage() const;

    // Cypress file-cache helpers, keyed by an MD5 signature.
    // NOTE(review): GetItemFromCypressCache returns TMaybe, presumably empty on a cache miss — confirm.
    void CreateFileInCypress(const TString& path) const;
    TString PutFileToCypressCache(const TString& path, const TString& md5Signature, TTransactionId transactionId) const;
    TMaybe<TString> GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const;

    // Upload strategies for a single IItemToUpload.
    // NOTE(review): presumably UploadToCache picks among UploadToRandomPath,
    // UploadToCacheUsingApi and TryUploadWithDeduplication (the latter returning an
    // empty TMaybe when it does not apply) — confirm in the .cpp.
    TDuration GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const;
    TString UploadToRandomPath(const IItemToUpload& itemToUpload) const;
    TString UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const;
    TMaybe<TString> TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const;
    TString UploadToCache(const IItemToUpload& itemToUpload) const;

    // Registers a file that already lives in Cypress.
    // NOTE(review): presumably appends to CypressFiles_ — confirm.
    void UseFileInCypress(const TRichYPath& file);

    // isApiFile defaults to false. NOTE(review): its exact meaning is not visible
    // from this header — confirm in the .cpp.
    void UploadLocalFile(
        const TLocalFilePath& localPath,
        const TAddLocalFileOptions& options,
        bool isApiFile = false);

    void UploadBinary(const TJobBinaryConfig& jobBinary);
    void UploadSmallFile(const TSmallJobFile& smallFile);

    // NOTE(review): outputTableCount is `int` here but `size_t` in the constructor;
    // if the constructor forwards its value here it is implicitly narrowed. Consider
    // aligning the types (requires changing the definition in the .cpp as well).
    void PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState);
};

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NDetail