blob: 577c71b2227f5e02c07d4daffaff88907d14beb2 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#pragma once
#if USE_AWS_S3
# include <Core/UUID.h>
# include <Interpreters/Context.h>
# include <Storages/StorageS3Settings.h>
# include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
{
class StorageS3Queue;
struct S3QueueSettings;
class S3QueueFilesMetadata
{
public:
struct TrackedCollectionItem
{
TrackedCollectionItem() = default;
TrackedCollectionItem(const String & file_path_, UInt64 timestamp_, UInt64 retries_count_, const String & last_exception_)
: file_path(file_path_), timestamp(timestamp_), retries_count(retries_count_), last_exception(last_exception_) {}
String file_path;
UInt64 timestamp = 0;
UInt64 retries_count = 0;
String last_exception;
};
using S3FilesCollection = std::unordered_set<String>;
using TrackedFiles = std::deque<TrackedCollectionItem>;
S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_);
void setFilesProcessing(const Strings & file_paths);
void setFileProcessed(const String & file_path);
bool setFileFailed(const String & file_path, const String & exception_message);
S3FilesCollection getProcessedFailedAndProcessingFiles();
String getMaxProcessedFile();
std::shared_ptr<zkutil::EphemeralNodeHolder> acquireLock(zkutil::ZooKeeperPtr zookeeper);
struct S3QueueCollection
{
public:
virtual ~S3QueueCollection() = default;
virtual String toString() const;
S3FilesCollection getFileNames();
virtual void parse(const String & collection_str) = 0;
protected:
TrackedFiles files;
void read(ReadBuffer & in);
void write(WriteBuffer & out) const;
};
struct S3QueueProcessedCollection : public S3QueueCollection
{
public:
S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_);
void parse(const String & collection_str) override;
void add(const String & file_name);
private:
const UInt64 max_size;
const UInt64 max_age;
};
struct S3QueueFailedCollection : S3QueueCollection
{
public:
S3QueueFailedCollection(const UInt64 & max_retries_count_);
void parse(const String & collection_str) override;
bool add(const String & file_name, const String & exception_message);
S3FilesCollection getFileNames();
private:
UInt64 max_retries_count;
};
struct S3QueueProcessingCollection
{
public:
S3QueueProcessingCollection() = default;
void parse(const String & collection_str);
void add(const Strings & file_names);
void remove(const String & file_name);
String toString() const;
const S3FilesCollection & getFileNames() const { return files; }
private:
S3FilesCollection files;
};
private:
const StorageS3Queue * storage;
const S3QueueMode mode;
const UInt64 max_set_size;
const UInt64 max_set_age_sec;
const UInt64 max_loading_retries;
const String zookeeper_processing_path;
const String zookeeper_processed_path;
const String zookeeper_failed_path;
const String zookeeper_lock_path;
mutable std::mutex mutex;
Poco::Logger * log;
S3FilesCollection getFailedFiles();
S3FilesCollection getProcessingFiles();
S3FilesCollection getUnorderedProcessedFiles();
void removeProcessingFile(const String & file_path);
};
}
#endif
|