1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
|
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/io/stream.h>
#include <aws/common/file.h>
#include <aws/io/file_utils.h>
#include <errno.h>
/**
 * Repositions a stream by forwarding to the seek entry of its vtable.
 *
 * @param stream  stream to reposition; the stream, its vtable and the vtable's seek entry are
 *                asserted to be non-NULL
 * @param offset  passed through unchanged to the implementation
 * @param basis   passed through unchanged to the implementation
 * @return whatever the implementation's seek function returns
 */
int aws_input_stream_seek(struct aws_input_stream *stream, int64_t offset, enum aws_stream_seek_basis basis) {
    AWS_ASSERT(stream && stream->vtable && stream->vtable->seek);

    const int seek_result = stream->vtable->seek(stream, offset, basis);
    return seek_result;
}
/**
 * Reads from a stream, appending to `dest` after whatever it already contains.
 *
 * The implementation never sees `dest` itself. It is handed a temporary buffer that covers only the
 * unused tail of `dest`, so bytes already stored in `dest` cannot be overwritten. `dest->len` grows
 * by the number of bytes the implementation reported, and only when the implementation succeeded.
 *
 * @param stream  stream to read from; the stream, its vtable and the vtable's read entry are
 *                asserted to be non-NULL
 * @param dest    destination buffer; asserted non-NULL with len <= capacity
 * @return AWS_OP_SUCCESS immediately when `dest` has no free space, otherwise the value returned by
 *         the implementation's read function
 */
int aws_input_stream_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
    AWS_ASSERT(stream && stream->vtable && stream->vtable->read);
    AWS_ASSERT(dest);
    AWS_ASSERT(dest->len <= dest->capacity);

    /* A full buffer is handled here so that no implementation has to get this edge case right. */
    const size_t safe_buf_capacity = dest->capacity - dest->len;
    if (safe_buf_capacity == 0) {
        return AWS_OP_SUCCESS;
    }

    /* The "safe" buffer starts where the existing data ends and spans only the free space. */
    const void *safe_buf_start = dest->buffer + dest->len;
    struct aws_byte_buf safe_buf = aws_byte_buf_from_empty_array(safe_buf_start, safe_buf_capacity);

    const int read_result = stream->vtable->read(stream, &safe_buf);

    /* The implementation may only change len, and must keep it within the capacity it was given. */
    AWS_FATAL_ASSERT(
        (safe_buf.buffer == safe_buf_start) && (safe_buf.capacity == safe_buf_capacity) &&
        (safe_buf.len <= safe_buf_capacity));

    if (read_result != AWS_OP_SUCCESS) {
        /* On failure the caller's buffer is left exactly as it was. */
        return read_result;
    }

    /* Publish the newly read bytes into the real buffer. */
    dest->len += safe_buf.len;
    return read_result;
}
/**
 * Queries a stream's status by forwarding to the get_status entry of its vtable.
 *
 * @param stream  stream to query; the stream, its vtable and the vtable's get_status entry are
 *                asserted to be non-NULL
 * @param status  passed through unchanged to the implementation, which fills it in
 * @return whatever the implementation's get_status function returns
 */
int aws_input_stream_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
    AWS_ASSERT(stream && stream->vtable && stream->vtable->get_status);

    const int status_result = stream->vtable->get_status(stream, status);
    return status_result;
}
/**
 * Queries a stream's length by forwarding to the get_length entry of its vtable.
 *
 * @param stream      stream to query; the stream, its vtable and the vtable's get_length entry are
 *                    asserted to be non-NULL
 * @param out_length  passed through unchanged to the implementation, which writes the length to it
 * @return whatever the implementation's get_length function returns
 */
