aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/restricted/aws/aws-c-io/source/async_stream.c
blob: 6422bb8470568f13815e258d0de6ae619733fea7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include <aws/io/async_stream.h>

#include <aws/common/byte_buf.h>
#include <aws/io/future.h>
#include <aws/io/stream.h>

void aws_async_input_stream_init_base(
    struct aws_async_input_stream *stream,
    struct aws_allocator *alloc,
    const struct aws_async_input_stream_vtable *vtable,
    void *impl) {

    AWS_PRECONDITION(stream);
    AWS_PRECONDITION(alloc);
    AWS_PRECONDITION(vtable);
    AWS_PRECONDITION(vtable->read);
    AWS_PRECONDITION(vtable->destroy);

    AWS_ZERO_STRUCT(*stream);
    stream->alloc = alloc;
    stream->vtable = vtable;
    stream->impl = impl;
    aws_ref_count_init(&stream->ref_count, stream, (aws_simple_completion_callback *)vtable->destroy);
}

/* Take an additional reference on the stream.
 * NULL is tolerated and simply passed through.
 * Returns the same pointer that was passed in. */
struct aws_async_input_stream *aws_async_input_stream_acquire(struct aws_async_input_stream *stream) {
    if (stream == NULL) {
        return NULL;
    }

    aws_ref_count_acquire(&stream->ref_count);
    return stream;
}

/* Drop one reference on the stream.
 * NULL is tolerated and ignored.
 * Always returns NULL, so the result can be assigned back to the pointer that was released. */
struct aws_async_input_stream *aws_async_input_stream_release(struct aws_async_input_stream *stream) {
    if (stream == NULL) {
        return NULL;
    }

    aws_ref_count_release(&stream->ref_count);
    return NULL;
}

/* Start a single read from the stream into dest.
 *
 * Returns a future (never NULL) for the read:
 * - If dest has no free space (len == capacity), the implementation is not called and
 *   the returned future already carries AWS_ERROR_SHORT_BUFFER.
 * - Otherwise the future comes straight from the implementation's vtable->read(). */
struct aws_future_bool *aws_async_input_stream_read(struct aws_async_input_stream *stream, struct aws_byte_buf *dest) {
    AWS_PRECONDITION(stream);
    AWS_PRECONDITION(dest);

    if (dest->len != dest->capacity) {
        /* There is room in the buffer: delegate to the implementation */
        struct aws_future_bool *future = stream->vtable->read(stream, dest);
        AWS_POSTCONDITION(future != NULL);
        return future;
    }

    /* Buffer is already full: report the failure through a pre-failed future */
    struct aws_future_bool *short_buffer_future = aws_future_bool_new(stream->alloc);
    aws_future_bool_set_error(short_buffer_future, AWS_ERROR_SHORT_BUFFER);
    return short_buffer_future;
}

/* Data to perform the aws_async_input_stream_read_to_fill() job.
 * One instance is allocated per aws_async_input_stream_read_to_fill() call that
 * actually starts reading, and it is freed in s_async_stream_fill_job_complete(). */
struct aws_async_input_stream_fill_job {
    /* Allocator this job was allocated from; used again to free it */
    struct aws_allocator *alloc;
    /* Stream being read from. The job holds its own reference, released on completion */
    struct aws_async_input_stream *stream;
    /* Buffer being filled. Stored as a plain pointer; the job never frees it */
    struct aws_byte_buf *dest;
    /* Future for each read() step. NULL whenever no read step is outstanding */
    struct aws_future_bool *read_step_future;
    /* Future to set when this fill job completes. The job holds its own reference */
    struct aws_future_bool *on_complete_future;
};

/* Finish a fill job and destroy it.
 *
 * The outcome is published on the job's on_complete_future: the error if
 * error_code is non-zero, otherwise the eof flag as the result.
 * Afterwards the job's references to the future and the stream are released
 * and the job's memory is freed, so fill_job must not be used after this returns. */
