1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include "aws/s3/private/s3_checksums.h"
#include <aws/common/encoding.h>
#include <aws/io/stream.h>
struct aws_checksum_stream {
struct aws_input_stream base;
struct aws_allocator *allocator;
struct aws_input_stream *old_stream;
struct aws_s3_checksum *checksum;
struct aws_byte_buf checksum_result;
/* base64 encoded checksum of the stream, updated on destruction of stream */
struct aws_byte_buf *encoded_checksum_output;
};
static int s_aws_input_checksum_stream_seek(
struct aws_input_stream *stream,
int64_t offset,
enum aws_stream_seek_basis basis) {
(void)stream;
(void)offset;
(void)basis;
AWS_LOGF_ERROR(
AWS_LS_S3_CLIENT,
"Cannot seek on checksum stream, as it will cause the checksum output to mismatch the checksum of the stream "
"contents");
AWS_ASSERT(false);
return aws_raise_error(AWS_ERROR_UNSUPPORTED_OPERATION);
}
static int s_aws_input_checksum_stream_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
struct aws_checksum_stream *impl = AWS_CONTAINER_OF(stream, struct aws_checksum_stream, base);
size_t original_len = dest->len;
if (aws_input_stream_read(impl->old_stream, dest)) {
return AWS_OP_ERR;
}
struct aws_byte_cursor to_sum = aws_byte_cursor_from_buf(dest);
/* Move the cursor to the part to calculate the checksum */
aws_byte_cursor_advance(&to_sum, original_len);
/* If read failed, `aws_input_stream_read` will handle the error to restore the dest. No need to handle error here
*/
return aws_checksum_update(impl->checksum, &to_sum);
}
static int s_aws_input_checksum_stream_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
struct aws_checksum_stream *impl = AWS_CONTAINER_OF(stream, struct aws_checksum_stream, base);
return aws_input_stream_get_status(impl->old_stream, status);
}
static int s_aws_input_checksum_stream_get_length(struct aws_input_stream *stream, int64_t *out_length) {
struct aws_checksum_stream *impl = AWS_CONTAINER_OF(stream, struct aws_checksum_stream, base);
return aws_input_stream_get_length(impl->old_stream, out_length);
}
/* We take ownership of the old input stream, and destroy it with this input stream. This is because we want to be able
* to substitute in the chunk_stream for the cursor stream currently used in s_s3_meta_request_default_prepare_request
* which returns the new stream. So in order to prevent the need of keeping track of two input streams we instead
* consume the cursor stream and destroy it with this one */
static void s_aws_input_checksum_stream_destroy(struct aws_checksum_stream *impl) {
if (!impl) {
return;
}
int result = aws_checksum_finalize(impl->checksum, &impl->checksum_result, 0);
if (result != AWS_OP_SUCCESS) {
aws_byte_buf_reset(&impl->checksum_result, true);
}
AWS_ASSERT(result == AWS_OP_SUCCESS);
struct aws_byte_cursor checksum_result_cursor = aws_byte_cursor_from_buf(&impl->checksum_result);
AWS_FATAL_ASSERT(aws_base64_encode(&checksum_result_cursor, impl->encoded_checksum_output) == AWS_OP_SUCCESS);
aws_checksum_destroy(impl->checksum);
aws_input_stream_release(impl->old_stream);
aws_byte_buf_clean_up(&impl->checksum_result);
aws_mem_release(impl->allocator, impl);
}
static struct aws_input_stream_vtable s_aws_input_checksum_stream_vtable = {
.seek = s_aws_input_checksum_stream_seek,
.read = s_aws_input_checksum_stream_read,
.get_status = s_aws_input_checksum_stream_get_status,
.get_length = s_aws_input_checksum_stream_get_length,
};
struct aws_input_stream *aws_checksum_stream_new(
struct aws_allocator *allocator,
struct aws_input_stream *existing_stream,
enum aws_s3_checksum_algorithm algorithm,
struct aws_byte_buf *checksum_output) {
AWS_PRECONDITION(existing_stream);
struct aws_checksum_stream *impl = aws_mem_calloc(allocator, 1, sizeof(struct aws_checksum_stream));
impl->allocator = allocator;
impl->base.vtable = &s_aws_input_checksum_stream_vtable;
impl->checksum = aws_checksum_new(allocator, algorithm);
if (impl->checksum == NULL) {
goto on_error;
}
aws_byte_buf_init(&impl->checksum_result, allocator, impl->checksum->digest_size);
impl->old_stream = aws_input_stream_acquire(existing_stream);
impl->encoded_checksum_output = checksum_output;
aws_ref_count_init(
&impl->base.ref_count, impl, (aws_simple_completion_callback *)s_aws_input_checksum_stream_destroy);
return &impl->base;
on_error:
aws_mem_release(impl->allocator, impl);
return NULL;
}
|