int aws_input_stream_get_length(struct aws_input_stream *stream, int64_t *out_length) {
    AWS_ASSERT(stream && stream->vtable && stream->vtable->get_length);

    const int length_result = stream->vtable->get_length(stream, out_length);
    return length_result;
}
/*
* cursor stream implementation
*/
/* State of an input stream that serves its data from an in-memory byte cursor. */
struct aws_input_stream_byte_cursor_impl {
/* Embedded base stream; the vtable functions recover this struct from it with AWS_CONTAINER_OF. */
struct aws_input_stream base;
/* Allocator this struct was allocated from; used again to release it on destroy. */
struct aws_allocator *allocator;
/* Copy of the cursor supplied at creation; seek positions and the reported length come from it. */
struct aws_byte_cursor original_cursor;
/* Unread remainder of the data: advanced by read, rebuilt from original_cursor by seek. */
struct aws_byte_cursor current_cursor;
};
/*
* This is an ugly function that, in the absence of better guidance, is designed to handle all possible combinations of
* size_t (uint32_t, uint64_t). If size_t ever exceeds 64 bits this function will fail badly.
*
* Safety and invariant assumptions are sprinkled via comments. The overall strategy is to cast up to 64 bits and
* perform all arithmetic there, being careful with signed vs. unsigned to prevent bad operations.
*
* Assumption #1: size_t resolves to an unsigned integer 64 bits or smaller
*/
AWS_STATIC_ASSERT(sizeof(size_t) <= 8);
/**
 * Seek implementation for cursor-backed streams.
 *
 * Places current_cursor at an absolute position inside original_cursor:
 *  - AWS_SSB_BEGIN: offset must lie in [0, len] and is counted forward from the first byte.
 *  - AWS_SSB_END:   offset must lie in [-len, 0] and is counted backward from the end of the data.
 *
 * Every comparison is done on uint64_t values, after the sign of `offset` has been checked, so no
 * signed/unsigned mix-up or overflow can occur.
 *
 * @return AWS_OP_SUCCESS on success. Otherwise the result of aws_raise_error() with
 *         AWS_IO_STREAM_INVALID_SEEK_POSITION (offset outside the data) or
 *         AWS_ERROR_INVALID_ARGUMENT (basis is neither AWS_SSB_BEGIN nor AWS_SSB_END).
 */
static int s_aws_input_stream_byte_cursor_seek(
    struct aws_input_stream *stream,
    int64_t offset,
    enum aws_stream_seek_basis basis) {

    struct aws_input_stream_byte_cursor_impl *impl =
        AWS_CONTAINER_OF(stream, struct aws_input_stream_byte_cursor_impl, base);

    /* Widening is lossless: this file statically asserts that size_t is at most 64 bits wide. */
    const uint64_t total_len = (uint64_t)impl->original_cursor.len;
    uint64_t final_offset = 0;

    if (basis == AWS_SSB_BEGIN) {
        if (offset < 0) {
            return aws_raise_error(AWS_IO_STREAM_INVALID_SEEK_POSITION);
        }
        /* offset is known to be non-negative here, so the cast to uint64_t keeps its value */
        if ((uint64_t)offset > total_len) {
            return aws_raise_error(AWS_IO_STREAM_INVALID_SEEK_POSITION);
        }
        final_offset = (uint64_t)offset;
    } else if (basis == AWS_SSB_END) {
        /* INT64_MIN cannot be negated in int64_t, so it is rejected before -offset is evaluated */
        if (offset > 0 || offset == INT64_MIN) {
            return aws_raise_error(AWS_IO_STREAM_INVALID_SEEK_POSITION);
        }
        /* -offset is in [0, INT64_MAX] as long as offset is not INT64_MIN, so the cast is safe */
        const uint64_t back_from_end = (uint64_t)(-offset);
        if (back_from_end > total_len) {
            return aws_raise_error(AWS_IO_STREAM_INVALID_SEEK_POSITION);
        }
        /* cannot underflow: back_from_end <= total_len was just established */
        final_offset = total_len - back_from_end;
    } else {
        return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
    }

    /* holds because final_offset was validated against original_cursor.len, which is <= SIZE_MAX */
    AWS_ASSERT(final_offset <= SIZE_MAX);

    /* narrowing is safe given the assert above */
    size_t final_offset_sz = (size_t)final_offset;

    /* sanity */
    AWS_ASSERT(final_offset_sz <= impl->original_cursor.len);

    /* start again from the full data, then skip forward to the requested position */
    impl->current_cursor = impl->original_cursor;
    impl->current_cursor.ptr += final_offset_sz;
    impl->current_cursor.len -= final_offset_sz;

    return AWS_OP_SUCCESS;
}
/**
 * Read implementation for cursor-backed streams.
 *
 * Copies as many bytes as fit, i.e. the smaller of the free space in `dest` and the bytes remaining
 * in current_cursor, then advances current_cursor past the copied bytes.
 *
 * @return AWS_OP_SUCCESS, or the result of aws_raise_error(AWS_IO_STREAM_READ_FAILED) when
 *         aws_byte_buf_write() reports failure (the cursor is not advanced in that case)
 */