static void s_async_stream_fill_job_complete(
    struct aws_async_input_stream_fill_job *fill_job,
    bool eof,
    int error_code) {

    struct aws_future_bool *done_future = fill_job->on_complete_future;

    if (error_code == 0) {
        aws_future_bool_set_result(done_future, eof);
    } else {
        aws_future_bool_set_error(done_future, error_code);
    }

    /* Let go of everything the job was holding, then the job itself */
    aws_future_bool_release(done_future);
    aws_async_input_stream_release(fill_job->stream);
    aws_mem_release(fill_job->alloc, fill_job);
}

/* Call read() in a loop.
 * It would be simpler to set a completion callback for each read() call,
 * but this risks our call stack growing large if there are many small, synchronous, reads.
 * So be complicated and loop until a read() call is actually async,
 * and only then set the completion callback (which is this same function, where we resume looping).
 *
 * user_data is the struct aws_async_input_stream_fill_job driving this job.
 * The function returns in exactly two situations:
 * - it registered itself as the callback of a read step that is not done yet, or
 * - the job finished and was destroyed by s_async_stream_fill_job_complete(). */
static void s_async_stream_fill_job_loop(void *user_data) {
    struct aws_async_input_stream_fill_job *fill_job = user_data;

    while (true) {
        /* Process read_step_future from previous iteration of loop.
         * It's NULL the first time the job ever enters the loop.
         * But it's set in subsequent runs of the loop,
         * and when this is a read_step_future completion callback. */
        if (fill_job->read_step_future) {
            if (aws_future_bool_register_callback_if_not_done(
                    fill_job->read_step_future, s_async_stream_fill_job_loop, fill_job)) {

                /* Not done, we'll resume this loop when the callback fires.
                 * NOTE(review): presumably the callback can run at any moment from here on,
                 * so fill_job is deliberately not touched again before returning - confirm. */
                return;
            }

            /* read_step_future is done.
             * Collect everything we need from it (and from dest) before releasing it.
             * The result is only queried when there was no error. */
            int error_code = aws_future_bool_get_error(fill_job->read_step_future);
            bool eof = error_code ? false : aws_future_bool_get_result(fill_job->read_step_future);
            bool reached_capacity = fill_job->dest->len == fill_job->dest->capacity;
            fill_job->read_step_future = aws_future_bool_release(fill_job->read_step_future); /* release and NULL */

            if (error_code || eof || reached_capacity) {
                /* Job complete! fill_job is freed by this call, so return immediately. */
                s_async_stream_fill_job_complete(fill_job, eof, error_code);
                return;
            }
        }

        /* Kick off a read, which may or may not complete async.
         * The resulting future is examined at the top of the next iteration. */
        fill_job->read_step_future = aws_async_input_stream_read(fill_job->stream, fill_job->dest);
    }
}

/* Read from the stream repeatedly until dest is full, EOF is reached, or an error occurs.
 *
 * Returns a new future for the whole operation:
 * - If dest has no free space to begin with (len == capacity), the future is
 *   returned already failed with AWS_ERROR_SHORT_BUFFER and no reads are issued.
 * - Otherwise a fill job is started; the future gets the eof flag as its result,
 *   or the error code of the read step that failed. */
struct aws_future_bool *aws_async_input_stream_read_to_fill(
    struct aws_async_input_stream *stream,
    struct aws_byte_buf *dest) {

    AWS_PRECONDITION(stream);
    AWS_PRECONDITION(dest);

    struct aws_allocator *alloc = stream->alloc;
    struct aws_future_bool *future = aws_future_bool_new(alloc);

    if (dest->len != dest->capacity) {
        /* There is room to fill: build the job state.
         * The job takes its own references to the stream and to the future. */
        struct aws_async_input_stream_fill_job *job = aws_mem_calloc(alloc, 1, sizeof(*job));
        job->alloc = alloc;
        job->stream = aws_async_input_stream_acquire(stream);
        job->dest = dest;
        job->on_complete_future = aws_future_bool_acquire(future);

        /* Start reading. The job may already be finished (and freed) when this returns,
         * so it is not touched afterwards. */
        s_async_stream_fill_job_loop(job);
    } else {
        /* Nothing can be read into a full buffer */
        aws_future_bool_set_error(future, AWS_ERROR_SHORT_BUFFER);
    }

    return future;
}