aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/WriteBufferFromS3.h
blob: 0fdf771e1f548bd5caa2c63993b6e5410f553b82 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#pragma once

#include "clickhouse_config.h"

#if USE_AWS_S3

#include <base/types.h>
#include <Common/logger_useful.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>

#include <deque>
#include <list>
#include <map>
#include <memory>
#include <optional>
#include <vector>

namespace DB
{
/**
 * Buffer to write data to an S3 object with the specified bucket and key.
 * If the data size written to the buffer is less than 'max_single_part_upload_size', the write is performed using a singlepart upload.
 * Otherwise a multipart upload is used:
 * the data is divided into chunks with a size greater than 'minimum_upload_part_size' (the last chunk can be smaller than this threshold),
 * and each chunk is written as a separate part to S3.
 */
class WriteBufferFromS3 final : public WriteBufferFromFileBase
{
public:
    WriteBufferFromS3(
        std::shared_ptr<const S3::Client> client_ptr_,
        /// Separate client for CompleteMultipartUploadRequest, because it blocks on recv() for a few seconds on big uploads.
        std::shared_ptr<const S3::Client> client_with_long_timeout_ptr_,
        const String & bucket_,
        const String & key_,
        size_t buf_size_,
        const S3Settings::RequestSettings & request_settings_,
        std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
        ThreadPoolCallbackRunner<void> schedule_ = {},
        const WriteSettings & write_settings_ = {});

    ~WriteBufferFromS3() override;
    void nextImpl() override;
    /// Starts finalization; after this point no more data may be written (see 'is_prefinalized').
    void preFinalize() override;
    /// Returns the S3 object key this buffer writes to.
    std::string getFileName() const override { return key; }
    /// NOTE: only flushes the in-memory buffer via next(); it does not wait for the data to reach S3.
    void sync() override { next(); }

    /// Strategy that chooses the size of each successive in-memory buffer (and hence of each upload part).
    class IBufferAllocationPolicy
    {
    public:
        virtual size_t getBufferNumber() const = 0;
        virtual size_t getBufferSize() const = 0;
        /// Advances the policy to the next buffer; affects subsequent getBufferNumber()/getBufferSize() results.
        virtual void nextBuffer() = 0;
        /// Pure virtual destructor: an out-of-line definition is still required (presumably provided in the .cpp).
        virtual ~IBufferAllocationPolicy() = 0;
    };
    using IBufferAllocationPolicyPtr = std::unique_ptr<IBufferAllocationPolicy>;

    /// Picks a concrete buffer allocation policy based on the part-upload settings.
    static IBufferAllocationPolicyPtr ChooseBufferPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings_);

private:
    /// Receives response from the server after sending all data.
    void finalizeImpl() override;

    /// Human-readable descriptions of the buffer state, for logging.
    String getVerboseLogDetails() const;
    String getShortLogDetails() const;

    /// A chunk of data detached from the working buffer, ready to be uploaded as a single S3 part (or single object).
    struct PartData
    {
        Memory<> memory;
        size_t data_size = 0;

        /// Wraps the owned memory into an iostream suitable for an AWS SDK request body.
        std::shared_ptr<std::iostream> createAwsBuffer();

        bool isEmpty() const
        {
            return data_size == 0;
        }
    };

    /// Working-buffer management.
    void hidePartialData();
    void allocateFirstBuffer();
    void reallocateFirstBuffer();
    void detachBuffer();
    void allocateBuffer();
    void setFakeBufferWhenPreFinalized();

    /// Multipart upload path.
    S3::UploadPartRequest getUploadRequest(size_t part_number, PartData & data);
    void writePart(PartData && data);
    void writeMultipartUpload();
    void createMultipartUpload();
    void completeMultipartUpload();
    void abortMultipartUpload();
    void tryToAbortMultipartUpload();

    /// Singlepart upload path.
    S3::PutObjectRequest getPutRequest(PartData & data);
    void makeSinglepartUpload(PartData && data);

    const String bucket;
    const String key;
    const S3Settings::RequestSettings request_settings;
    /// NOTE(review): reference member — presumably refers into 'request_settings' above; confirm lifetime in the constructor.
    const S3Settings::RequestSettings::PartUploadSettings & upload_settings;
    const WriteSettings write_settings;
    const std::shared_ptr<const S3::Client> client_ptr;
    /// Used for CompleteMultipartUploadRequest (see constructor comment).
    const std::shared_ptr<const S3::Client> client_with_long_timeout_ptr;
    /// Optional user metadata attached to the uploaded object.
    const std::optional<std::map<String, String>> object_metadata;
    Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
    /// Rate-limits repetitive log messages.
    LogSeriesLimiterPtr limitedLog = std::make_shared<LogSeriesLimiter>(log, 1, 5);

    IBufferAllocationPolicyPtr buffer_allocation_policy;

    /// Upload in S3 is made in parts:
    /// we initiate the upload, then upload each part and get an ETag as a response, and then finalizeImpl() completes the upload listing all our parts.
    String multipart_upload_id;
    /// ETags of the uploaded parts, in part order.
    std::deque<String> multipart_tags;
    bool multipart_upload_finished = false;

    /// Tracks that preFinalize() is called only once.
    bool is_prefinalized = false;

    /// The first fully filled buffer has to be delayed.
    /// There are two possibilities after that:
    /// the first is a call to preFinalize()/finalize(), which leads to a singlepart upload;
    /// the second is writing more data, which leads to a multipart upload.
    std::deque<PartData> detached_part_data;
    /// Dummy working buffer installed after preFinalize() so that further writes are detectable.
    char fake_buffer_when_prefinalized[1] = {};

    /// offset() and count() are unstable inside nextImpl():
    /// for example, nextImpl() changes the position, hence offset() and count() change too.
    /// These variables are dedicated to storing size information while offset() and count() are unstable.
    size_t total_size = 0;
    size_t hidden_size = 0;

    /// Tracks asynchronous part-upload tasks scheduled via 'schedule_' (defined in the .cpp).
    class TaskTracker;
    std::unique_ptr<TaskTracker> task_tracker;
};

}

#endif