aboutsummaryrefslogtreecommitdiffstats
path: root/util/system/sem.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/system/sem.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/system/sem.cpp')
-rw-r--r--util/system/sem.cpp278
1 files changed, 278 insertions, 0 deletions
diff --git a/util/system/sem.cpp b/util/system/sem.cpp
new file mode 100644
index 0000000000..4a93b903b5
--- /dev/null
+++ b/util/system/sem.cpp
@@ -0,0 +1,278 @@
+#include "sem.h"
+
+#ifdef _win_
+ #include <malloc.h>
+#elif defined(_sun)
+ #include <alloca.h>
+#endif
+
+#include <cerrno>
+#include <cstring>
+
+#ifdef _win_
+ #include "winint.h"
+#else
+ #include <signal.h>
+ #include <unistd.h>
+ #include <semaphore.h>
+
+ #if defined(_bionic_) || defined(_darwin_) && defined(_arm_)
+ #include <fcntl.h>
+ #else
+ #define USE_SYSV_SEMAPHORES //unixoids declared the standard but not implemented it...
+ #endif
+#endif
+
+#ifdef USE_SYSV_SEMAPHORES
+ #include <errno.h>
+ #include <sys/types.h>
+ #include <sys/ipc.h>
+ #include <sys/sem.h>
+
+ #if defined(_linux_) || defined(_sun_) || defined(_cygwin_)
+union semun {
+ int val;
+ struct semid_ds* buf;
+ unsigned short* array;
+} arg;
+ #else
+union semun arg;
+ #endif
+#endif
+
+#include <util/digest/city.h>
+#include <util/string/cast.h>
+#include <util/random/random.h>
+#include <util/random/fast.h>
+
+namespace {
+ class TSemaphoreImpl {
+ private:
+#ifdef _win_
+ using SEMHANDLE = HANDLE;
+#else
+ #ifdef USE_SYSV_SEMAPHORES
+ using SEMHANDLE = int;
+ #else
+ using SEMHANDLE = sem_t*;
+ #endif
+#endif
+
+ SEMHANDLE Handle;
+
+ public:
+ inline TSemaphoreImpl(const char* name, ui32 max_free_count)
+ : Handle(0)
+ {
+#ifdef _win_
+ char* key = (char*)name;
+ if (name) {
+ size_t len = strlen(name);
+ key = (char*)alloca(len + 1);
+ strcpy(key, name);
+ if (len > MAX_PATH)
+ *(key + MAX_PATH) = 0;
+ char* p = key;
+ while (*p) {
+ if (*p == '\\')
+ *p = '/';
+ ++p;
+ }
+ }
+ // non-blocking on init
+ Handle = ::CreateSemaphore(0, max_free_count, max_free_count, key);
+#else
+ #ifdef USE_SYSV_SEMAPHORES
+ key_t key = TPCGMixer::Mix(CityHash64(name, strlen(name))); //32 bit hash
+ Handle = semget(key, 0, 0); // try to open exist semaphore
+ if (Handle == -1) { // create new semaphore
+ Handle = semget(key, 1, 0666 | IPC_CREAT);
+ if (Handle != -1) {
+ union semun arg;
+ arg.val = max_free_count;
+ semctl(Handle, 0, SETVAL, arg);
+ } else {
+ ythrow TSystemError() << "can not init sempahore";
+ }
+ }
+ #else
+ Handle = sem_open(name, O_CREAT, 0666, max_free_count);
+ if (Handle == SEM_FAILED) {
+ ythrow TSystemError() << "can not init sempahore";
+ }
+ #endif
+#endif
+ }
+
+ inline ~TSemaphoreImpl() {
+#ifdef _win_
+ ::CloseHandle(Handle);
+#else
+ #ifdef USE_SYSV_SEMAPHORES
+ // we DO NOT want 'semctl(Handle, 0, IPC_RMID)' for multiprocess tasks;
+ //struct sembuf ops[] = {{0, 0, IPC_NOWAIT}};
+ //if (semop(Handle, ops, 1) != 0) // close only if semaphore's value is zero
+ // semctl(Handle, 0, IPC_RMID);
+ #else
+ sem_close(Handle); // we DO NOT want sem_unlink(...)
+ #endif
+#endif
+ }
+
+ inline void Release() noexcept {
+#ifdef _win_
+ ::ReleaseSemaphore(Handle, 1, 0);
+#else
+ #ifdef USE_SYSV_SEMAPHORES
+ struct sembuf ops[] = {{0, 1, SEM_UNDO}};
+ int ret = semop(Handle, ops, 1);
+ #else
+ int ret = sem_post(Handle);
+ #endif
+ Y_VERIFY(ret == 0, "can not release semaphore");
+#endif
+ }
+
+ //The UNIX semaphore object does not support a timed "wait", and
+ //hence to maintain consistancy, for win32 case we use INFINITE or 0 timeout.
+ inline void Acquire() noexcept {
+#ifdef _win_
+ Y_VERIFY(::WaitForSingleObject(Handle, INFINITE) == WAIT_OBJECT_0, "can not acquire semaphore");
+#else
+ #ifdef USE_SYSV_SEMAPHORES
+ struct sembuf ops[] = {{0, -1, SEM_UNDO}};
+ int ret = semop(Handle, ops, 1);
+ #else
+ int ret = sem_wait(Handle);
+ #endif
+ Y_VERIFY(ret == 0, "can not acquire semaphore");
+#endif
+ }
+
+ inline bool TryAcquire() noexcept {
+#ifdef _win_
+ // zero-second time-out interval
+ // WAIT_OBJECT_0: current free count > 0
+ // WAIT_TIMEOUT: current free count == 0
+ return ::WaitForSingleObject(Handle, 0) == WAIT_OBJECT_0;
+#else
+ #ifdef USE_SYSV_SEMAPHORES
+ struct sembuf ops[] = {{0, -1, SEM_UNDO | IPC_NOWAIT}};
+ int ret = semop(Handle, ops, 1);
+ #else
+ int ret = sem_trywait(Handle);
+ #endif
+ return ret == 0;
+#endif
+ }
+ };
+
+#if defined(_unix_)
+ /*
+ Disable errors/warnings about deprecated sem_* in Darwin
+*/
+ #ifdef _darwin_
+ Y_PRAGMA_DIAGNOSTIC_PUSH
+ Y_PRAGMA_NO_DEPRECATED
+ #endif
+ struct TPosixSemaphore {
+ inline TPosixSemaphore(ui32 maxFreeCount) {
+ if (sem_init(&S_, 0, maxFreeCount)) {
+ ythrow TSystemError() << "can not init semaphore";
+ }
+ }
+
+ inline ~TPosixSemaphore() {
+ Y_VERIFY(sem_destroy(&S_) == 0, "semaphore destroy failed");
+ }
+
+ inline void Acquire() noexcept {
+ Y_VERIFY(sem_wait(&S_) == 0, "semaphore acquire failed");
+ }
+
+ inline void Release() noexcept {
+ Y_VERIFY(sem_post(&S_) == 0, "semaphore release failed");
+ }
+
+ inline bool TryAcquire() noexcept {
+ if (sem_trywait(&S_)) {
+ Y_VERIFY(errno == EAGAIN, "semaphore try wait failed");
+
+ return false;
+ }
+
+ return true;
+ }
+
+ sem_t S_;
+ };
+ #ifdef _darwin_
+ Y_PRAGMA_DIAGNOSTIC_POP
+ #endif
+#endif
+}
+
+class TSemaphore::TImpl: public TSemaphoreImpl {
+public:
+ inline TImpl(const char* name, ui32 maxFreeCount)
+ : TSemaphoreImpl(name, maxFreeCount)
+ {
+ }
+};
+
+TSemaphore::TSemaphore(const char* name, ui32 maxFreeCount)
+ : Impl_(new TImpl(name, maxFreeCount))
+{
+}
+
+TSemaphore::~TSemaphore() = default;
+
+void TSemaphore::Release() noexcept {
+ Impl_->Release();
+}
+
+void TSemaphore::Acquire() noexcept {
+ Impl_->Acquire();
+}
+
+bool TSemaphore::TryAcquire() noexcept {
+ return Impl_->TryAcquire();
+}
+
+#if defined(_unix_) && !defined(_darwin_)
+class TFastSemaphore::TImpl: public TPosixSemaphore {
+public:
+ inline TImpl(ui32 n)
+ : TPosixSemaphore(n)
+ {
+ }
+};
+#else
+class TFastSemaphore::TImpl: public TString, public TSemaphoreImpl {
+public:
+ inline TImpl(ui32 n)
+ : TString(ToString(RandomNumber<ui64>()))
+ , TSemaphoreImpl(c_str(), n)
+ {
+ }
+};
+#endif
+
+TFastSemaphore::TFastSemaphore(ui32 maxFreeCount)
+ : Impl_(new TImpl(maxFreeCount))
+{
+}
+
+TFastSemaphore::~TFastSemaphore() = default;
+
+void TFastSemaphore::Release() noexcept {
+ Impl_->Release();
+}
+
+void TFastSemaphore::Acquire() noexcept {
+ Impl_->Acquire();
+}
+
+bool TFastSemaphore::TryAcquire() noexcept {
+ return Impl_->TryAcquire();
+}