| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
 | #include "sem.h"
#ifdef _win_
    #include <malloc.h>
#elif defined(_sun)
    #include <alloca.h>
#endif
#include <cerrno>
#include <cstring>
#ifdef _win_
    #include "winint.h"
#else
    #include <signal.h>
    #include <unistd.h>
    #include <semaphore.h>
    #if defined(_bionic_) || defined(_darwin_) && defined(_arm_)
        #include <fcntl.h>
    #else
        #define USE_SYSV_SEMAPHORES //unixoids declared the standard but not implemented it...
    #endif
#endif
#ifdef USE_SYSV_SEMAPHORES
    #include <errno.h>
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/sem.h>
    // `union semun` is the fourth-argument type of semctl(). On the platforms listed
    // below it is defined locally (presumably because their system headers do not
    // provide it - TODO confirm); elsewhere the definition from <sys/sem.h> is used.
    #if defined(_linux_) || defined(_sun_) || defined(_cygwin_)
union semun {
    int val;               // value for SETVAL (the only member used in this file)
    struct semid_ds* buf;  // buffer for IPC_STAT / IPC_SET, per semctl() documentation
    unsigned short* array; // array for GETALL / SETALL, per semctl() documentation
} arg;
// NOTE(review): the namespace-scope object `arg` declared here (and in the #else
// branch) is not referenced by the visible code; the TSemaphoreImpl constructor
// declares its own local `arg`.
    #else
union semun arg;
    #endif
#endif
#include <util/digest/city.h>
#include <util/string/cast.h>
#include <util/random/random.h>
#include <util/random/fast.h>
namespace {
    // Named counting semaphore. Exactly one backend is compiled in, selected by
    // the preprocessor:
    //   * _win_               - Win32 semaphore object (CreateSemaphore);
    //   * USE_SYSV_SEMAPHORES - System V semaphore set whose key is a hash of the name;
    //   * otherwise           - POSIX named semaphore (sem_open).
    // The destructor only closes the local handle; it never removes/unlinks the
    // underlying OS object (see the comments in the destructor).
    class TSemaphoreImpl {
    private:
#ifdef _win_
        using SEMHANDLE = HANDLE;
#else
    #ifdef USE_SYSV_SEMAPHORES
        using SEMHANDLE = int;
    #else
        using SEMHANDLE = sem_t*;
    #endif
#endif
        // Native handle of the semaphore for the selected backend.
        SEMHANDLE Handle;

    public:
        // Opens the semaphore called `name`, creating it with an initial value of
        // `max_free_count` if this call creates it.
        //
        // name           - semaphore name. The Windows branch accepts a null pointer
        //                  and passes it through to CreateSemaphore; the other
        //                  branches hand `name` to strlen()/sem_open() and therefore
        //                  require a non-null string.
        // max_free_count - initial counter value (on Windows also the maximum count).
        //
        // Throws TSystemError if the OS object cannot be created or initialised.
        inline TSemaphoreImpl(const char* name, ui32 max_free_count)
            : Handle(0)
        {
#ifdef _win_
            char* key = (char*)name;
            if (name) {
                // Work on a stack copy: the name is cut at MAX_PATH characters and
                // every back slash is replaced with a forward slash.
                size_t len = strlen(name);
                key = (char*)alloca(len + 1);
                strcpy(key, name);
                if (len > MAX_PATH)
                    *(key + MAX_PATH) = 0;
                for (char* p = key; *p; ++p) {
                    if (*p == '\\')
                        *p = '/';
                }
            }
            // non-blocking on init
            Handle = ::CreateSemaphore(0, max_free_count, max_free_count, key);
            // CreateSemaphore reports failure with a null handle. Fail here, like
            // the other backends do, instead of aborting later inside Acquire().
            if (!Handle) {
                ythrow TSystemError() << "can not init semaphore";
            }
#else
    #ifdef USE_SYSV_SEMAPHORES
            key_t key = TPCGMixer::Mix(CityHash64(name, strlen(name))); //32 bit hash
            Handle = semget(key, 0, 0);                                 // try to open exist semaphore
            if (Handle == -1) {                                         // create new semaphore
                Handle = semget(key, 1, 0666 | IPC_CREAT);
                if (Handle == -1) {
                    ythrow TSystemError() << "can not init semaphore";
                }
                // NOTE(review): creation and SETVAL are two separate steps, so another
                // process may open the set before its value has been initialised.
                union semun arg;
                arg.val = max_free_count;
                // A failed SETVAL would leave the counter at its default value and
                // make every Acquire() block, so report it instead of ignoring it.
                if (semctl(Handle, 0, SETVAL, arg) == -1) {
                    ythrow TSystemError() << "can not init semaphore";
                }
            }
    #else
            Handle = sem_open(name, O_CREAT, 0666, max_free_count);
            if (Handle == SEM_FAILED) {
                ythrow TSystemError() << "can not init semaphore";
            }
    #endif
#endif
        }

