diff options
author | yuryalekseev <yuryalekseev@yandex-team.com> | 2022-09-29 17:21:40 +0300 |
---|---|---|
committer | yuryalekseev <yuryalekseev@yandex-team.com> | 2022-09-29 17:21:40 +0300 |
commit | 52ce13bc3056d016aa198253b691ed50bb8ab80a (patch) | |
tree | a993b97de3daf1b7c5a37fb2c8298e6465e1a1c5 | |
parent | b9c33d2970daa903dfdd7cfd222c293593e83536 (diff) | |
download | ydb-52ce13bc3056d016aa198253b691ed50bb8ab80a.tar.gz |
PR from branch users/yuryalekseev//Add_io_uring_to_pdisk_io
Add TAsyncIoContextLiburing.
-rw-r--r-- | CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/library/pdisk_io/CMakeLists.linux.txt | 2 | ||||
-rw-r--r-- | ydb/library/pdisk_io/aio_linux.cpp | 377 |
3 files changed, 362 insertions, 18 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 494ea1c13b0..3ecc4238c41 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,6 +36,7 @@ conan_cmake_configure( bison/3.5.3 c-ares/1.18.1 libiconv/1.15 + liburing/2.1 openssl/1.1.1l ragel/6.10 yasm/1.3.0 diff --git a/ydb/library/pdisk_io/CMakeLists.linux.txt b/ydb/library/pdisk_io/CMakeLists.linux.txt index 3e47c0b5b63..5680d4da426 100644 --- a/ydb/library/pdisk_io/CMakeLists.linux.txt +++ b/ydb/library/pdisk_io/CMakeLists.linux.txt @@ -7,6 +7,7 @@ find_package(AIO REQUIRED) +find_package(liburing REQUIRED) add_subdirectory(protos) add_library(ydb-library-pdisk_io) @@ -15,6 +16,7 @@ target_link_libraries(ydb-library-pdisk_io PUBLIC yutil tools-enum_parser-enum_serialization_runtime AIO::aio + liburing::liburing cpp-actors-core cpp-actors-wilson cpp-monlib-dynamic_counters diff --git a/ydb/library/pdisk_io/aio_linux.cpp b/ydb/library/pdisk_io/aio_linux.cpp index 418321a7c8d..cc0f1397be0 100644 --- a/ydb/library/pdisk_io/aio_linux.cpp +++ b/ydb/library/pdisk_io/aio_linux.cpp @@ -10,8 +10,9 @@ #include <library/cpp/containers/stack_vector/stack_vec.h> #include <util/system/file.h> #include <util/stream/format.h> -#include <libaio.h> +#include <contrib/libs/liburing/src/include/liburing.h> +#include <libaio.h> #include <linux/fs.h> #include <sys/ioctl.h> @@ -20,6 +21,28 @@ namespace NPDisk { class TCallbackContext; +// +// TBufferPoolHugePages +// +TBufferPoolHugePages::TBufferPoolHugePages(ui32 bufferSize, ui32 bufferCount, TBufferPool::TPDiskParams params) + : TBufferPool(bufferSize, bufferCount, params) +{ + TBufferPool::UseHugePages = true; + constexpr ui32 alignment = 512; + auto spdkState = Singleton<TSpdkStateOSS>(); + AlignedBuffer = spdkState->Malloc(AlignUp(ui32(bufferSize), ui32(alignment)) * bufferCount, alignment); + Y_VERIFY((ui64)AlignedBuffer % alignment == 0); + MarkUpPool(AlignedBuffer); +} + +TBufferPoolHugePages::~TBufferPoolHugePages() { + auto spdkState = Singleton<TSpdkStateOSS>(); + 
spdkState->Free(AlignedBuffer); +} + +/* + TAsyncIoOperation +*/ struct TAsyncIoOperation : iocb, IAsyncIoOperation { void* Cookie; ICallback *Callback; @@ -82,6 +105,9 @@ struct TAsyncIoOperation : iocb, IAsyncIoOperation { } }; +/* + TAsyncIoContextLibaio +*/ class TAsyncIoContextLibaio : public IAsyncIoContext { io_context_t IoContext; TActorSystem *ActorSystem; @@ -291,6 +317,7 @@ public: return EIoResult::TryAgain; } } + int ret = io_setup(maxEvents, &IoContext); if (ret < 0) { LastErrno = -ret; @@ -347,27 +374,341 @@ public: } }; +/* + TAsyncIoOperationLiburing +*/ +struct TAsyncIoOperationLiburing : IAsyncIoOperation { + void* Cookie = nullptr; + ICallback *Callback = nullptr; + TReqId ReqId; + NWilson::TTraceId TraceId; + bool IsTrim = false; // Trim is special case of IO_CMD_PWRITE operation -// -// TBufferPoolHugePages -// -TBufferPoolHugePages::TBufferPoolHugePages(ui32 bufferSize, ui32 bufferCount, TBufferPool::TPDiskParams params) - : TBufferPool(bufferSize, bufferCount, params) -{ - TBufferPool::UseHugePages = true; - constexpr ui32 alignment = 512; - auto spdkState = Singleton<TSpdkStateOSS>(); - AlignedBuffer = spdkState->Malloc(AlignUp(ui32(bufferSize), ui32(alignment)) * bufferCount, alignment); - Y_VERIFY((ui64)AlignedBuffer % alignment == 0); - MarkUpPool(AlignedBuffer); -} + void* DataPtr = nullptr; + size_t DataSize = 0; + long long DataOffset = 0; + bool IsReadOp = false; -TBufferPoolHugePages::~TBufferPoolHugePages() { - auto spdkState = Singleton<TSpdkStateOSS>(); - spdkState->Free(AlignedBuffer); -} + TAsyncIoOperationLiburing() = default; + + TAsyncIoOperationLiburing(void *cookie, TReqId reqId, NWilson::TTraceId *traceId) + : Cookie(cookie) + , ReqId(reqId) + , TraceId(traceId ? 
std::move(*traceId) : NWilson::TTraceId()) + {} + + ~TAsyncIoOperationLiburing() override { + } + + void* GetCookie() override { + return Cookie; + } + + NWilson::TTraceId *GetTraceIdPtr() override { + return &TraceId; + } + + void* GetData() override { + return DataPtr; + } + + ui64 GetOffset() override { + return DataOffset; + }; + + ui64 GetSize() override { + return DataSize; + }; + + TReqId GetReqId() override { + return ReqId; + } + + EType GetType() override { + if (IsReadOp) { + return EType::PRead; + } + + if (IsTrim) { + return EType::PTrim; + } + + return EType::PWrite; + }; + + void SetCallback(ICallback *callback) override { + Callback = callback; + } + + void ExecCallback(TAsyncIoOperationResult *result) override { + Callback->Exec(result); + } +}; + +/* + TAsyncIoContextLiburing +*/ +class TAsyncIoContextLiburing : public IAsyncIoContext { + TActorSystem *ActorSystem = nullptr; + TPool<TAsyncIoOperationLiburing, 1024> Pool; + THolder<TFileHandle> File; + int LastErrno = 0; + + TPDiskDebugInfo PDiskInfo; + struct io_uring Ring; + +public: + TAsyncIoContextLiburing(const TString &path, ui32 pDiskId, TDeviceMode::TFlags flags) + : ActorSystem(nullptr) + , PDiskInfo(path, pDiskId, "liburing") + { + Y_UNUSED(flags); + } + + ~TAsyncIoContextLiburing() { + } + + void InitializeMonitoring(TPDiskMon &) override { + } + + IAsyncIoOperation* CreateAsyncIoOperation(void* cookie, TReqId reqId, NWilson::TTraceId *traceId) override { + void *p = Pool.Pop(); + return new (p) TAsyncIoOperationLiburing(cookie, reqId, traceId); + } + + void DestroyAsyncIoOperation(IAsyncIoOperation* op) override { + Pool.Push(static_cast<TAsyncIoOperationLiburing*>(op)); + } + + EIoResult Destroy() override { + io_uring_queue_exit(&Ring); + if (File) { + auto ret = File->Flock(LOCK_UN); + Y_VERIFY_S(ret == 0, "Error in Flock(LOCK_UN), errno# " << errno << " strerror# " << strerror(errno)); + bool isOk = File->Close(); + Y_VERIFY_S(isOk, PDiskInfo << " error on file close, errno# " << 
errno << " strerror# " << strerror(errno)); + } + return EIoResult::Ok; + } + + i64 GetEvents(ui64 numEvents, ui64, TAsyncIoOperationResult *events, TDuration timeout) override { + struct io_uring_cqe *cqes = nullptr; + struct __kernel_timespec ts = { (time_t)timeout.Seconds(), timeout.NanoSecondsOfSecond() }; + + int ret = io_uring_wait_cqes(&Ring, &cqes, numEvents, &ts, nullptr); + if (ret < 0) { + if (-ret == ETIME) { + return 0; + } + return -static_cast<i64>(RetErrnoToContextError(ret, "io_uring_wait_cqes")); + } + + for (auto i = 0u; i < numEvents; ++i) { + auto *op = reinterpret_cast<TAsyncIoOperationLiburing*>(io_uring_cqe_get_data(&cqes[i])); + events[i].Operation = op; + events[i].Result = RetErrnoToContextError(cqes[i].res, "cqes[]->res"); + events[i].Operation->ExecCallback(&events[i]); + } + + io_uring_cq_advance(&Ring, numEvents); + return numEvents; + } + + EIoResult RetErrnoToContextError(i64 ret, const char *info) { + if (ret < 0) { + switch(-ret) { + case EAGAIN: return EIoResult::TryAgain; + case EBADF: return EIoResult::BadFileNumber; + case EFAULT: return EIoResult::BadAddress; + case EINTR: return EIoResult::InterruptedSystemCall; + case EINVAL: return EIoResult::InvalidArgument; + case EIO: return EIoResult::IOError; + case ENOMEM: return EIoResult::OutOfMemory; + case ENOSYS: return EIoResult::FunctionNotImplemented; + case EILSEQ: return EIoResult::InvalidSequence; + case ENODATA: return EIoResult::NoData; + default: Y_FAIL_S(PDiskInfo << " unexpected error in " << info << ", error# " << -ret + << " strerror# " << strerror(-ret)); + } + } else { + return EIoResult::Ok; + } + } + + void PreparePRead(IAsyncIoOperation *op, void *destination, size_t size, size_t offset) override { + Y_VERIFY_DEBUG(File); + + auto tOp = dynamic_cast<TAsyncIoOperationLiburing*>(op); + Y_VERIFY(tOp != nullptr); + + tOp->IsReadOp = true; + tOp->DataPtr = destination; + tOp->DataSize = size; + tOp->DataOffset = offset; + } + + void PreparePWrite(IAsyncIoOperation 
*op, const void *source, size_t size, size_t offset) override { + Y_VERIFY_DEBUG(File); + + auto tOp = dynamic_cast<TAsyncIoOperationLiburing*>(op); + Y_VERIFY(tOp != nullptr); + + tOp->IsReadOp = false; + tOp->DataPtr = const_cast<void*>(source); + tOp->DataSize = size; + tOp->DataOffset = offset; + } + + void PreparePTrim(IAsyncIoOperation *op, size_t size, size_t offset) override { + PreparePWrite(op, nullptr, size, offset); + static_cast<TAsyncIoOperationLiburing*>(op)->IsTrim = true; + } + + bool DoTrim(IAsyncIoOperation *op) override { + auto trim = dynamic_cast<TAsyncIoOperationLiburing*>(op); + Y_VERIFY(trim != nullptr); + Y_VERIFY(trim->IsTrim); + + ui64 range[2] = {trim->GetOffset(), trim->GetSize()}; + bool tryAgain = true; + TStringStream str; + str << "BLKDISCARD " << PDiskInfo; + errno = 0; + if (ioctl((FHANDLE)*File.Get(), BLKDISCARD, &range) == -1) { + int errorId = errno; + if (errorId == EOPNOTSUPP) { + str << " failed, operation not supported, trimming will be disabled for the device"; + tryAgain = false; + } else if (errorId == ENOTTY) { + str << " failed, device is not a typewriter! 
Trimming will be disabled for the device"; + tryAgain = false; + } else { + str << " failed, errno# " << errorId << " strerror# " << strerror(errorId); + tryAgain = true; + } + + if (ActorSystem) { + //LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_DEVICE, str.Str()); + } else { + Cerr << str.Str() << Endl; + } + } else { + if (ActorSystem) { + //LOG_DEBUG_S(*ActorSystem, NKikimrServices::BS_DEVICE, str.Str() << " trimmed# " << range[1] + // << " size# " << trim->GetSize() << " from# " << range[0] << " offset# " << trim->GetOffset()); + } + tryAgain = true; + } + return tryAgain; + } + + int LockFile() { + int ret = -1; + errno = EWOULDBLOCK; + int retry = 2; + while (ret == -1 && errno == EWOULDBLOCK && retry > 0) { + errno = 0; + ret = File->Flock(LOCK_EX | LOCK_NB); + if (ret == 0) { + break; + } else { + LastErrno = errno; + if (ActorSystem){ + //LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_DEVICE, PDiskInfo + // << " error on file locking, strerror# " << strerror(errno)); + } + if (retry > 1) { + Sleep(TDuration::Seconds(1)); + } + } + --retry; + } + return ret; + } + + EIoResult Setup(ui64, bool doLock) override { + File = MakeHolder<TFileHandle>(PDiskInfo.Path.c_str(), + OpenExisting | RdWr | DirectAligned | Sync); + bool isFileOpened = File->IsOpen(); + if (isFileOpened) { + if (doLock) { + int ret = LockFile(); + if (ret == -1) { + return EIoResult::FileLockError; + } + } + } else { + int fd = open(PDiskInfo.Path.c_str(), O_RDWR); + if (fd < 0) { + LastErrno = errno; + return EIoResult::FileOpenError; + } else { + close(fd); + return EIoResult::TryAgain; + } + } + + struct io_uring_params params; + memset(&params, 0, sizeof(params)); + params.flags |= IORING_SETUP_SQPOLL; + params.sq_thread_idle = 100; + int ret = io_uring_queue_init_params(512, &Ring, &params); + if (ret < 0) { + LastErrno = -ret; + } + return RetErrnoToContextError(ret, "io_uring_queue_init_params"); + } + + EIoResult Submit(IAsyncIoOperation *op, ICallback *callback) override { +
op->SetCallback(callback); + + auto sqe = io_uring_get_sqe(&Ring); + if (sqe == nullptr) { + return EIoResult::TryAgain; + } + + auto tOp = dynamic_cast<TAsyncIoOperationLiburing*>(op); + Y_VERIFY(tOp != nullptr); + + if (tOp->IsReadOp) { + io_uring_prep_read(sqe, static_cast<FHANDLE>(*File), tOp->DataPtr, tOp->DataSize, tOp->DataOffset); + } else { + io_uring_prep_write(sqe, static_cast<FHANDLE>(*File), tOp->DataPtr, tOp->DataSize, tOp->DataOffset); + } + + io_uring_sqe_set_data(sqe, op); + int ret = io_uring_submit(&Ring); + if (ret < 0) { + LastErrno = -ret; + } + return RetErrnoToContextError(ret, "io_uring_submit"); + } + + void SetActorSystem(TActorSystem *actorSystem) override { + ActorSystem = actorSystem; + } + + TString GetPDiskInfo() override { + return PDiskInfo.Str(); + } + + int GetLastErrno() override { + return LastErrno; + } + + TFileHandle *GetFileHandle() override { + return File.Get(); + } + + void OnAsyncIoOperationCompletion(IAsyncIoOperation *) override { + } +}; +/* + CreateAsyncIoContextReal +*/ std::unique_ptr<IAsyncIoContext> CreateAsyncIoContextReal(const TString &path, ui32 pDiskId, TDeviceMode::TFlags flags) { + // TODO: choose TAsyncIoContextLibaio or TAsyncIoContextLiburing here return std::make_unique<TAsyncIoContextLibaio>(path, pDiskId, flags); } |