static int s_aws_input_stream_byte_cursor_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
    struct aws_input_stream_byte_cursor_impl *impl =
        AWS_CONTAINER_OF(stream, struct aws_input_stream_byte_cursor_impl, base);

    const size_t space_left = dest->capacity - dest->len;
    const size_t bytes_left = impl->current_cursor.len;
    const size_t to_copy = (space_left < bytes_left) ? space_left : bytes_left;

    if (aws_byte_buf_write(dest, impl->current_cursor.ptr, to_copy)) {
        aws_byte_cursor_advance(&impl->current_cursor, to_copy);
        return AWS_OP_SUCCESS;
    }

    return aws_raise_error(AWS_IO_STREAM_READ_FAILED);
}
/**
 * Status implementation for cursor-backed streams.
 *
 * The stream is always reported as valid. End of stream is reported once current_cursor has no
 * bytes left.
 *
 * @return AWS_OP_SUCCESS, always
 */
static int s_aws_input_stream_byte_cursor_get_status(
    struct aws_input_stream *stream,
    struct aws_stream_status *status) {

    struct aws_input_stream_byte_cursor_impl *impl =
        AWS_CONTAINER_OF(stream, struct aws_input_stream_byte_cursor_impl, base);

    const size_t bytes_left = impl->current_cursor.len;

    status->is_valid = true;
    status->is_end_of_stream = (bytes_left == 0);

    return AWS_OP_SUCCESS;
}
/**
 * Length implementation for cursor-backed streams: reports the length of original_cursor,
 * regardless of how much has been read so far.
 *
 * @param out_length  receives the length on success
 * @return AWS_OP_SUCCESS, or (only on platforms where size_t can exceed INT64_MAX) the result of
 *         aws_raise_error(AWS_ERROR_OVERFLOW_DETECTED) when the length does not fit in int64_t
 */
static int s_aws_input_stream_byte_cursor_get_length(struct aws_input_stream *stream, int64_t *out_length) {
    struct aws_input_stream_byte_cursor_impl *impl =
        AWS_CONTAINER_OF(stream, struct aws_input_stream_byte_cursor_impl, base);

    const size_t total_len = impl->original_cursor.len;

#if SIZE_MAX > INT64_MAX
    /* the range check is compiled in only where size_t is able to hold values above INT64_MAX */
    if (total_len > INT64_MAX) {
        return aws_raise_error(AWS_ERROR_OVERFLOW_DETECTED);
    }
#endif

    *out_length = (int64_t)total_len;
    return AWS_OP_SUCCESS;
}
/**
 * Destructor registered with the ref count of a cursor-backed stream.
 * Hands the impl struct back to the allocator stored inside it.
 */
static void s_aws_input_stream_byte_cursor_destroy(struct aws_input_stream_byte_cursor_impl *impl) {
    struct aws_allocator *owning_allocator = impl->allocator;
    aws_mem_release(owning_allocator, impl);
}
/*
 * Dispatch table for cursor-backed streams. Only the four I/O entries are set. The acquire and
 * release entries are left unset, so aws_input_stream_acquire() and aws_input_stream_release()
 * fall back to the stream's embedded ref_count.
 */
static struct aws_input_stream_vtable s_aws_input_stream_byte_cursor_vtable = {
.seek = s_aws_input_stream_byte_cursor_seek,
.read = s_aws_input_stream_byte_cursor_read,
.get_status = s_aws_input_stream_byte_cursor_get_status,
.get_length = s_aws_input_stream_byte_cursor_get_length,
};
/**
 * Creates an input stream that reads from the bytes referenced by `cursor`.
 *
 * Only the cursor value is copied into the stream. The bytes it references are not copied here, so
 * they are read in place by later read calls.
 *
 * @param allocator  allocator for the stream object; stored and used to release it on destroy
 * @param cursor     cursor describing the data; dereferenced unconditionally
 * @return the new stream's base object, with its ref count initialized
 *
 * NOTE(review): the allocation result is used without a NULL check; this presumably relies on
 * aws_mem_calloc() not returning NULL. Confirm against the allocator contract.
 */
