1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
|
#include <unistd.h>
#include <cerrno>
#include <cassert>
#include <sys/stat.h>
#include <Common/Throttler.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
/// Profile event counters used by this file. They are only declared here (extern);
/// their definitions are not part of this translation unit's visible code.
namespace ProfileEvents
{
/// Incremented once per write(2) attempt in nextImpl().
extern const Event WriteBufferFromFileDescriptorWrite;
/// Incremented in nextImpl() right before a write failure is reported as an exception.
extern const Event WriteBufferFromFileDescriptorWriteFailed;
/// Incremented in nextImpl() by the number of bytes written during the flush.
extern const Event WriteBufferFromFileDescriptorWriteBytes;
/// Incremented in nextImpl() by the time (microseconds) spent in the flush loop.
extern const Event DiskWriteElapsedMicroseconds;
/// Incremented once per sync() call.
extern const Event FileSync;
/// Incremented in sync() by the time (microseconds) spent in fsync/fdatasync.
extern const Event FileSyncElapsedMicroseconds;
/// Passed to throttler->add() in nextImpl() together with the number of bytes written.
extern const Event LocalWriteThrottlerBytes;
/// Passed to throttler->add() in nextImpl(); presumably accounts throttler sleep time.
extern const Event LocalWriteThrottlerSleepMicroseconds;
}
/// Metrics used by this file; declared here (extern), defined elsewhere.
namespace CurrentMetrics
{
/// Held via CurrentMetrics::Increment for the duration of each write(2) call in nextImpl().
extern const Metric Write;
}
namespace DB
{
/// Error codes reported by the methods below; declared here (extern), defined elsewhere.
namespace ErrorCodes
{
/// Used by nextImpl() when write(2) fails.
extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR;
/// Used by sync() when fsync/fdatasync fails.
extern const int CANNOT_FSYNC;
/// Used by seek() when lseek fails.
extern const int CANNOT_SEEK_THROUGH_FILE;
/// Used by truncate() when ftruncate fails.
extern const int CANNOT_TRUNCATE_FILE;
/// Used by size() when fstat fails.
extern const int CANNOT_FSTAT;
}
/// Writes the pending part of the working buffer (its first offset() bytes) to `fd`.
///
/// write(2) may accept fewer bytes than requested, so the call is repeated until
/// everything has been written. A call that fails with EINTR is retried. Any other
/// failure, as well as a write that accepts zero bytes, increments
/// WriteBufferFromFileDescriptorWriteFailed and is reported through
/// throwFromErrnoWithPath with CANNOT_WRITE_TO_FILE_DESCRIPTOR.
/// Bytes that were accepted are reported to the throttler, if one is set.
void WriteBufferFromFileDescriptor::nextImpl()
{
    if (!offset())
        return;

    Stopwatch watch;

    size_t bytes_written = 0;
    while (bytes_written != offset())
    {
        ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWrite);

        ssize_t res = 0;
        {
            CurrentMetrics::Increment metric_increment{CurrentMetrics::Write};

            /// write(2) sets errno only when it returns -1. Reset it first so that the
            /// `0 == res` case below is not judged by a stale value left by an earlier call:
            /// a leftover EINTR would keep the loop retrying for as long as write returns 0,
            /// and any other leftover value would end up in the error message.
            errno = 0;
            res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
        }

        if ((-1 == res || 0 == res) && errno != EINTR)
        {
            ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteFailed);

            /// Don't use getFileName() here because this method can be called from destructor
            String error_file_name = file_name;
            if (error_file_name.empty())
                error_file_name = "(fd = " + toString(fd) + ")";
            throwFromErrnoWithPath("Cannot write to file " + error_file_name, error_file_name,
                                   ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);
        }

