1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
#include <clickhouse_config.h>
#if USE_AWS_S3
# include <Poco/JSON/JSON.h>
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
# include <Storages/S3Queue/S3QueueSettings.h>
# include <Storages/S3Queue/S3QueueTableMetadata.h>
# include <Storages/StorageS3.h>
namespace DB
{
namespace ErrorCodes
{
extern const int METADATA_MISMATCH;
}
S3QueueTableMetadata::S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings)
{
format_name = configuration.format;
after_processing = engine_settings.after_processing.toString();
mode = engine_settings.mode.toString();
s3queue_tracked_files_limit = engine_settings.s3queue_tracked_files_limit;
s3queue_tracked_file_ttl_sec = engine_settings.s3queue_tracked_file_ttl_sec;
}
String S3QueueTableMetadata::toString() const
{
Poco::JSON::Object json;
json.set("after_processing", after_processing);
json.set("mode", mode);
json.set("s3queue_tracked_files_limit", s3queue_tracked_files_limit);
json.set("s3queue_tracked_file_ttl_sec", s3queue_tracked_file_ttl_sec);
json.set("format_name", format_name);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}
void S3QueueTableMetadata::read(const String & metadata_str)
{
Poco::JSON::Parser parser;
auto json = parser.parse(metadata_str).extract<Poco::JSON::Object::Ptr>();
after_processing = json->getValue<String>("after_processing");
mode = json->getValue<String>("mode");
s3queue_tracked_files_limit = json->getValue<UInt64>("s3queue_tracked_files_limit");
s3queue_tracked_file_ttl_sec = json->getValue<UInt64>("s3queue_tracked_file_ttl_sec");
format_name = json->getValue<String>("format_name");
}
S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str)
{
S3QueueTableMetadata metadata;
metadata.read(metadata_str);
return metadata;
}
void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata & from_zk) const
{
if (after_processing != from_zk.after_processing)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs "
"in action after processing. Stored in ZooKeeper: {}, local: {}",
DB::toString(from_zk.after_processing),
DB::toString(after_processing));
if (mode != from_zk.mode)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in engine mode. "
"Stored in ZooKeeper: {}, local: {}",
DB::toString(from_zk.after_processing),
DB::toString(after_processing));
if (s3queue_tracked_files_limit != from_zk.s3queue_tracked_files_limit)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in max set size. "
"Stored in ZooKeeper: {}, local: {}",
from_zk.s3queue_tracked_files_limit,
s3queue_tracked_files_limit);
if (s3queue_tracked_file_ttl_sec != from_zk.s3queue_tracked_file_ttl_sec)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in max set age. "
"Stored in ZooKeeper: {}, local: {}",
from_zk.s3queue_tracked_file_ttl_sec,
s3queue_tracked_file_ttl_sec);
if (format_name != from_zk.format_name)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in format name. "
"Stored in ZooKeeper: {}, local: {}",
from_zk.format_name,
format_name);
}
void S3QueueTableMetadata::checkEquals(const S3QueueTableMetadata & from_zk) const
{
checkImmutableFieldsEquals(from_zk);
}
}
#endif
|