struct aws_input_stream *aws_input_stream_new_from_cursor(
    struct aws_allocator *allocator,
    const struct aws_byte_cursor *cursor) {

    struct aws_input_stream_byte_cursor_impl *impl =
        aws_mem_calloc(allocator, 1, sizeof(struct aws_input_stream_byte_cursor_impl));

    impl->base.vtable = &s_aws_input_stream_byte_cursor_vtable;
    impl->allocator = allocator;

    /* both cursors start out identical: nothing has been consumed yet */
    impl->original_cursor = *cursor;
    impl->current_cursor = impl->original_cursor;

    /* the impl is released by s_aws_input_stream_byte_cursor_destroy once the ref count hits zero */
    aws_ref_count_init(
        &impl->base.ref_count, impl, (aws_simple_completion_callback *)s_aws_input_stream_byte_cursor_destroy);

    return &impl->base;
}
/*
* file-based input stream
*/
/* State of an input stream that serves its data from a C stdio FILE. */
struct aws_input_stream_file_impl {
/* Embedded base stream; the vtable functions recover this struct from it with AWS_CONTAINER_OF. */
struct aws_input_stream base;
/* Allocator this struct was allocated from; used again to release it on destroy. */
struct aws_allocator *allocator;
/* File handle that seek, read, status and length queries operate on. */
FILE *file;
/* When true, destroy calls fclose() on file; when false, the file is left open. */
bool close_on_clean_up;
};
static int s_aws_input_stream_file_seek(
struct aws_input_stream *stream,
int64_t offset,
enum aws_stream_seek_basis basis) {
struct aws_input_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_stream_file_impl, base);
int whence = (basis == AWS_SSB_BEGIN) ? SEEK_SET : SEEK_END;
if (aws_fseek(impl->file, offset, whence)) {
return AWS_OP_ERR;
}
return AWS_OP_SUCCESS;
}
/**
 * Read implementation for FILE-backed streams.
 *
 * Asks fread() for as many bytes as there is free space in `dest` and appends whatever it delivers.
 * The file's error indicator is consulted only when fread() delivered nothing at all, so a short
 * but non-empty read is reported as success.
 *
 * @return AWS_OP_SUCCESS, or the result of aws_raise_error(AWS_IO_STREAM_READ_FAILED) when zero
 *         bytes were read and ferror() reports an error on the file
 */
static int s_aws_input_stream_file_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
    struct aws_input_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_stream_file_impl, base);

    const size_t free_space = dest->capacity - dest->len;
    const size_t bytes_read = fread(dest->buffer + dest->len, 1, free_space, impl->file);

    if (bytes_read == 0 && ferror(impl->file)) {
        return aws_raise_error(AWS_IO_STREAM_READ_FAILED);
    }

    dest->len += bytes_read;
    return AWS_OP_SUCCESS;
}
/**
 * Status implementation for FILE-backed streams.
 *
 * End of stream mirrors the file's end-of-file indicator (feof), and validity is the absence of the
 * file's error indicator (ferror).
 *
 * @return AWS_OP_SUCCESS, always
 */
static int s_aws_input_stream_file_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
    struct aws_input_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_stream_file_impl, base);

    const int eof_indicator = feof(impl->file);
    const int error_indicator = ferror(impl->file);

    status->is_end_of_stream = (eof_indicator != 0);
    status->is_valid = (error_indicator == 0);

    return AWS_OP_SUCCESS;
}
/**
 * Length implementation for FILE-backed streams: delegates to aws_file_get_length() on the
 * underlying file.
 *
 * @param length  passed through to aws_file_get_length(), which writes the result to it
 * @return whatever aws_file_get_length() returns
 */
static int s_aws_input_stream_file_get_length(struct aws_input_stream *stream, int64_t *length) {
    struct aws_input_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_stream_file_impl, base);

    const int length_result = aws_file_get_length(impl->file, length);
    return length_result;
}
/**
 * Destructor registered with the ref count of a FILE-backed stream.
 *
 * Closes the file only if the stream was marked as owning it (close_on_clean_up) and a file is
 * actually present, then hands the impl struct back to its allocator.
 */
static void s_aws_input_stream_file_destroy(struct aws_input_stream_file_impl *impl) {
    if (impl->file && impl->close_on_clean_up) {
        fclose(impl->file);
    }

    struct aws_allocator *owning_allocator = impl->allocator;
    aws_mem_release(owning_allocator, impl);
}
/*
 * Dispatch table for FILE-backed streams. Only the four I/O entries are set. The acquire and
 * release entries are left unset, so aws_input_stream_acquire() and aws_input_stream_release()
 * fall back to the stream's embedded ref_count.
 */
