#pragma once
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
#include <Common/DNSResolver.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Storages/IStorage_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/Context.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

namespace zkutil
{
class ZooKeeper;
}
namespace Poco
{
class Logger;
namespace Util { class AbstractConfiguration; }
}
namespace Coordination
{
struct Stat;
}
namespace zkutil
{
class ZooKeeperLock;
}

namespace DB
{
class ASTAlterQuery;
struct DDLLogEntry;
struct DDLTaskBase;
using DDLTaskPtr = std::unique_ptr<DDLTaskBase>;
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
class AccessRightsElements;

class DDLWorker
{
public:
DDLWorker(int pool_size_, const std::string & zk_root_dir, ContextPtr context_, const Poco::Util::AbstractConfiguration * config, const String & prefix,
const String & logger_name = "DDLWorker", const CurrentMetrics::Metric * max_entry_metric_ = nullptr, const CurrentMetrics::Metric * max_pushed_entry_metric_ = nullptr);
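/// Typical construction (a sketch: "global_context" and "server_config" are illustrative names;
/// the path and prefix shown are the usual defaults for distributed DDL):
///     DDLWorker worker(/* pool_size */ 1, "/clickhouse/task_queue/ddl", global_context,
///                      &server_config, "distributed_ddl");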
virtual ~DDLWorker();
/// Pushes the query into the DDL queue; returns the path of the created node
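/// (a sequential node under queue_dir, e.g. "query-0000000123"; the number here is illustrative)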
virtual String enqueueQuery(DDLLogEntry & entry);
/// Host ID (name:port) used for logging.
/// Note that within each task, hosts are identified individually by name:port taken from the initiator server's cluster config.
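/// (e.g. "host-1.example.com:9000"; the hostname here is illustrative)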
std::string getCommonHostID() const
{
return host_fqdn_id;
}
std::string getQueueDir() const
{
return queue_dir;
}
void startup();
virtual void shutdown();
bool isCurrentlyActive() const { return initialized && !stop_flag; }
/// Returns cached ZooKeeper session (possibly expired).
ZooKeeperPtr tryGetZooKeeper() const;
/// If necessary, creates a new session and caches it.
ZooKeeperPtr getAndSetZooKeeper();
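/// A minimal usage sketch (hypothetical caller code, not part of this class):
///     auto zookeeper = getAndSetZooKeeper();  /// renews the session if the cached one has expired
///     if (zookeeper->exists(queue_dir))
///     {
///         /// the queue root is reachable through a live session
///     }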
protected:
/// Iterates over the queue entries in ZooKeeper and starts execution of new tasks
void scheduleTasks(bool reinitialized);
DDLTaskBase & saveTask(DDLTaskPtr && task);
/// Reads the entry and checks that this host belongs to the task's host list.
/// Returns a non-empty DDLTaskPtr if the entry was parsed and the check passed.
virtual DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper);
void processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper);
void updateMaxDDLEntryID(const String & entry_name);
/// Checks whether the query should be executed on the leader replica only
static bool taskShouldBeExecutedOnLeader(const ASTPtr & ast_ddl, StoragePtr storage);
/// Executes the query on the leader replica only, in case of a replicated table.
/// Queries like TRUNCATE/ALTER .../OPTIMIZE must be executed on only one node of the shard.
/// Most of these queries can be executed on a non-leader replica, but they would still send the
/// query to the leader via RemoteQueryExecutor; to avoid such "two-phase" query execution we
/// execute the query directly on the leader.
bool tryExecuteQueryOnLeaderReplica(
DDLTaskBase & task,
StoragePtr storage,
const String & node_path,
const ZooKeeperPtr & zookeeper,
std::unique_ptr<zkutil::ZooKeeperLock> & execute_on_leader_lock);
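/// Illustrative control flow for the method above (a sketch with assumed helper names,
/// not the actual implementation):
///     auto lock = zkutil::createSimpleZooKeeperLock(zookeeper, shard_path, "lock", host_fqdn_id);
///     if (!lock->tryLock())
///         return false;                  /// another replica holds the lock and executes the entry
///     tryExecuteQuery(task, zookeeper);  /// we won the lock: execute the query locally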
bool tryExecuteQuery(DDLTaskBase & task, const ZooKeeperPtr & zookeeper);
/// Checks the queue's nodes and cleans up outdated ones
void cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper);
virtual bool canRemoveQueueEntry(const String & entry_name, const Coordination::Stat & stat);
/// Initializes the task node by creating its status directories
void createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper);
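/// Resulting layout in ZooKeeper (a sketch; the entry name is illustrative):
///     <queue_dir>/query-0000000123          <- serialized DDLLogEntry
///     <queue_dir>/query-0000000123/active   <- nodes of hosts currently executing the entry
///     <queue_dir>/query-0000000123/finished <- per-host execution status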
/// Returns false if the worker was stopped (stop_flag == true)
virtual bool initializeMainThread();
void runMainThread();
void runCleanupThread();
ContextMutablePtr context;
Poco::Logger * log;
std::string host_fqdn; /// fully qualified domain name of the current host
std::string host_fqdn_id; /// host_name:port
std::string queue_dir; /// ZooKeeper directory with the queue of queries
mutable std::mutex zookeeper_mutex;
ZooKeeperPtr current_zookeeper TSA_GUARDED_BY(zookeeper_mutex);
/// Saved state of executed tasks, to avoid duplicate execution after a ZooKeeper error
std::optional<String> last_skipped_entry_name;
std::optional<String> first_failed_task_name;
std::list<DDLTaskPtr> current_tasks;
/// This flag is needed for debug assertions only
bool queue_fully_loaded_after_initialization_debug_helper = false;
Coordination::Stat queue_node_stat;
std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>();
std::shared_ptr<Poco::Event> cleanup_event = std::make_shared<Poco::Event>();
std::atomic<bool> initialized = false;
std::atomic<bool> stop_flag = true;
std::unique_ptr<ThreadFromGlobalPool> main_thread; /// runs runMainThread()
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread; /// runs runCleanupThread()
/// Size of the pool for query execution.
size_t pool_size = 1;
std::unique_ptr<ThreadPool> worker_pool;
/// Cleanup starts after a new-node event is received, unless the previous cleanup ran less than cleanup_delay_period seconds ago
Int64 cleanup_delay_period = 60; // minute (in seconds)
/// Delete a node if it is older than this
Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds)
/// Maximum number of tasks that can be in the queue
size_t max_tasks_in_queue = 1000;
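/// The three limits above are read from the server config section passed to the constructor
/// (a sketch of the expected keys, assuming the usual "distributed_ddl" prefix):
///     <distributed_ddl>
///         <cleanup_delay_period>60</cleanup_delay_period>
///         <task_max_lifetime>604800</task_max_lifetime>
///         <max_tasks_in_queue>1000</max_tasks_in_queue>
///     </distributed_ddl>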
/// Maximum id of the DDL entries processed so far
std::atomic<UInt32> max_id = 0;
const CurrentMetrics::Metric * max_entry_metric; /// if set, tracks the maximum processed entry id
const CurrentMetrics::Metric * max_pushed_entry_metric; /// if set, tracks the maximum pushed entry id
};
}