about | summary | refs | log | tree | commit | diff | stats
path: root/contrib/clickhouse/src/Storages/S3Queue/StorageS3Queue.h
blob: 03b4971af9336895450781b87ee1dbccbc9942c6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#pragma once

#include "clickhouse_config.h"

#if USE_AWS_S3

#    include <Core/Types.h>

#    include <Compression/CompressionInfo.h>
#    include <Common/ZooKeeper/ZooKeeper.h>

#    include <Core/BackgroundSchedulePool.h>
#    include <Storages/IStorage.h>
#    include <Storages/S3Queue/S3QueueFilesMetadata.h>
#    include <Storages/S3Queue/S3QueueSettings.h>
#    include <Storages/S3Queue/S3QueueSource.h>
#    include <Storages/StorageS3Settings.h>

#    include <IO/CompressionMethod.h>
#    include <IO/S3/getObjectInfo.h>
#    include <Interpreters/Context.h>
#    include <Interpreters/threadPoolCallbackRunner.h>
#    include <Processors/Executors/PullingPipelineExecutor.h>
#    include <Processors/ISource.h>
#    include <Storages/Cache/SchemaCache.h>
#    include <Storages/StorageConfiguration.h>
#    include <Storages/StorageS3.h>
#    include <Poco/URI.h>
#    include <Common/logger_useful.h>

/// Forward declaration of Aws::S3::Client, which lets the type be named without
/// pulling in its full definition. The name is not referenced anywhere else in
/// the visible part of this header.
namespace Aws::S3
{
class Client;
}

namespace DB
{


/// Declaration of the storage whose engine name is "S3Queue" (see getName()).
///
/// Inherits publicly from IStorage and from WithContext with the default
/// (private) access of a `class`. Only the small inline accessors are defined
/// here; every other member function is defined out of line, so the notes on
/// those members describe the declaration only.
class StorageS3Queue : public IStorage, WithContext
{
public:
    /// The configuration type is shared with StorageS3.
    using Configuration = typename StorageS3::Configuration;

    /// Constructs the storage.
    /// `s3queue_settings_` is taken by value as a unique_ptr, i.e. ownership is
    /// transferred to the callee. `partition_by_` is optional and defaults to
    /// nullptr. How the remaining arguments are consumed is decided by the
    /// out-of-line definition.
    StorageS3Queue(
        std::unique_ptr<S3QueueSettings> s3queue_settings_,
        const Configuration & configuration_,
        const StorageID & table_id_,
        const ColumnsDescription & columns_,
        const ConstraintsDescription & constraints_,
        const String & comment,
        ContextPtr context_,
        std::optional<FormatSettings> format_settings_,
        ASTPtr partition_by_ = nullptr);

    /// Engine name reported for this storage: always "S3Queue".
    String getName() const override { return "S3Queue"; }

    /// IStorage override; returns a Pipe. Defined out of line.
    Pipe read(
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        size_t num_streams) override;

    /// IStorage override; returns a SinkToStoragePtr. Defined out of line.
    SinkToStoragePtr write(
        const ASTPtr & query,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr context,
        bool async_insert) override;

    /// IStorage override. Parameter names are commented out in this
    /// declaration; the definition is out of line.
    void truncate(
        const ASTPtr & /*query*/,
        const StorageMetadataPtr & /*metadata_snapshot*/,
        ContextPtr /*local_context*/,
        TableExclusiveLockHolder &) override;

    /// IStorage override. NOTE(review): presumably returns `virtual_columns` - confirm in the .cpp.
    NamesAndTypesList getVirtuals() const override;

    /// IStorage override; the returned value is decided by the out-of-line definition.
    bool supportsPartitionBy() const override;

    /// Returns a const reference to `configuration.format`.
    const auto & getFormatName() const { return configuration.format; }

    /// Returns a const reference to the stored `zk_path`.
    const String & getZooKeeperPath() const { return zk_path; }

    /// Returns a ZooKeeper client handle. Declared const; `zk_client` and
    /// `zk_mutex` below are `mutable`, so a const method is able to update them.
    /// NOTE(review): whether it caches/refreshes `zk_client` under `zk_mutex`
    /// must be confirmed in the .cpp.
    zkutil::ZooKeeperPtr getZooKeeper() const;

private:
    /// Owned settings object; the pointer itself is const and cannot be reseated.
    const std::unique_ptr<S3QueueSettings> s3queue_settings;
    /// Const member, so it can only be set during construction.
    const S3QueueAction after_processing;

    /// Shared handle to the files-metadata object.
    std::shared_ptr<S3QueueFilesMetadata> files_metadata;
    /// Storage configuration; read by getFormatName() and withGlobs().
    Configuration configuration;
    NamesAndTypesList virtual_columns;
    /// No in-class initializer: expected to be set by the constructor (not
    /// visible here). Unit is milliseconds according to the name suffix.
    UInt64 reschedule_processing_interval_ms;

    std::optional<FormatSettings> format_settings;
    ASTPtr partition_by;

    /// Value returned by getZooKeeperPath().
    String zk_path;
    /// `mutable` so that const member functions may modify them.
    mutable zkutil::ZooKeeperPtr zk_client;
    mutable std::mutex zk_mutex;

    /// Atomic flags, both initialised to false.
    std::atomic<bool> mv_attached = false;
    std::atomic<bool> shutdown_called{false};
    /// Raw logger pointer with no in-class initializer; expected to be set by
    /// the constructor (not visible here).
    Poco::Logger * log;

    /// IStorage override, declared in the private section.
    bool supportsSubcolumns() const override;
    /// True when `configuration.url.key` contains at least one of the
    /// characters '*', '?' or '{'.
    bool withGlobs() const { return configuration.url.key.find_first_of("*?{") != std::string::npos; }

    /// NOTE(review): presumably the body executed by the background `task` - confirm in the .cpp.
    void threadFunc();
    size_t getTableDependentCount() const;
    bool hasDependencies(const StorageID & table_id);

    /// IStorage lifecycle overrides, declared in the private section.
    void startup() override;
    void shutdown() override;
    void drop() override;

    /// Bundles a background-pool task holder with a cancellation flag.
    struct TaskContext
    {
        BackgroundSchedulePool::TaskHolder holder;
        /// Initialised to false.
        std::atomic<bool> stream_cancelled{false};
        /// Moves the given task holder into `holder`.
        explicit TaskContext(BackgroundSchedulePool::TaskHolder && task_) : holder(std::move(task_)) { }
    };
    /// Shared handle to the task context; no initializer here, so it starts out empty.
    std::shared_ptr<TaskContext> task;

    bool supportsSubsetOfColumns(const ContextPtr & context_) const;

    /// Per-instance constant fixed at 1000.
    /// NOTE(review): presumably the retry limit used by createTableIfNotExists() - confirm in the .cpp.
    const UInt32 zk_create_table_retries = 1000;
    bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
    void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot);

    /// Alias for the key list type of StorageS3QueueSource.
    using KeysWithInfo = StorageS3QueueSource::KeysWithInfo;

    /// Returns a shared pointer to a StorageS3QueueSource::IIterator built from
    /// the given context and query AST. Defined out of line.
    std::shared_ptr<StorageS3QueueSource::IIterator>
    createFileIterator(ContextPtr local_context, ASTPtr query);

    void streamToViews();
    /// Returns a Configuration by value (a copy, per the name); the update step
    /// itself lives in the out-of-line definition.
    Configuration updateConfigurationAndGetCopy(ContextPtr local_context);
};

}

#endif