aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: yuryalekseev <yuryalekseev@yandex-team.com> 2022-09-29 17:21:40 +0300
committer: yuryalekseev <yuryalekseev@yandex-team.com> 2022-09-29 17:21:40 +0300
commit: 52ce13bc3056d016aa198253b691ed50bb8ab80a (patch)
tree: a993b97de3daf1b7c5a37fb2c8298e6465e1a1c5
parent: b9c33d2970daa903dfdd7cfd222c293593e83536 (diff)
download: ydb-52ce13bc3056d016aa198253b691ed50bb8ab80a.tar.gz
PR from branch users/yuryalekseev//Add_io_uring_to_pdisk_io
Add TAsyncIoContextLiburing.
-rw-r--r-- CMakeLists.txt 1
-rw-r--r-- ydb/library/pdisk_io/CMakeLists.linux.txt 2
-rw-r--r-- ydb/library/pdisk_io/aio_linux.cpp 377
3 files changed, 362 insertions, 18 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 494ea1c13b0..3ecc4238c41 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -36,6 +36,7 @@ conan_cmake_configure(
bison/3.5.3
c-ares/1.18.1
libiconv/1.15
+ liburing/2.1
openssl/1.1.1l
ragel/6.10
yasm/1.3.0
diff --git a/ydb/library/pdisk_io/CMakeLists.linux.txt b/ydb/library/pdisk_io/CMakeLists.linux.txt
index 3e47c0b5b63..5680d4da426 100644
--- a/ydb/library/pdisk_io/CMakeLists.linux.txt
+++ b/ydb/library/pdisk_io/CMakeLists.linux.txt
@@ -7,6 +7,7 @@
find_package(AIO REQUIRED)
+find_package(liburing REQUIRED)
add_subdirectory(protos)
add_library(ydb-library-pdisk_io)
@@ -15,6 +16,7 @@ target_link_libraries(ydb-library-pdisk_io PUBLIC
yutil
tools-enum_parser-enum_serialization_runtime
AIO::aio
+ liburing::liburing
cpp-actors-core
cpp-actors-wilson
cpp-monlib-dynamic_counters
diff --git a/ydb/library/pdisk_io/aio_linux.cpp b/ydb/library/pdisk_io/aio_linux.cpp
index 418321a7c8d..cc0f1397be0 100644
--- a/ydb/library/pdisk_io/aio_linux.cpp
+++ b/ydb/library/pdisk_io/aio_linux.cpp
@@ -10,8 +10,9 @@
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <util/system/file.h>
#include <util/stream/format.h>
-#include <libaio.h>
+#include <contrib/libs/liburing/src/include/liburing.h>
+#include <libaio.h>
#include <linux/fs.h>
#include <sys/ioctl.h>
@@ -20,6 +21,28 @@ namespace NPDisk {
class TCallbackContext;
+//
+// TBufferPoolHugePages
+//
+// Builds a buffer pool whose backing storage comes from the TSpdkStateOSS singleton
+// allocator, with every buffer rounded up to a 512-byte multiple.
+TBufferPoolHugePages::TBufferPoolHugePages(ui32 bufferSize, ui32 bufferCount, TBufferPool::TPDiskParams params)
+    : TBufferPool(bufferSize, bufferCount, params)
+{
+    constexpr ui32 alignment = 512;
+    TBufferPool::UseHugePages = true;
+
+    // Per-buffer size after rounding up to the alignment; the total is size * count.
+    const ui32 alignedBufferSize = AlignUp(ui32(bufferSize), ui32(alignment));
+    AlignedBuffer = Singleton<TSpdkStateOSS>()->Malloc(alignedBufferSize * bufferCount, alignment);
+    Y_VERIFY((ui64)AlignedBuffer % alignment == 0);
+
+    MarkUpPool(AlignedBuffer);
+}
+
+// Hands the backing storage back to the same singleton allocator it was obtained from.
+TBufferPoolHugePages::~TBufferPoolHugePages() {
+    Singleton<TSpdkStateOSS>()->Free(AlignedBuffer);
+}
+
+/*
+ TAsyncIoOperation
+*/
struct TAsyncIoOperation : iocb, IAsyncIoOperation {
void* Cookie;
ICallback *Callback;
@@ -82,6 +105,9 @@ struct TAsyncIoOperation : iocb, IAsyncIoOperation {
}
};
+/*
+ TAsyncIoContextLibaio
+*/
class TAsyncIoContextLibaio : public IAsyncIoContext {
io_context_t IoContext;
TActorSystem *ActorSystem;
@@ -291,6 +317,7 @@ public:
return EIoResult::TryAgain;
}
}
+
int ret = io_setup(maxEvents, &IoContext);
if (ret < 0) {
LastErrno = -ret;
@@ -347,27 +374,341 @@ public:
}
};
+/*
+ TAsyncIoOperationLiburing
+*/
+// Describes one read / write / trim request for TAsyncIoContextLiburing: the target
+// buffer, its size and file offset, plus the cookie, request id, trace id and
+// completion callback that travel with the request.
+struct TAsyncIoOperationLiburing : IAsyncIoOperation {
+    void* Cookie = nullptr;
+    ICallback *Callback = nullptr;
+    TReqId ReqId;
+    NWilson::TTraceId TraceId;
+    bool IsTrim = false; // Trim is prepared like a write and then flagged separately
-//
-// TBufferPoolHugePages
-//
-TBufferPoolHugePages::TBufferPoolHugePages(ui32 bufferSize, ui32 bufferCount, TBufferPool::TPDiskParams params)
- : TBufferPool(bufferSize, bufferCount, params)
-{
- TBufferPool::UseHugePages = true;
- constexpr ui32 alignment = 512;
- auto spdkState = Singleton<TSpdkStateOSS>();
- AlignedBuffer = spdkState->Malloc(AlignUp(ui32(bufferSize), ui32(alignment)) * bufferCount, alignment);
- Y_VERIFY((ui64)AlignedBuffer % alignment == 0);
- MarkUpPool(AlignedBuffer);
-}
+    void* DataPtr = nullptr;
+    size_t DataSize = 0;
+    long long DataOffset = 0;
+    bool IsReadOp = false;
-TBufferPoolHugePages::~TBufferPoolHugePages() {
- auto spdkState = Singleton<TSpdkStateOSS>();
- spdkState->Free(AlignedBuffer);
-}
+    TAsyncIoOperationLiburing() = default;
+
+    // Takes over the trace id pointed to by traceId when one is supplied;
+    // otherwise starts with a default-constructed trace id.
+    TAsyncIoOperationLiburing(void *cookie, TReqId reqId, NWilson::TTraceId *traceId)
+        : Cookie(cookie)
+        , ReqId(reqId)
+        , TraceId(traceId ? std::move(*traceId) : NWilson::TTraceId())
+    {}
+
+    ~TAsyncIoOperationLiburing() override = default;
+
+    void* GetCookie() override { return Cookie; }
+
+    NWilson::TTraceId *GetTraceIdPtr() override { return &TraceId; }
+
+    void* GetData() override { return DataPtr; }
+
+    ui64 GetOffset() override { return static_cast<ui64>(DataOffset); }
+
+    ui64 GetSize() override { return static_cast<ui64>(DataSize); }
+
+    TReqId GetReqId() override { return ReqId; }
+
+    // A read takes precedence over the trim flag; anything else is a plain write.
+    EType GetType() override {
+        return IsReadOp ? EType::PRead
+             : IsTrim   ? EType::PTrim
+                        : EType::PWrite;
+    }
+
+    void SetCallback(ICallback *callback) override { Callback = callback; }
+
+    // Invokes the stored callback; the callback must have been set beforehand.
+    void ExecCallback(TAsyncIoOperationResult *result) override { Callback->Exec(result); }
+};
+
+/*
+ TAsyncIoContextLiburing
+*/
+// IAsyncIoContext implementation that submits reads and writes through liburing.
+//
+// Lifecycle as visible in this class: Setup() opens (and optionally locks) the file and
+// creates the ring, Submit() queues one operation per call, GetEvents() harvests
+// completions and runs their callbacks, Destroy() tears the ring down and closes the file.
+class TAsyncIoContextLiburing : public IAsyncIoContext {
+    TActorSystem *ActorSystem = nullptr;
+    TPool<TAsyncIoOperationLiburing, 1024> Pool;
+    THolder<TFileHandle> File;
+    int LastErrno = 0;
+
+    TPDiskDebugInfo PDiskInfo;
+    struct io_uring Ring;
+    // True only after io_uring_queue_init_params() succeeded and until io_uring_queue_exit()
+    // ran; guards Destroy() against touching a ring that was never created.
+    bool IsRingInitialized = false;
+
+    // Submission queue size requested from io_uring_queue_init_params().
+    static constexpr unsigned RingEntries = 512;
+    // Value written to io_uring_params::sq_thread_idle for the SQPOLL kernel thread.
+    static constexpr unsigned SqThreadIdle = 100;
+
+public:
+    TAsyncIoContextLiburing(const TString &path, ui32 pDiskId, TDeviceMode::TFlags flags)
+        : ActorSystem(nullptr)
+        , PDiskInfo(path, pDiskId, "liburing")
+    {
+        Y_UNUSED(flags);
+    }
+
+    ~TAsyncIoContextLiburing() {
+    }
+
+    void InitializeMonitoring(TPDiskMon &) override {
+    }
+
+    // Constructs an operation in storage taken from the pool.
+    IAsyncIoOperation* CreateAsyncIoOperation(void* cookie, TReqId reqId, NWilson::TTraceId *traceId) override {
+        void *p = Pool.Pop();
+        return new (p) TAsyncIoOperationLiburing(cookie, reqId, traceId);
+    }
+
+    // Returns the operation's storage to the pool.
+    void DestroyAsyncIoOperation(IAsyncIoOperation* op) override {
+        Pool.Push(static_cast<TAsyncIoOperationLiburing*>(op));
+    }
+
+    // Tears down the ring (if it was created) and unlocks and closes the file (if it was opened).
+    EIoResult Destroy() override {
+        if (IsRingInitialized) {
+            io_uring_queue_exit(&Ring);
+            IsRingInitialized = false;
+        }
+        if (File) {
+            auto ret = File->Flock(LOCK_UN);
+            Y_VERIFY_S(ret == 0, "Error in Flock(LOCK_UN), errno# " << errno << " strerror# " << strerror(errno));
+            bool isOk = File->Close();
+            Y_VERIFY_S(isOk, PDiskInfo << " error on file close, errno# " << errno << " strerror# " << strerror(errno));
+        }
+        return EIoResult::Ok;
+    }
+
+    // Waits up to `timeout` for `numEvents` completions, then harvests at most `numEvents`
+    // of whatever is available into `events` and runs each operation's callback.
+    //
+    // Returns the number of harvested events (possibly fewer than `numEvents`, including 0,
+    // when the wait timed out), or a negated EIoResult when the wait itself failed.
+    // NOTE(review): the unnamed second parameter is ignored here - confirm against
+    // IAsyncIoContext what it is meant to bound.
+    i64 GetEvents(ui64 numEvents, ui64, TAsyncIoOperationResult *events, TDuration timeout) override {
+        struct io_uring_cqe *firstCqe = nullptr;
+        struct __kernel_timespec ts = { (time_t)timeout.Seconds(), timeout.NanoSecondsOfSecond() };
+
+        int ret = io_uring_wait_cqes(&Ring, &firstCqe, numEvents, &ts, nullptr);
+        if (ret < 0 && -ret != ETIME) {
+            return -static_cast<i64>(RetErrnoToContextError(ret, "io_uring_wait_cqes"));
+        }
+
+        // io_uring_wait_cqes() hands back a pointer to a single CQE, not an array: the
+        // completion queue is a ring buffer, so entries must be walked with the head/mask
+        // aware iterator. On ETIME we still drain what has already completed.
+        ui64 harvested = 0;
+        unsigned seen = 0;
+        unsigned head = 0;
+        struct io_uring_cqe *cqe = nullptr;
+        io_uring_for_each_cqe(&Ring, head, cqe) {
+            if (harvested == numEvents) {
+                break;
+            }
+            ++seen;
+            if (cqe->user_data == LIBURING_UDATA_TIMEOUT) {
+                // Completion of liburing's own internal timeout request, not one of ours.
+                continue;
+            }
+            events[harvested].Operation = static_cast<TAsyncIoOperationLiburing*>(io_uring_cqe_get_data(cqe));
+            events[harvested].Result = RetErrnoToContextError(cqe->res, "cqes[]->res");
+            ++harvested;
+        }
+        // Everything needed from the CQEs has been copied out, so release the slots before
+        // running callbacks.
+        io_uring_cq_advance(&Ring, seen);
+
+        for (ui64 i = 0; i < harvested; ++i) {
+            events[i].Operation->ExecCallback(&events[i]);
+        }
+        return static_cast<i64>(harvested);
+    }
+
+    // Maps a liburing style return value (negative errno on failure) to EIoResult.
+    // Any non-negative value is Ok; an errno outside the table aborts via Y_FAIL_S.
+    EIoResult RetErrnoToContextError(i64 ret, const char *info) {
+        if (ret < 0) {
+            switch(-ret) {
+                case EAGAIN: return EIoResult::TryAgain;
+                case EBADF: return EIoResult::BadFileNumber;
+                case EFAULT: return EIoResult::BadAddress;
+                case EINTR: return EIoResult::InterruptedSystemCall;
+                case EINVAL: return EIoResult::InvalidArgument;
+                case EIO: return EIoResult::IOError;
+                case ENOMEM: return EIoResult::OutOfMemory;
+                case ENOSYS: return EIoResult::FunctionNotImplemented;
+                case EILSEQ: return EIoResult::InvalidSequence;
+                case ENODATA: return EIoResult::NoData;
+                default: Y_FAIL_S(PDiskInfo << " unexpected error in " << info << ", error# " << -ret
+                        << " strerror# " << strerror(-ret));
+            }
+        } else {
+            return EIoResult::Ok;
+        }
+    }
+
+    // Records a read of `size` bytes at `offset` into `destination`; nothing is queued until Submit().
+    void PreparePRead(IAsyncIoOperation *op, void *destination, size_t size, size_t offset) override {
+        Y_VERIFY_DEBUG(File);
+
+        auto tOp = dynamic_cast<TAsyncIoOperationLiburing*>(op);
+        Y_VERIFY(tOp != nullptr);
+
+        tOp->IsReadOp = true;
+        tOp->DataPtr = destination;
+        tOp->DataSize = size;
+        tOp->DataOffset = offset;
+    }
+
+    // Records a write of `size` bytes from `source` at `offset`; nothing is queued until Submit().
+    void PreparePWrite(IAsyncIoOperation *op, const void *source, size_t size, size_t offset) override {
+        Y_VERIFY_DEBUG(File);
+
+        auto tOp = dynamic_cast<TAsyncIoOperationLiburing*>(op);
+        Y_VERIFY(tOp != nullptr);
+
+        tOp->IsReadOp = false;
+        // The operation keeps a non-const pointer because reads and writes share the field.
+        tOp->DataPtr = const_cast<void*>(source);
+        tOp->DataSize = size;
+        tOp->DataOffset = offset;
+    }
+
+    // A trim is recorded as a data-less write with the IsTrim flag set.
+    void PreparePTrim(IAsyncIoOperation *op, size_t size, size_t offset) override {
+        PreparePWrite(op, nullptr, size, offset);
+        static_cast<TAsyncIoOperationLiburing*>(op)->IsTrim = true;
+    }
+
+    // Synchronously issues BLKDISCARD for the range described by a trim operation.
+    // Returns false only for EOPNOTSUPP / ENOTTY; true on success and on any other error.
+    bool DoTrim(IAsyncIoOperation *op) override {
+        auto trim = dynamic_cast<TAsyncIoOperationLiburing*>(op);
+        Y_VERIFY(trim != nullptr);
+        Y_VERIFY(trim->IsTrim);
+
+        // BLKDISCARD takes {start offset, length}.
+        ui64 range[2] = {trim->GetOffset(), trim->GetSize()};
+        bool tryAgain = true;
+        TStringStream str;
+        str << "BLKDISCARD " << PDiskInfo;
+        errno = 0;
+        if (ioctl((FHANDLE)*File.Get(), BLKDISCARD, &range) == -1) {
+            int errorId = errno;
+            if (errorId == EOPNOTSUPP) {
+                str << " failed, operation not supported, trimming will be disabled for the device";
+                tryAgain = false;
+            } else if (errorId == ENOTTY) {
+                str << " failed, device is not a typewriter! Trimming will be disabled for the device";
+                tryAgain = false;
+            } else {
+                str << " failed, errno# " << errorId << " strerror# " << strerror(errorId);
+                tryAgain = true;
+            }
+
+            if (ActorSystem) {
+                //LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_DEVICE, str.Str());
+            } else {
+                Cerr << str.Str() << Endl;
+            }
+        } else {
+            if (ActorSystem) {
+                //LOG_DEBUG_S(*ActorSystem, NKikimrServices::BS_DEVICE, str.Str() << " trimmed# " << range[1]
+                //        << " size# " << trim->GetSize() << " from# " << range[0] << " offset# " << trim->GetOffset());
+            }
+            tryAgain = true;
+        }
+        return tryAgain;
+    }
+
+    // Tries to take an exclusive non-blocking lock on the file, at most twice with a
+    // one second pause between attempts. Returns the last Flock() result.
+    int LockFile() {
+        int ret = -1;
+        errno = EWOULDBLOCK;
+        int retry = 2;
+        while (ret == -1 && errno == EWOULDBLOCK && retry > 0) {
+            errno = 0;
+            ret = File->Flock(LOCK_EX | LOCK_NB);
+            if (ret == 0) {
+                break;
+            } else {
+                LastErrno = errno;
+                if (ActorSystem){
+                    //LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_DEVICE, PDiskInfo
+                    //        << " error on file locking, strerror# " << strerror(errno));
+                }
+                if (retry > 1) {
+                    Sleep(TDuration::Seconds(1));
+                }
+            }
+            --retry;
+        }
+        return ret;
+    }
+
+    // Opens the file for direct, synchronous read/write access, optionally locks it, and
+    // creates the ring in SQPOLL mode.
+    //
+    // If the direct open fails, a plain O_RDWR open is attempted purely as a probe:
+    // FileOpenError when that fails too, TryAgain when it succeeds.
+    EIoResult Setup(ui64, bool doLock) override {
+        File = MakeHolder<TFileHandle>(PDiskInfo.Path.c_str(),
+                OpenExisting | RdWr | DirectAligned | Sync);
+        bool isFileOpened = File->IsOpen();
+        if (isFileOpened) {
+            if (doLock) {
+                int ret = LockFile();
+                if (ret == -1) {
+                    return EIoResult::FileLockError;
+                }
+            }
+        } else {
+            int fd = open(PDiskInfo.Path.c_str(), O_RDWR);
+            if (fd < 0) {
+                LastErrno = errno;
+                return EIoResult::FileOpenError;
+            } else {
+                close(fd);
+                return EIoResult::TryAgain;
+            }
+        }
+
+        struct io_uring_params params;
+        memset(&params, 0, sizeof(params));
+        params.flags |= IORING_SETUP_SQPOLL;
+        params.sq_thread_idle = SqThreadIdle;
+        int ret = io_uring_queue_init_params(RingEntries, &Ring, &params);
+        if (ret < 0) {
+            LastErrno = -ret;
+        } else {
+            IsRingInitialized = true;
+        }
+        return RetErrnoToContextError(ret, "io_uring_queue_init_params");
+    }
+
+    // Queues one prepared operation and submits it immediately.
+    // Returns TryAgain when the submission queue has no free entry.
+    EIoResult Submit(IAsyncIoOperation *op, ICallback *callback) override {
+        op->SetCallback(callback);
+
+        auto tOp = dynamic_cast<TAsyncIoOperationLiburing*>(op);
+        Y_VERIFY(tOp != nullptr);
+
+        auto sqe = io_uring_get_sqe(&Ring);
+        if (sqe == nullptr) {
+            return EIoResult::TryAgain;
+        }
+
+        if (tOp->IsReadOp) {
+            io_uring_prep_read(sqe, static_cast<FHANDLE>(*File), tOp->DataPtr, tOp->DataSize, tOp->DataOffset);
+        } else {
+            io_uring_prep_write(sqe, static_cast<FHANDLE>(*File), tOp->DataPtr, tOp->DataSize, tOp->DataOffset);
+        }
+
+        // Store the derived pointer: GetEvents() casts the user data straight back to
+        // TAsyncIoOperationLiburing*, so both sides must agree on the exact pointer type.
+        io_uring_sqe_set_data(sqe, tOp);
+        int ret = io_uring_submit(&Ring);
+        if (ret < 0) {
+            LastErrno = -ret;
+        }
+        return RetErrnoToContextError(ret, "io_uring_submit");
+    }
+
+    void SetActorSystem(TActorSystem *actorSystem) override {
+        ActorSystem = actorSystem;
+    }
+
+    TString GetPDiskInfo() override {
+        return PDiskInfo.Str();
+    }
+
+    int GetLastErrno() override {
+        return LastErrno;
+    }
+
+    TFileHandle *GetFileHandle() override {
+        return File.Get();
+    }
+
+    void OnAsyncIoOperationCompletion(IAsyncIoOperation *) override {
+    }
+};
+/*
+ CreateAsyncIoContextReal
+*/
std::unique_ptr<IAsyncIoContext> CreateAsyncIoContextReal(const TString &path, ui32 pDiskId, TDeviceMode::TFlags flags) {
+ // TODO: choose between TAsyncIoContextLibaio and TAsyncIoContextLiburing here; until then the libaio context is always returned
return std::make_unique<TAsyncIoContextLibaio>(path, pDiskId, flags);
}