aboutsummaryrefslogtreecommitdiffstats
path: root/util/system/sem.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /util/system/sem.cpp
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'util/system/sem.cpp')
-rw-r--r--util/system/sem.cpp424
1 files changed, 212 insertions, 212 deletions
diff --git a/util/system/sem.cpp b/util/system/sem.cpp
index 4a93b903b5..88e74fd6bc 100644
--- a/util/system/sem.cpp
+++ b/util/system/sem.cpp
@@ -1,137 +1,137 @@
-#include "sem.h"
-
+#include "sem.h"
+
#ifdef _win_
- #include <malloc.h>
+ #include <malloc.h>
#elif defined(_sun)
- #include <alloca.h>
+ #include <alloca.h>
#endif
-
+
#include <cerrno>
#include <cstring>
-#ifdef _win_
- #include "winint.h"
-#else
- #include <signal.h>
- #include <unistd.h>
- #include <semaphore.h>
-
- #if defined(_bionic_) || defined(_darwin_) && defined(_arm_)
- #include <fcntl.h>
- #else
- #define USE_SYSV_SEMAPHORES //unixoids declared the standard but not implemented it...
- #endif
-#endif
-
+#ifdef _win_
+ #include "winint.h"
+#else
+ #include <signal.h>
+ #include <unistd.h>
+ #include <semaphore.h>
+
+ #if defined(_bionic_) || defined(_darwin_) && defined(_arm_)
+ #include <fcntl.h>
+ #else
+ #define USE_SYSV_SEMAPHORES //unixoids declared the standard but not implemented it...
+ #endif
+#endif
+
#ifdef USE_SYSV_SEMAPHORES
- #include <errno.h>
- #include <sys/types.h>
- #include <sys/ipc.h>
- #include <sys/sem.h>
-
- #if defined(_linux_) || defined(_sun_) || defined(_cygwin_)
-union semun {
- int val;
- struct semid_ds* buf;
- unsigned short* array;
-} arg;
- #else
-union semun arg;
- #endif
+ #include <errno.h>
+ #include <sys/types.h>
+ #include <sys/ipc.h>
+ #include <sys/sem.h>
+
+ #if defined(_linux_) || defined(_sun_) || defined(_cygwin_)
+union semun {
+ int val;
+ struct semid_ds* buf;
+ unsigned short* array;
+} arg;
+ #else
+union semun arg;
+ #endif
#endif
-
-#include <util/digest/city.h>
-#include <util/string/cast.h>
-#include <util/random/random.h>
-#include <util/random/fast.h>
-
-namespace {
- class TSemaphoreImpl {
- private:
+
+#include <util/digest/city.h>
+#include <util/string/cast.h>
+#include <util/random/random.h>
+#include <util/random/fast.h>
+
+namespace {
+ class TSemaphoreImpl {
+ private:
#ifdef _win_
using SEMHANDLE = HANDLE;
#else
- #ifdef USE_SYSV_SEMAPHORES
+ #ifdef USE_SYSV_SEMAPHORES
using SEMHANDLE = int;
- #else
+ #else
using SEMHANDLE = sem_t*;
- #endif
-#endif
-
- SEMHANDLE Handle;
-
- public:
- inline TSemaphoreImpl(const char* name, ui32 max_free_count)
- : Handle(0)
- {
-#ifdef _win_
- char* key = (char*)name;
- if (name) {
- size_t len = strlen(name);
- key = (char*)alloca(len + 1);
- strcpy(key, name);
- if (len > MAX_PATH)
- *(key + MAX_PATH) = 0;
- char* p = key;
- while (*p) {
- if (*p == '\\')
- *p = '/';
+ #endif
+#endif
+
+ SEMHANDLE Handle;
+
+ public:
+ inline TSemaphoreImpl(const char* name, ui32 max_free_count)
+ : Handle(0)
+ {
+#ifdef _win_
+ char* key = (char*)name;
+ if (name) {
+ size_t len = strlen(name);
+ key = (char*)alloca(len + 1);
+ strcpy(key, name);
+ if (len > MAX_PATH)
+ *(key + MAX_PATH) = 0;
+ char* p = key;
+ while (*p) {
+ if (*p == '\\')
+ *p = '/';
++p;
- }
- }
- // non-blocking on init
- Handle = ::CreateSemaphore(0, max_free_count, max_free_count, key);
-#else
- #ifdef USE_SYSV_SEMAPHORES
- key_t key = TPCGMixer::Mix(CityHash64(name, strlen(name))); //32 bit hash
- Handle = semget(key, 0, 0); // try to open exist semaphore
- if (Handle == -1) { // create new semaphore
- Handle = semget(key, 1, 0666 | IPC_CREAT);
- if (Handle != -1) {
- union semun arg;
- arg.val = max_free_count;
- semctl(Handle, 0, SETVAL, arg);
- } else {
- ythrow TSystemError() << "can not init sempahore";
- }
- }
- #else
- Handle = sem_open(name, O_CREAT, 0666, max_free_count);
- if (Handle == SEM_FAILED) {
- ythrow TSystemError() << "can not init sempahore";
- }
- #endif
-#endif
+ }
+ }
+ // non-blocking on init
+ Handle = ::CreateSemaphore(0, max_free_count, max_free_count, key);
+#else
+ #ifdef USE_SYSV_SEMAPHORES
+ key_t key = TPCGMixer::Mix(CityHash64(name, strlen(name))); //32 bit hash
+ Handle = semget(key, 0, 0); // try to open exist semaphore
+ if (Handle == -1) { // create new semaphore
+ Handle = semget(key, 1, 0666 | IPC_CREAT);
+ if (Handle != -1) {
+ union semun arg;
+ arg.val = max_free_count;
+ semctl(Handle, 0, SETVAL, arg);
+ } else {
+ ythrow TSystemError() << "can not init sempahore";
+ }
+ }
+ #else
+ Handle = sem_open(name, O_CREAT, 0666, max_free_count);
+ if (Handle == SEM_FAILED) {
+ ythrow TSystemError() << "can not init sempahore";
+ }
+ #endif
+#endif
}
inline ~TSemaphoreImpl() {
#ifdef _win_
- ::CloseHandle(Handle);
+ ::CloseHandle(Handle);
#else
- #ifdef USE_SYSV_SEMAPHORES
- // we DO NOT want 'semctl(Handle, 0, IPC_RMID)' for multiprocess tasks;
- //struct sembuf ops[] = {{0, 0, IPC_NOWAIT}};
- //if (semop(Handle, ops, 1) != 0) // close only if semaphore's value is zero
- // semctl(Handle, 0, IPC_RMID);
- #else
- sem_close(Handle); // we DO NOT want sem_unlink(...)
- #endif
+ #ifdef USE_SYSV_SEMAPHORES
+ // we DO NOT want 'semctl(Handle, 0, IPC_RMID)' for multiprocess tasks;
+ //struct sembuf ops[] = {{0, 0, IPC_NOWAIT}};
+ //if (semop(Handle, ops, 1) != 0) // close only if semaphore's value is zero
+ // semctl(Handle, 0, IPC_RMID);
+ #else
+ sem_close(Handle); // we DO NOT want sem_unlink(...)
+ #endif
#endif
- }
+ }
inline void Release() noexcept {
#ifdef _win_
- ::ReleaseSemaphore(Handle, 1, 0);
+ ::ReleaseSemaphore(Handle, 1, 0);
#else
- #ifdef USE_SYSV_SEMAPHORES
- struct sembuf ops[] = {{0, 1, SEM_UNDO}};
- int ret = semop(Handle, ops, 1);
- #else
- int ret = sem_post(Handle);
- #endif
+ #ifdef USE_SYSV_SEMAPHORES
+ struct sembuf ops[] = {{0, 1, SEM_UNDO}};
+ int ret = semop(Handle, ops, 1);
+ #else
+ int ret = sem_post(Handle);
+ #endif
Y_VERIFY(ret == 0, "can not release semaphore");
#endif
- }
+ }
//The UNIX semaphore object does not support a timed "wait", and
//hence to maintain consistancy, for win32 case we use INFINITE or 0 timeout.
@@ -139,140 +139,140 @@ namespace {
#ifdef _win_
Y_VERIFY(::WaitForSingleObject(Handle, INFINITE) == WAIT_OBJECT_0, "can not acquire semaphore");
#else
- #ifdef USE_SYSV_SEMAPHORES
- struct sembuf ops[] = {{0, -1, SEM_UNDO}};
- int ret = semop(Handle, ops, 1);
- #else
- int ret = sem_wait(Handle);
- #endif
+ #ifdef USE_SYSV_SEMAPHORES
+ struct sembuf ops[] = {{0, -1, SEM_UNDO}};
+ int ret = semop(Handle, ops, 1);
+ #else
+ int ret = sem_wait(Handle);
+ #endif
Y_VERIFY(ret == 0, "can not acquire semaphore");
#endif
- }
+ }
inline bool TryAcquire() noexcept {
#ifdef _win_
- // zero-second time-out interval
- // WAIT_OBJECT_0: current free count > 0
- // WAIT_TIMEOUT: current free count == 0
+ // zero-second time-out interval
+ // WAIT_OBJECT_0: current free count > 0
+ // WAIT_TIMEOUT: current free count == 0
return ::WaitForSingleObject(Handle, 0) == WAIT_OBJECT_0;
#else
- #ifdef USE_SYSV_SEMAPHORES
- struct sembuf ops[] = {{0, -1, SEM_UNDO | IPC_NOWAIT}};
- int ret = semop(Handle, ops, 1);
- #else
- int ret = sem_trywait(Handle);
- #endif
- return ret == 0;
+ #ifdef USE_SYSV_SEMAPHORES
+ struct sembuf ops[] = {{0, -1, SEM_UNDO | IPC_NOWAIT}};
+ int ret = semop(Handle, ops, 1);
+ #else
+ int ret = sem_trywait(Handle);
+ #endif
+ return ret == 0;
#endif
- }
- };
+ }
+ };
-#if defined(_unix_)
- /*
+#if defined(_unix_)
+ /*
Disable errors/warnings about deprecated sem_* in Darwin
*/
- #ifdef _darwin_
- Y_PRAGMA_DIAGNOSTIC_PUSH
- Y_PRAGMA_NO_DEPRECATED
- #endif
- struct TPosixSemaphore {
- inline TPosixSemaphore(ui32 maxFreeCount) {
- if (sem_init(&S_, 0, maxFreeCount)) {
- ythrow TSystemError() << "can not init semaphore";
- }
- }
-
+ #ifdef _darwin_
+ Y_PRAGMA_DIAGNOSTIC_PUSH
+ Y_PRAGMA_NO_DEPRECATED
+ #endif
+ struct TPosixSemaphore {
+ inline TPosixSemaphore(ui32 maxFreeCount) {
+ if (sem_init(&S_, 0, maxFreeCount)) {
+ ythrow TSystemError() << "can not init semaphore";
+ }
+ }
+
inline ~TPosixSemaphore() {
Y_VERIFY(sem_destroy(&S_) == 0, "semaphore destroy failed");
- }
-
+ }
+
inline void Acquire() noexcept {
Y_VERIFY(sem_wait(&S_) == 0, "semaphore acquire failed");
- }
-
+ }
+
inline void Release() noexcept {
Y_VERIFY(sem_post(&S_) == 0, "semaphore release failed");
- }
-
+ }
+
inline bool TryAcquire() noexcept {
- if (sem_trywait(&S_)) {
+ if (sem_trywait(&S_)) {
Y_VERIFY(errno == EAGAIN, "semaphore try wait failed");
-
- return false;
- }
-
- return true;
- }
-
- sem_t S_;
- };
- #ifdef _darwin_
- Y_PRAGMA_DIAGNOSTIC_POP
- #endif
+
+ return false;
+ }
+
+ return true;
+ }
+
+ sem_t S_;
+ };
+ #ifdef _darwin_
+ Y_PRAGMA_DIAGNOSTIC_POP
+ #endif
#endif
+}
+
+class TSemaphore::TImpl: public TSemaphoreImpl {
+public:
+ inline TImpl(const char* name, ui32 maxFreeCount)
+ : TSemaphoreImpl(name, maxFreeCount)
+ {
+ }
+};
+
+TSemaphore::TSemaphore(const char* name, ui32 maxFreeCount)
+ : Impl_(new TImpl(name, maxFreeCount))
+{
}
-
-class TSemaphore::TImpl: public TSemaphoreImpl {
-public:
- inline TImpl(const char* name, ui32 maxFreeCount)
- : TSemaphoreImpl(name, maxFreeCount)
- {
- }
-};
-
-TSemaphore::TSemaphore(const char* name, ui32 maxFreeCount)
- : Impl_(new TImpl(name, maxFreeCount))
-{
-}
-
+
TSemaphore::~TSemaphore() = default;
-
+
void TSemaphore::Release() noexcept {
- Impl_->Release();
-}
-
+ Impl_->Release();
+}
+
void TSemaphore::Acquire() noexcept {
- Impl_->Acquire();
-}
-
+ Impl_->Acquire();
+}
+
bool TSemaphore::TryAcquire() noexcept {
- return Impl_->TryAcquire();
-}
-
+ return Impl_->TryAcquire();
+}
+
#if defined(_unix_) && !defined(_darwin_)
-class TFastSemaphore::TImpl: public TPosixSemaphore {
-public:
- inline TImpl(ui32 n)
- : TPosixSemaphore(n)
- {
- }
-};
-#else
+class TFastSemaphore::TImpl: public TPosixSemaphore {
+public:
+ inline TImpl(ui32 n)
+ : TPosixSemaphore(n)
+ {
+ }
+};
+#else
class TFastSemaphore::TImpl: public TString, public TSemaphoreImpl {
-public:
- inline TImpl(ui32 n)
+public:
+ inline TImpl(ui32 n)
: TString(ToString(RandomNumber<ui64>()))
, TSemaphoreImpl(c_str(), n)
- {
- }
-};
-#endif
-
-TFastSemaphore::TFastSemaphore(ui32 maxFreeCount)
- : Impl_(new TImpl(maxFreeCount))
-{
-}
-
+ {
+ }
+};
+#endif
+
+TFastSemaphore::TFastSemaphore(ui32 maxFreeCount)
+ : Impl_(new TImpl(maxFreeCount))
+{
+}
+
TFastSemaphore::~TFastSemaphore() = default;
-
+
void TFastSemaphore::Release() noexcept {
- Impl_->Release();
-}
-
+ Impl_->Release();
+}
+
void TFastSemaphore::Acquire() noexcept {
- Impl_->Acquire();
-}
-
+ Impl_->Acquire();
+}
+
bool TFastSemaphore::TryAcquire() noexcept {
- return Impl_->TryAcquire();
-}
+ return Impl_->TryAcquire();
+}