        if (res > 0)
        {
            bytes_written += res;
            if (throttler)
                throttler->add(res, ProfileEvents::LocalWriteThrottlerBytes, ProfileEvents::LocalWriteThrottlerSleepMicroseconds);
        }
    }

    /// These counters are only updated when the whole flush succeeded.
    ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds());
    ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written);
}
/// NOTE: This class can be used as a very low-level building block, for example
/// in trace collector. In such places allocations of memory can be dangerous,
/// so don't allocate anything in this constructor.
///
/// @param fd_              File descriptor that the buffer writes to.
/// @param buf_size         Forwarded to WriteBufferFromFileBase.
/// @param existing_memory  Forwarded to WriteBufferFromFileBase.
/// @param throttler_       Optional throttler; when set, nextImpl() reports written bytes to it.
/// @param alignment        Forwarded to WriteBufferFromFileBase.
/// @param file_name_       Name used in error messages; when empty, "(fd = N)" is used instead.
WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor(
    int fd_,
    size_t buf_size,
    char * existing_memory,
    ThrottlerPtr throttler_,
    size_t alignment,
    std::string file_name_)
    : WriteBufferFromFileBase(buf_size, existing_memory, alignment)
    , fd(fd_)
    /// Both by-value parameters are sinks, so move them into the members instead of copying.
    , throttler(std::move(throttler_))
    , file_name(std::move(file_name_))
{
}
/// Finalizes the buffer on destruction, which writes out any pending data.
///
/// Destructors are implicitly noexcept, so an exception escaping from here (for example
/// a failed write reported by nextImpl()) would end in std::terminate. The error is
/// logged instead. Code that has to observe write errors should flush the buffer
/// explicitly before destroying it.
WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
{
    try
    {
        finalize();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
/// Writes out whatever is still pending in the buffer, provided the descriptor is valid.
/// With a negative descriptor nothing is written; the buffer is expected to be empty then.
void WriteBufferFromFileDescriptor::finalizeImpl()
{
    const bool descriptor_is_valid = fd >= 0;
    if (descriptor_is_valid)
    {
        next();
        return;
    }

    /// No descriptor to write to: pending data at this point means a write after close.
    assert(!offset() && "attempt to write after close");
}
/// Writes out pending data and then asks the OS to synchronize the file with the storage
/// medium (fsync on Darwin, fdatasync elsewhere).
/// Counts the call in FileSync and its duration in FileSyncElapsedMicroseconds.
/// A failed sync is reported through throwFromErrnoWithPath with CANNOT_FSYNC.
void WriteBufferFromFileDescriptor::sync()
{
    /// Pending data has to reach the descriptor before it can be synced.
    next();

    ProfileEvents::increment(ProfileEvents::FileSync);

    Stopwatch watch;

#if defined(OS_DARWIN)
    const int sync_status = ::fsync(fd);
#else
    const int sync_status = ::fdatasync(fd);
#endif

    /// The elapsed time is accounted regardless of whether the sync succeeded.
    ProfileEvents::increment(ProfileEvents::FileSyncElapsedMicroseconds, watch.elapsedMicroseconds());

    if (sync_status == -1)
    {
        const std::string name = getFileName();
        throwFromErrnoWithPath("Cannot fsync " + name, name, ErrorCodes::CANNOT_FSYNC);
    }
}
/// Repositions the file offset of `fd` with lseek and returns the value lseek reports.
/// @param offset  Offset passed to lseek.
/// @param whence  Origin passed to lseek (SEEK_SET, SEEK_CUR, ...).
/// A failed lseek is reported through throwFromErrnoWithPath with CANNOT_SEEK_THROUGH_FILE.
off_t WriteBufferFromFileDescriptor::seek(off_t offset, int whence) // NOLINT
{
    const off_t new_position = lseek(fd, offset, whence);

    if (new_position == -1)
    {
        const std::string name = getFileName();
        throwFromErrnoWithPath("Cannot seek through file " + name, name,
                               ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
    }

    return new_position;
}
/// Sets the length of the file behind `fd` with ftruncate.
/// @param length  New length passed to ftruncate.
/// A failed ftruncate is reported through throwFromErrnoWithPath with CANNOT_TRUNCATE_FILE.
void WriteBufferFromFileDescriptor::truncate(off_t length) // NOLINT
{
    if (ftruncate(fd, length) == -1)
    {
        const std::string name = getFileName();
        throwFromErrnoWithPath("Cannot truncate file " + name, name, ErrorCodes::CANNOT_TRUNCATE_FILE);
    }
}
/// Returns the st_size that fstat reports for `fd`.
/// A failed fstat is reported through throwFromErrnoWithPath with CANNOT_FSTAT.
off_t WriteBufferFromFileDescriptor::size() const
{
    struct stat file_stat;

    if (fstat(fd, &file_stat) == -1)
    {
        const std::string name = getFileName();
        throwFromErrnoWithPath("Cannot execute fstat " + name, name, ErrorCodes::CANNOT_FSTAT);
    }

    return file_stat.st_size;
}
/// Returns the name used to refer to this file in messages: the stored file name,
/// or "(fd = N)" built from the descriptor number when no name was given.
std::string WriteBufferFromFileDescriptor::getFileName() const
{
    return file_name.empty() ? "(fd = " + toString(fd) + ")" : file_name;
}
}
|