1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
#include <Common/logger_useful.h>
namespace DB
{
FileLogDirectoryWatcher::FileLogDirectoryWatcher(const std::string & path_, StorageFileLog & storage_, ContextPtr context_)
: path(path_)
, storage(storage_)
, log(&Poco::Logger::get("FileLogDirectoryWatcher(" + path + ")"))
, dw(std::make_unique<DirectoryWatcherBase>(*this, path, context_))
{
}
FileLogDirectoryWatcher::Events FileLogDirectoryWatcher::getEventsAndReset()
{
std::lock_guard lock(mutex);
Events res;
res.swap(events);
return res;
}
FileLogDirectoryWatcher::Error FileLogDirectoryWatcher::getErrorAndReset()
{
std::lock_guard lock(mutex);
Error old_error = error;
error = {};
return old_error;
}
const std::string & FileLogDirectoryWatcher::getPath() const
{
return path;
}
void FileLogDirectoryWatcher::onItemAdded(DirectoryWatcherBase::DirectoryEvent ev)
{
std::lock_guard lock(mutex);
EventInfo info{ev.event, "onItemAdded"};
std::string event_path = ev.path;
if (auto it = events.find(event_path); it != events.end())
{
it->second.file_events.emplace_back(info);
}
else
{
events.emplace(event_path, FileEvents{.file_events = std::vector<EventInfo>{info}});
}
}
void FileLogDirectoryWatcher::onItemRemoved(DirectoryWatcherBase::DirectoryEvent ev)
{
std::lock_guard lock(mutex);
EventInfo info{ev.event, "onItemRemoved"};
std::string event_path = ev.path;
if (auto it = events.find(event_path); it != events.end())
{
it->second.file_events.emplace_back(info);
}
else
{
events.emplace(event_path, FileEvents{.file_events = std::vector<EventInfo>{info}});
}
}
/// Optimize for MODIFY event, during a streamToViews period, since the log files
/// are append only, there are may a lots of MODIFY events produced for one file.
/// For example, appending 10000 logs into one file will result in 10000 MODIFY event.
/// So, if we record all of these events, it will use a lot of memory, and then we
/// need to handle it one by one in StorageFileLog::updateFileInfos, this is unnecessary
/// because it is equal to just record and handle one MODIY event
void FileLogDirectoryWatcher::onItemModified(DirectoryWatcherBase::DirectoryEvent ev)
{
std::lock_guard lock(mutex);
auto event_path = ev.path;
EventInfo info{ev.event, "onItemModified"};
if (auto it = events.find(event_path); it != events.end())
{
/// Already have MODIFY event for this file
if (it->second.received_modification_event)
return;
else
{
it->second.received_modification_event = true;
it->second.file_events.emplace_back(info);
}
}
else
{
events.emplace(event_path, FileEvents{.received_modification_event = true, .file_events = std::vector<EventInfo>{info}});
}
}
void FileLogDirectoryWatcher::onItemMovedFrom(DirectoryWatcherBase::DirectoryEvent ev)
{
std::lock_guard lock(mutex);
EventInfo info{ev.event, "onItemMovedFrom"};
std::string event_path = ev.path;
if (auto it = events.find(event_path); it != events.end())
{
it->second.file_events.emplace_back(info);
}
else
{
events.emplace(event_path, FileEvents{.file_events = std::vector<EventInfo>{info}});
}
}
void FileLogDirectoryWatcher::onItemMovedTo(DirectoryWatcherBase::DirectoryEvent ev)
{
std::lock_guard lock(mutex);
EventInfo info{ev.event, "onItemMovedTo"};
std::string event_path = ev.path;
if (auto it = events.find(event_path); it != events.end())
{
it->second.file_events.emplace_back(info);
}
else
{
events.emplace(event_path, FileEvents{.file_events = std::vector<EventInfo>{info}});
}
}
void FileLogDirectoryWatcher::onError(Exception e)
{
std::lock_guard lock(mutex);
LOG_ERROR(log, "Error happened during watching directory: {}", error.error_msg);
error.has_error = true;
error.error_msg = e.message();
}
}
|