static struct aws_input_stream_vtable s_aws_input_stream_file_vtable = {
.seek = s_aws_input_stream_file_seek,
.read = s_aws_input_stream_file_read,
.get_status = s_aws_input_stream_file_get_status,
.get_length = s_aws_input_stream_file_get_length,
};
/**
 * Creates an input stream that reads the file at `file_name`.
 *
 * The file is opened by this function, and the stream closes it again when it is destroyed
 * (close_on_clean_up is set).
 *
 * The file is opened read-only in binary mode ("rb"). It was previously opened with "r+b", which
 * also requests write access and therefore fails on files the process may only read, even though
 * an input stream never writes.
 *
 * @param allocator  allocator for the stream object; stored and used to release it on destroy
 * @param file_name  path of the file to open
 * @return the new stream's base object, or NULL if the file could not be opened (the partially
 *         built stream object is released in that case)
 */
struct aws_input_stream *aws_input_stream_new_from_file(struct aws_allocator *allocator, const char *file_name) {

    struct aws_input_stream_file_impl *impl = aws_mem_calloc(allocator, 1, sizeof(struct aws_input_stream_file_impl));

    /* read-only: an input stream has no reason to ask for write access */
    impl->file = aws_fopen(file_name, "rb");
    if (impl->file == NULL) {
        goto on_error;
    }

    /* this stream opened the file, so it is responsible for closing it */
    impl->close_on_clean_up = true;
    impl->allocator = allocator;
    impl->base.vtable = &s_aws_input_stream_file_vtable;
    aws_ref_count_init(&impl->base.ref_count, impl, (aws_simple_completion_callback *)s_aws_input_stream_file_destroy);

    return &impl->base;

on_error:
    /* the ref count was never initialized, so release the raw allocation directly */
    aws_mem_release(allocator, impl);
    return NULL;
}
/**
 * Creates an input stream on top of a FILE that the caller has already opened.
 *
 * The stream does not take ownership of the file: close_on_clean_up is false, so destroying the
 * stream leaves the file open.
 *
 * @param allocator  allocator for the stream object; stored and used to release it on destroy
 * @param file       file handle to read from; stored as-is
 * @return the new stream's base object, with its ref count initialized
 */
struct aws_input_stream *aws_input_stream_new_from_open_file(struct aws_allocator *allocator, FILE *file) {

    struct aws_input_stream_file_impl *impl = aws_mem_calloc(allocator, 1, sizeof(struct aws_input_stream_file_impl));

    impl->base.vtable = &s_aws_input_stream_file_vtable;
    impl->allocator = allocator;

    /* borrowed handle: closing it is not this stream's job */
    impl->file = file;
    impl->close_on_clean_up = false;

    aws_ref_count_init(&impl->base.ref_count, impl, (aws_simple_completion_callback *)s_aws_input_stream_file_destroy);

    return &impl->base;
}
/**
 * Takes a reference on `stream`.
 *
 * If the stream's vtable supplies an acquire entry, that entry is called; otherwise the stream's
 * embedded ref_count is incremented. A NULL stream is accepted and ignored.
 *
 * @return `stream` itself (NULL when NULL was passed in)
 */
struct aws_input_stream *aws_input_stream_acquire(struct aws_input_stream *stream) {
    if (stream == NULL) {
        return stream;
    }

    if (stream->vtable->acquire) {
        /* the implementation does its own reference counting */
        stream->vtable->acquire(stream);
    } else {
        aws_ref_count_acquire(&stream->ref_count);
    }

    return stream;
}
/**
 * Drops a reference on `stream`.
 *
 * If the stream's vtable supplies a release entry, that entry is called; otherwise the stream's
 * embedded ref_count is decremented. A NULL stream is accepted and ignored.
 *
 * @return always NULL
 */
struct aws_input_stream *aws_input_stream_release(struct aws_input_stream *stream) {
    if (stream == NULL) {
        return NULL;
    }

    if (stream->vtable->release) {
        /* the implementation does its own reference counting */
        stream->vtable->release(stream);
    } else {
        aws_ref_count_release(&stream->ref_count);
    }

    return NULL;
}
/**
 * Drops one reference on `stream` by calling aws_input_stream_release(); the NULL that
 * aws_input_stream_release() returns is discarded.
 */
void aws_input_stream_destroy(struct aws_input_stream *stream) {
    (void)aws_input_stream_release(stream);
}
|