        // Releases the local handle only; the OS-level semaphore is left in place.
        inline ~TSemaphoreImpl() {
#ifdef _win_
            ::CloseHandle(Handle);
#else
    #ifdef USE_SYSV_SEMAPHORES
    // we DO NOT want 'semctl(Handle, 0, IPC_RMID)' for multiprocess tasks;
    //struct sembuf ops[] = {{0, 0, IPC_NOWAIT}};
    //if (semop(Handle, ops, 1) != 0) // close only if semaphore's value is zero
    //    semctl(Handle, 0, IPC_RMID);
    #else
            sem_close(Handle); // we DO NOT want sem_unlink(...)
    #endif
#endif
        }

        // Increments the counter by one. On the non-Windows backends a failure of
        // the underlying call aborts via Y_VERIFY; the Windows result is not checked.
        inline void Release() noexcept {
#ifdef _win_
            ::ReleaseSemaphore(Handle, 1, 0);
#else
    #ifdef USE_SYSV_SEMAPHORES
            struct sembuf ops[] = {{0, 1, SEM_UNDO}};
            int ret = semop(Handle, ops, 1);
    #else
            int ret = sem_post(Handle);
    #endif
            Y_VERIFY(ret == 0, "can not release semaphore");
#endif
        }

        //The UNIX semaphore object does not support a timed "wait", and
        //hence to maintain consistency, for win32 case we use INFINITE or 0 timeout.

        // Decrements the counter, blocking until that is possible. Any failure other
        // than an interruption by a signal aborts via Y_VERIFY.
        inline void Acquire() noexcept {
#ifdef _win_
            Y_VERIFY(::WaitForSingleObject(Handle, INFINITE) == WAIT_OBJECT_0, "can not acquire semaphore");
#else
    #ifdef USE_SYSV_SEMAPHORES
            struct sembuf ops[] = {{0, -1, SEM_UNDO}};
    #endif
            int ret;
            // A blocking wait can be interrupted by a signal handler (EINTR); that is
            // not an error, so simply restart the wait.
            do {
    #ifdef USE_SYSV_SEMAPHORES
                ret = semop(Handle, ops, 1);
    #else
                ret = sem_wait(Handle);
    #endif
            } while (ret == -1 && errno == EINTR);
            Y_VERIFY(ret == 0, "can not acquire semaphore");
#endif
        }

        // Tries to decrement the counter without blocking.
        // Returns true if the semaphore was acquired, false otherwise.
        inline bool TryAcquire() noexcept {
#ifdef _win_
            // zero-second time-out interval
            // WAIT_OBJECT_0: current free count > 0
            // WAIT_TIMEOUT:  current free count == 0
            return ::WaitForSingleObject(Handle, 0) == WAIT_OBJECT_0;
#else
    #ifdef USE_SYSV_SEMAPHORES
            struct sembuf ops[] = {{0, -1, SEM_UNDO | IPC_NOWAIT}};
            int ret = semop(Handle, ops, 1);
    #else
            int ret = sem_trywait(Handle);
    #endif
            return ret == 0;
#endif
        }
    };
#if defined(_unix_)
    /*
    Disable errors/warnings about deprecated sem_* in Darwin
*/
    #ifdef _darwin_
    Y_PRAGMA_DIAGNOSTIC_PUSH
    Y_PRAGMA_NO_DEPRECATED
    #endif
    // Unnamed POSIX semaphore stored inside the object (sem_init / sem_destroy).
    // It is initialised with pshared == 0, which per POSIX makes it shared between
    // the threads of the current process only.
    struct TPosixSemaphore {
        // Initialises the semaphore with the counter set to `maxFreeCount`.
        // Throws TSystemError if sem_init() fails.
        inline TPosixSemaphore(ui32 maxFreeCount) {
            if (sem_init(&S_, 0, maxFreeCount)) {
                ythrow TSystemError() << "can not init semaphore";
            }
        }

        // Destroys the semaphore; a failure aborts via Y_VERIFY.
        inline ~TPosixSemaphore() {
            Y_VERIFY(sem_destroy(&S_) == 0, "semaphore destroy failed");
        }

        // Decrements the counter, blocking until that is possible. Any failure other
        // than an interruption by a signal aborts via Y_VERIFY.
        inline void Acquire() noexcept {
            int ret;
            // sem_wait() may be interrupted by a signal handler (EINTR); that is not
            // an error, so restart the wait instead of aborting the process.
            do {
                ret = sem_wait(&S_);
            } while (ret == -1 && errno == EINTR);
            Y_VERIFY(ret == 0, "semaphore acquire failed");
        }

