1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
|
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
* You may select, at your option, one of the above-listed licenses.
*/
/*
* FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously.
* Current implementation relies on having one thread that reads and one that
* writes.
* Each IO pool supports up to `MAX_IO_JOBS` jobs that can be enqueued for work,
* but they are performed serially by the appropriate worker thread.
* Most systems expose better primitives to perform asynchronous IO, such as
* io_uring on newer linux systems. The API is built in such a way that in the
* future we could replace the threads with better solutions when available.
*/
#ifndef ZSTD_FILEIO_ASYNCIO_H
#define ZSTD_FILEIO_ASYNCIO_H
#if defined (__cplusplus)
extern "C" {
#endif
#include "../lib/common/mem.h" /* U32, U64 */
#include "fileio_types.h"
#include "platform.h"
#include "util.h"
#include "../lib/common/pool.h"
#include "../lib/common/threading.h"
/* Capacity of the job arrays (`availableJobs`, `completedJobs`) held by each IO pool. */
#define MAX_IO_JOBS (10)
/* IOPoolCtx_t:
 * State shared by both kinds of IO pool; it is embedded as the `base` member of
 * ReadPoolCtx_t and WritePoolCtx_t. */
typedef struct {
/* These struct fields should be set only on creation and not changed afterwards */
POOL_ctx* threadPool;
/* NOTE(review): AIO_ReadPool_setAsync / AIO_WritePool_setAsync (de)activate async mode after
 * creation; if they do so through threadPoolActive, this field is an exception to the rule
 * above - confirm against the implementation. */
int threadPoolActive;
int totalIoJobs;
const FIO_prefs_t* prefs;
POOL_function poolFunction;
/* Controls the file we currently write to, make changes only by using provided utility functions.
 * NOTE(review): this struct is also the base of ReadPoolCtx_t, so presumably this holds the
 * source file for a read pool as well - confirm. */
FILE* file;
/* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should
 * only be mutated after locking the mutex */
ZSTD_pthread_mutex_t ioJobsMutex;
/* Storage for up to MAX_IO_JOBS job pointers. */
void* availableJobs[MAX_IO_JOBS];
int availableJobsCount;
/* Presumably the size of the buffer attached to each job (cf. the `bufferSize` argument of the
 * pool create functions) - TODO confirm. */
size_t jobBufferSize;
} IOPoolCtx_t;
/* ReadPoolCtx_t:
 * Read pool state: the common IOPoolCtx_t plus bookkeeping for the file currently being read
 * and for the buffer exposed to consumer code. */
typedef struct {
IOPoolCtx_t base;
/* State regarding the currently read file */
int reachedEof;
U64 nextReadOffset;
U64 waitingOnOffset;
/* We may hold an IOJob object as needed if we actively expose its buffer. */
void *currentJobHeld;
/* Coalesce buffer is used to join two buffers in the case where we need to read more bytes than
 * are left in the first of them. Shouldn't be accessed from outside of utility functions. */
U8 *coalesceBuffer;
/* Read buffer can be used by consumer code, take care when copying this pointer aside as it might
 * change when consuming / refilling buffer. */
U8 *srcBuffer;
/* Number of bytes currently loaded in srcBuffer. */
size_t srcBufferLoaded;
/* We need to know what tasks completed so we can use their buffers when their time comes.
 * Should only be accessed after locking base.ioJobsMutex . */
void* completedJobs[MAX_IO_JOBS];
int completedJobsCount;
/* NOTE(review): presumably signaled when a read job completes and waited on together with
 * base.ioJobsMutex - confirm against the implementation. */
ZSTD_pthread_cond_t jobCompletedCond;
} ReadPoolCtx_t;
/* WritePoolCtx_t:
 * Write pool state: the common IOPoolCtx_t plus sparse-write bookkeeping. */
typedef struct {
IOPoolCtx_t base;
/* NOTE(review): presumably the number of bytes skipped so far by sparse writes that still have
 * to be accounted for by AIO_WritePool_sparseWriteEnd - confirm against the implementation. */
unsigned storedSkips;
} WritePoolCtx_t;
/* IOJob_t:
 * A single IO job: the file it targets, the buffer it carries and how much of that buffer
 * is in use. */
typedef struct {
/* These fields are automatically set and shouldn't be changed by non WritePool code. */
/* NOTE(review): `ctx` is untyped; presumably it points back to the owning pool context
 * (ReadPoolCtx_t or WritePoolCtx_t) - confirm. */
void *ctx;
FILE* file;
void *buffer;
size_t bufferSize;
/* This field should be changed before a job is queued for execution and should contain the number
 * of bytes to write from the buffer. */
size_t usedBufferSize;
/* NOTE(review): presumably the file offset this job refers to (cf. nextReadOffset and
 * waitingOnOffset in ReadPoolCtx_t) - confirm. */
U64 offset;
} IOJob_t;
/* AIO_supported:
* Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void);
/* AIO_WritePool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t *job);
/* AIO_WritePool_acquireJob:
* Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx);
/* AIO_WritePool_enqueueAndReacquireWriteJob:
* Enqueues a write job for execution and acquires a new one.
* After the call, `*job` points to the newly acquired job.
* Make sure to set `usedBufferSize` to the wanted length before call.
* The queued job shouldn't be used directly after queueing it. */
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job);
/* AIO_WritePool_sparseWriteEnd:
* Ends sparse writes to the current file.
* Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx);
/* AIO_WritePool_setFile:
* Sets the destination file for future writes in the pool.
* Requires completion of all queued write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);
/* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);
/* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
* Requires completion of all queued write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
/* AIO_WritePool_create:
* Allocates and sets up a new write pool, including its jobs.
* bufferSize should be set to the maximal buffer we want to write to at a time. */
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);
/* AIO_WritePool_free:
* Frees and releases a writePool and its resources. Closes destination file. */
void AIO_WritePool_free(WritePoolCtx_t* ctx);
/* AIO_WritePool_setAsync:
* Allows (de)activating async mode, to be used when the expected overhead
* of asyncio costs more than the expected gains. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async);
/* AIO_ReadPool_create:
* Allocates and sets up a new readPool, including its jobs.
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
* as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);
/* AIO_ReadPool_free:
* Frees and releases a readPool and its resources. Closes source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx);
/* AIO_ReadPool_setAsync:
* Allows (de)activating async mode, to be used when the expected overhead
* of asyncio costs more than the expected gains. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async);
/* AIO_ReadPool_consumeBytes:
* Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);
/* AIO_ReadPool_fillBuffer:
* Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize).
* Returns once srcBuffer has at least n bytes loaded or we've reached the end of the file.
* Return value is the number of bytes added to the buffer.
* Note that srcBuffer might have up to 2 times bufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);
/* AIO_ReadPool_consumeAndRefill:
* Consumes the current buffer and refills it with bufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);
/* AIO_ReadPool_setFile:
* Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL.
* Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);
/* AIO_ReadPool_getFile:
* Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);
/* AIO_ReadPool_closeFile:
* Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);
#if defined (__cplusplus)
}
#endif
#endif /* ZSTD_FILEIO_ASYNCIO_H */
|