blob: 6378479761d0f259634303fa3de27a8f23f0a6b5 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Client/ConnectionPool.h>
#include <IO/ReadBufferFromFile.h>
#include <Disks/IDisk.h>
#include <atomic>
#include <mutex>
#include <condition_variable>
namespace CurrentMetrics { class Increment; }
namespace DB
{
/// Forward declarations of types referenced by DistributedAsyncInsertDirectoryQueue below.
/// NOTE(review): IDisk/DiskPtr and BackgroundSchedulePool presumably already come from the
/// included <Disks/IDisk.h> and <Core/BackgroundSchedulePool.h>, so those lines look
/// redundant (re-declaring an alias to the same type is legal) — confirm before removing.
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
class StorageDistributed;
class ActionBlocker;
class BackgroundSchedulePool;
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
class ISource;
/** Queue for async INSERT into Distributed engine (insert_distributed_sync=0).
 *
 * Files are added from two places:
 * - from the filesystem at startup (StorageDistributed::startup())
 * - on INSERT via DistributedSink
 *
 * Later, in the background, those files are sent to the remote nodes.
 *
 * The behaviour of this queue can be configured via the following settings:
 * - distributed_directory_monitor_batch_inserts
 * - distributed_directory_monitor_split_batch_on_failure
 * - distributed_directory_monitor_sleep_time_ms
 * - distributed_directory_monitor_max_sleep_time_ms
 * NOTE: It is worth renaming these settings too
 * ("directory_monitor" in the setting names looks too internal).
 *
 * NOTE(review): the member declaration order below determines the constructor's
 * initialization order; keep it in sync with the member-initializer list in the .cpp.
 */
class DistributedAsyncInsertDirectoryQueue
{
/// DistributedAsyncInsertBatch needs access to the private members below.
friend class DistributedAsyncInsertBatch;
public:
/// @param storage_          owning Distributed table (kept by reference in `storage`).
/// @param disk_             disk that holds the queue directory.
/// @param relative_path_    queue directory, relative to `disk_`.
/// @param pool_             connections used to send the queued files (stored in `pool`).
/// @param monitor_blocker_  kept by reference in `monitor_blocker`; presumably used to pause sending — confirm in .cpp.
/// @param bg_pool           presumably the pool that runs `task_handle` — confirm in .cpp.
DistributedAsyncInsertDirectoryQueue(
StorageDistributed & storage_,
const DiskPtr & disk_,
const std::string & relative_path_,
ConnectionPoolPtr pool_,
ActionBlocker & monitor_blocker_,
BackgroundSchedulePool & bg_pool);
~DistributedAsyncInsertDirectoryQueue();
/// Builds the connection pool for a queue named `name` of `storage`.
/// NOTE(review): presumably `name` is the queue directory name that encodes the target shard/replicas — confirm.
static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
/// Switches the queue to `new_relative_path` (presumably after the table/directory is renamed).
void updatePath(const std::string & new_relative_path);
/// NOTE(review): by name, sends everything still queued before returning — confirm in .cpp.
void flushAllData();
/// NOTE(review): by name, stops the background work and discards queued files — confirm in .cpp.
void shutdownAndDropAllData();
/// NOTE(review): by name, stops the background work leaving queued files on disk — confirm in .cpp.
void shutdownWithoutFlush();
/// Returns a source over a queued file (presumably used to read it back as blocks).
static std::shared_ptr<ISource> createSourceFromFile(const String & file_name);
/// For scheduling via DistributedSink.
/// NOTE(review): `ms` is presumably the delay in milliseconds before the background
/// task runs, and the return value whether scheduling succeeded — confirm in .cpp.
bool addFileAndSchedule(const std::string & file_path, size_t file_size, size_t ms);
/// Counters and last error of the queue; all counters start at 0.
struct InternalStatus
{
/// Last error hit while sending, and when it happened.
std::exception_ptr last_exception;
std::chrono::system_clock::time_point last_exception_time;
size_t error_count = 0;
/// Presumably the files/bytes still waiting to be sent.
size_t files_count = 0;
size_t bytes_count = 0;
/// Files/bytes marked as broken (see markAsBroken()).
size_t broken_files_count = 0;
size_t broken_bytes_count = 0;
};
/// system.distribution_queue interface
struct Status : InternalStatus
{
std::string path;
/// NOTE(review): presumably reflects `monitor_blocker` — confirm in .cpp.
bool is_blocked = false;
};
/// Snapshot for system.distribution_queue. Not const, presumably because it locks
/// `status_mutex` — confirm in .cpp.
Status getStatus();
private:
/// Presumably the body of the background task held in `task_handle`.
void run();
bool hasPendingFiles() const;
void addFile(const std::string & file_path);
/// Presumably enqueues the files already present in the directory (startup path, see class comment).
void initializeFilesFromDisk();
/// Sending paths. Presumably processFilesWithBatching() is used when should_batch_inserts
/// is true and processFile() per file otherwise — confirm in .cpp.
void processFiles();
/// NOTE(review): takes a non-const reference — check whether the callee modifies `file_path`.
void processFile(std::string & file_path);
void processFilesWithBatching();
/// Presumably moves the file under `broken_path` and updates the broken_* counters.
void markAsBroken(const std::string & file_path);
/// Presumably removes the file after a successful send ("Send" reads as "Sent").
void markAsSend(const std::string & file_path);
/// Note: the parameter `path` shadows the member `path`.
/// Presumably a no-op guard unless `dir_fsync` is set — confirm in .cpp.
SyncGuardPtr getDirectorySyncGuard(const std::string & path);
std::string getLoggerName() const;
StorageDistributed & storage;
const ConnectionPoolPtr pool;
DiskPtr disk;
/// Queue directory relative to `disk`; `path` is presumably its full path on the disk.
std::string relative_path;
std::string path;
/// Where broken files are kept (relative and full forms, as above).
std::string broken_relative_path;
std::string broken_path;
/// Fixed for the object's lifetime (const). The in-class values are only defaults;
/// presumably the constructor sets them from the settings listed in the class comment.
const bool should_batch_inserts = false;
const bool split_batch_on_failure = true;
const bool dir_fsync = false;
const size_t min_batched_block_size_rows = 0;
const size_t min_batched_block_size_bytes = 0;
/// Pending data (left over after some error) when should_batch_inserts == true.
std::string current_batch_file_path;
/// Pending data (left over after some error) when should_batch_inserts == false.
std::string current_file;
/// Defined out of line (not visible in this header).
struct BatchHeader;
struct Batch;
/// Presumably guards `status` — confirm in .cpp.
std::mutex status_mutex;
InternalStatus status;
/// Paths of files waiting to be sent.
ConcurrentBoundedQueue<std::string> pending_files;
/// Retry/poll interval: `sleep_time` presumably backs off from `default_sleep_time` up to
/// `max_sleep_time` on errors, and `last_decrease_time` records the last time it was
/// lowered again. The two const durations have no in-class initializer, so the
/// constructor must set them.
const std::chrono::milliseconds default_sleep_time;
std::chrono::milliseconds sleep_time;
const std::chrono::milliseconds max_sleep_time;
std::chrono::time_point<std::chrono::system_clock> last_decrease_time {std::chrono::system_clock::now()};
/// NOTE(review): which state this mutex guards is not visible in the header.
std::mutex mutex;
/// Non-owning.
Poco::Logger * log;
ActionBlocker & monitor_blocker;
/// Handle of the background task (see run()).
BackgroundSchedulePoolTaskHolder task_handle;
/// Held by value, so CurrentMetrics::Increment must be a complete type here: the forward
/// declaration at the top of the file is not enough, and the definition presumably
/// arrives transitively through one of the includes.
CurrentMetrics::Increment metric_pending_bytes;
CurrentMetrics::Increment metric_pending_files;
CurrentMetrics::Increment metric_broken_bytes;
CurrentMetrics::Increment metric_broken_files;
};
}
|