        // Increments the counter by one; a failure aborts via Y_VERIFY.
        inline void Release() noexcept {
            Y_VERIFY(sem_post(&S_) == 0, "semaphore release failed");
        }

        // Tries to decrement the counter without blocking.
        // Returns true on success and false if the counter is currently zero
        // (EAGAIN); any other error aborts via Y_VERIFY.
        inline bool TryAcquire() noexcept {
            int ret;
            // POSIX allows sem_trywait() to report EINTR as well; retry in that case.
            do {
                ret = sem_trywait(&S_);
            } while (ret == -1 && errno == EINTR);
            if (ret) {
                Y_VERIFY(errno == EAGAIN, "semaphore try wait failed");
                return false;
            }
            return true;
        }

        // The underlying semaphore object.
        sem_t S_;
    };
    #ifdef _darwin_
    Y_PRAGMA_DIAGNOSTIC_POP
    #endif
#endif
}
// Implementation object behind TSemaphore: the platform-specific TSemaphoreImpl
// under the nested name used by the public class.
class TSemaphore::TImpl: public TSemaphoreImpl {
public:
    // Constructed exactly like the base class: (name, maxFreeCount).
    using TSemaphoreImpl::TSemaphoreImpl;
};
// Creates the implementation object for the semaphore called `name`, passing
// `maxFreeCount` through as its initial count.
// Propagates any exception thrown while constructing TImpl.
TSemaphore::TSemaphore(const char* name, ui32 maxFreeCount)
    : Impl_(new TImpl(name, maxFreeCount))
{
}
// Defaulted out of line, after TImpl has been fully defined in this file
// (presumably so that Impl_ can destroy a complete type - confirm against sem.h).
TSemaphore::~TSemaphore() = default;
// Increments the semaphore counter; forwards to TImpl::Release().
void TSemaphore::Release() noexcept {
    Impl_->Release();
}
// Blocks until the semaphore can be decremented; forwards to TImpl::Acquire().
void TSemaphore::Acquire() noexcept {
    Impl_->Acquire();
}
// Non-blocking acquire; forwards to TImpl::TryAcquire().
// Returns true if the semaphore was acquired, false otherwise.
bool TSemaphore::TryAcquire() noexcept {
    return Impl_->TryAcquire();
}
#if defined(_unix_) && !defined(_darwin_)
// Implementation object behind TFastSemaphore on unix (non-darwin) builds:
// the unnamed TPosixSemaphore under the nested name used by the public class.
class TFastSemaphore::TImpl: public TPosixSemaphore {
public:
    // Constructed exactly like the base class: from the initial count.
    using TPosixSemaphore::TPosixSemaphore;
};
#else
// Fallback implementation of TFastSemaphore: a named TSemaphoreImpl whose name is
// generated randomly. The TString base holds that name; it is listed first so it
// is already constructed when c_str() is handed to the TSemaphoreImpl base.
class TFastSemaphore::TImpl: public TString, public TSemaphoreImpl {
public:
    // n - initial count passed to TSemaphoreImpl.
    TImpl(ui32 n)
        : TString(RandomName())
        , TSemaphoreImpl(c_str(), n)
    {
    }

private:
    // Produces the semaphore name: a random 64-bit number converted to text.
    static TString RandomName() {
        return ToString(RandomNumber<ui64>());
    }
};
#endif
// Creates the implementation object with `maxFreeCount` as the initial count.
// Which TImpl is used depends on the platform: on unix (non-darwin) builds it
// wraps TPosixSemaphore, otherwise a randomly named TSemaphoreImpl.
// Propagates any exception thrown while constructing TImpl.
TFastSemaphore::TFastSemaphore(ui32 maxFreeCount)
    : Impl_(new TImpl(maxFreeCount))
{
}
// Defaulted out of line, after TImpl has been fully defined in this file
// (presumably so that Impl_ can destroy a complete type - confirm against sem.h).
TFastSemaphore::~TFastSemaphore() = default;
// Increments the semaphore counter; forwards to TImpl::Release().
void TFastSemaphore::Release() noexcept {
    Impl_->Release();
}
// Blocks until the semaphore can be decremented; forwards to TImpl::Acquire().
void TFastSemaphore::Acquire() noexcept {
    Impl_->Acquire();
}
// Non-blocking acquire; forwards to TImpl::TryAcquire().
// Returns true if the semaphore was acquired, false otherwise.
bool TFastSemaphore::TryAcquire() noexcept {
    return Impl_->TryAcquire();
}
 |