1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include "aws/s3/private/s3_parallel_input_stream.h"
#include <aws/common/file.h>
#include <aws/io/future.h>
#include <aws/io/stream.h>
#include <errno.h>
void aws_parallel_input_stream_init_base(
struct aws_parallel_input_stream *stream,
struct aws_allocator *alloc,
const struct aws_parallel_input_stream_vtable *vtable,
void *impl) {
AWS_ZERO_STRUCT(*stream);
stream->alloc = alloc;
stream->vtable = vtable;
stream->impl = impl;
aws_ref_count_init(&stream->ref_count, stream, (aws_simple_completion_callback *)vtable->destroy);
}
struct aws_parallel_input_stream *aws_parallel_input_stream_acquire(struct aws_parallel_input_stream *stream) {
if (stream != NULL) {
aws_ref_count_acquire(&stream->ref_count);
}
return stream;
}
struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_parallel_input_stream *stream) {
if (stream != NULL) {
aws_ref_count_release(&stream->ref_count);
}
return NULL;
}
struct aws_future_bool *aws_parallel_input_stream_read(
struct aws_parallel_input_stream *stream,
uint64_t offset,
struct aws_byte_buf *dest) {
/* Ensure the buffer has space available */
if (dest->len == dest->capacity) {
struct aws_future_bool *future = aws_future_bool_new(stream->alloc);
aws_future_bool_set_error(future, AWS_ERROR_SHORT_BUFFER);
return future;
}
struct aws_future_bool *future = stream->vtable->read(stream, offset, dest);
return future;
}
struct aws_parallel_input_stream_from_file_impl {
struct aws_parallel_input_stream base;
struct aws_string *file_path;
};
static void s_para_from_file_destroy(struct aws_parallel_input_stream *stream) {
struct aws_parallel_input_stream_from_file_impl *impl = stream->impl;
aws_string_destroy(impl->file_path);
aws_mem_release(stream->alloc, impl);
}
struct aws_future_bool *s_para_from_file_read(
struct aws_parallel_input_stream *stream,
uint64_t offset,
struct aws_byte_buf *dest) {
struct aws_future_bool *future = aws_future_bool_new(stream->alloc);
struct aws_parallel_input_stream_from_file_impl *impl = stream->impl;
bool success = false;
struct aws_input_stream *file_stream = NULL;
struct aws_stream_status status = {
.is_end_of_stream = false,
.is_valid = true,
};
file_stream = aws_input_stream_new_from_file(stream->alloc, aws_string_c_str(impl->file_path));
if (!file_stream) {
goto done;
}
if (aws_input_stream_seek(file_stream, offset, AWS_SSB_BEGIN)) {
goto done;
}
/* Keep reading until fill the buffer.
* Note that we must read() after seek() to determine if we're EOF, the seek alone won't trigger it. */
while ((dest->len < dest->capacity) && !status.is_end_of_stream) {
/* Read from stream */
if (aws_input_stream_read(file_stream, dest) != AWS_OP_SUCCESS) {
goto done;
}
/* Check if stream is done */
if (aws_input_stream_get_status(file_stream, &status) != AWS_OP_SUCCESS) {
goto done;
}
}
success = true;
done:
if (success) {
aws_future_bool_set_result(future, status.is_end_of_stream);
} else {
aws_future_bool_set_error(future, aws_last_error());
}
aws_input_stream_release(file_stream);
return future;
}
static struct aws_parallel_input_stream_vtable s_parallel_input_stream_from_file_vtable = {
.destroy = s_para_from_file_destroy,
.read = s_para_from_file_read,
};
struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
struct aws_allocator *allocator,
struct aws_byte_cursor file_name) {
struct aws_parallel_input_stream_from_file_impl *impl =
aws_mem_calloc(allocator, 1, sizeof(struct aws_parallel_input_stream_from_file_impl));
aws_parallel_input_stream_init_base(&impl->base, allocator, &s_parallel_input_stream_from_file_vtable, impl);
impl->file_path = aws_string_new_from_cursor(allocator, &file_name);
if (!aws_path_exists(impl->file_path)) {
/* If file path not exists, raise error from errno. */
aws_translate_and_raise_io_error(errno);
goto error;
}
return &impl->base;
error:
s_para_from_file_destroy(&impl->base);
return NULL;
}
|