aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkulikov <kulikov@yandex-team.com>2023-07-21 13:59:33 +0300
committerkulikov <kulikov@yandex-team.com>2023-07-21 13:59:33 +0300
commit5706cb392271ea40eab053314e7c0f4d9d4547ba (patch)
tree72bce210dc3747df1d9319fc9f56b848852d2aab
parent122a6055cef2bc785407c69b33b858a07b319e66 (diff)
downloadydb-5706cb392271ea40eab053314e7c0f4d9d4547ba.tar.gz
try to get rid of locks and allocations for elastic queue thread pool
In case of heavy load and high rps current thread pool implementation seems to have problems at least with contention on lock inside condvar (long futex wait calls from http server listener thread), so try to implement something more efficient: - replace condvar with TEventCounter implementation without internal lock (pthread condvar maintains waiters wakeup order, thread pool doesn't need it); - introduce well-known bounded mpmc queue over ring buffer; - get rid of TDecrementingWrapper; - add options to turn on new pool in library/cpp/http/server and search/daemons (will remove after adoption); - make elastic queue ut check both versions; - workaround problems with android/arm build targets.
-rw-r--r--library/cpp/http/server/http.cpp30
-rw-r--r--library/cpp/http/server/options.h6
-rw-r--r--library/cpp/threading/CMakeLists.txt1
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt14
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt15
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt15
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.txt17
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt14
-rw-r--r--library/cpp/threading/bounded_queue/bounded_queue.h89
-rw-r--r--library/cpp/threading/bounded_queue/bounded_queue_ut.cpp106
-rw-r--r--library/cpp/threading/bounded_queue/ut/ya.make8
-rw-r--r--library/cpp/threading/bounded_queue/ya.make9
-rw-r--r--library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt2
-rw-r--r--library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt1
-rw-r--r--library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt2
-rw-r--r--library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt2
-rw-r--r--library/cpp/threading/equeue/equeue_ut.cpp79
-rw-r--r--library/cpp/threading/equeue/fast.h167
-rw-r--r--library/cpp/threading/equeue/ya.make9
-rw-r--r--library/cpp/yt/CMakeLists.darwin-x86_64.txt21
-rw-r--r--library/cpp/yt/CMakeLists.linux-aarch64.txt18
-rw-r--r--library/cpp/yt/CMakeLists.linux-x86_64.txt21
-rw-r--r--library/cpp/yt/CMakeLists.txt19
-rw-r--r--library/cpp/yt/CMakeLists.windows-x86_64.txt21
-rw-r--r--library/cpp/yt/cpu_clock/CMakeLists.darwin-x86_64.txt21
-rw-r--r--library/cpp/yt/cpu_clock/CMakeLists.linux-x86_64.txt22
-rw-r--r--library/cpp/yt/cpu_clock/CMakeLists.txt15
-rw-r--r--library/cpp/yt/cpu_clock/CMakeLists.windows-x86_64.txt18
-rw-r--r--library/cpp/yt/cpu_clock/benchmark/benchmark.cpp41
-rw-r--r--library/cpp/yt/cpu_clock/benchmark/ya.make11
-rw-r--r--library/cpp/yt/cpu_clock/unittests/clock_ut.cpp46
-rw-r--r--library/cpp/yt/cpu_clock/unittests/ya.make13
-rw-r--r--library/cpp/yt/system/CMakeLists.darwin-x86_64.txt20
-rw-r--r--library/cpp/yt/system/CMakeLists.linux-x86_64.txt21
-rw-r--r--library/cpp/yt/system/CMakeLists.txt15
-rw-r--r--library/cpp/yt/system/CMakeLists.windows-x86_64.txt17
-rw-r--r--library/cpp/yt/threading/CMakeLists.darwin-x86_64.txt37
-rw-r--r--library/cpp/yt/threading/CMakeLists.linux-x86_64.txt38
-rw-r--r--library/cpp/yt/threading/CMakeLists.txt15
-rw-r--r--library/cpp/yt/threading/CMakeLists.windows-x86_64.txt34
-rw-r--r--library/cpp/yt/threading/unittests/count_down_latch_ut.cpp78
-rw-r--r--library/cpp/yt/threading/unittests/recursive_spin_lock_ut.cpp88
-rw-r--r--library/cpp/yt/threading/unittests/spin_wait_ut.cpp48
-rw-r--r--library/cpp/yt/threading/unittests/ya.make17
44 files changed, 1261 insertions, 40 deletions
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp
index 2dd407dceff..ecdd93ab56e 100644
--- a/library/cpp/http/server/http.cpp
+++ b/library/cpp/http/server/http.cpp
@@ -2,6 +2,7 @@
#include "http_ex.h"
#include <library/cpp/threading/equeue/equeue.h>
+#include <library/cpp/threading/equeue/fast.h>
#include <util/generic/buffer.h>
#include <util/generic/intrlist.h>
@@ -405,8 +406,8 @@ public:
: TImpl(
parent,
cb,
- MakeThreadPool<TSimpleThreadPool>(factory, options.UseElasticQueues, cb, options.RequestsThreadName),
- MakeThreadPool<TThreadPool>(factory, options.UseElasticQueues, nullptr, options.FailRequestsThreadName),
+ MakeThreadPool<TSimpleThreadPool>(factory, options, cb, options.RequestsThreadName),
+ MakeThreadPool<TThreadPool>(factory, options, nullptr, options.FailRequestsThreadName),
options) {
}
@@ -456,21 +457,30 @@ public:
private:
template <class TThreadPool_>
- static THolder<IThreadPool> MakeThreadPool(IThreadFactory* factory, bool elastic, ICallBack* callback = nullptr, const TString& threadName = {}) {
+ static THolder<IThreadPool> MakeThreadPool(ICallBack* callback, const IThreadPool::TParams& params) {
+ if (callback) {
+ return MakeHolder<TThreadPoolBinder<TThreadPool_, THttpServer::ICallBack>>(callback, params);
+ } else {
+ return MakeHolder<TThreadPool_>(params);
+ }
+ }
+
+ template <class TThreadPool_>
+ static THolder<IThreadPool> MakeThreadPool(IThreadFactory* factory, const TOptions& options, ICallBack* callback = nullptr, const TString& threadName = {}) {
if (!factory) {
factory = SystemThreadFactory();
}
THolder<IThreadPool> pool;
const auto params = IThreadPool::TParams().SetFactory(factory).SetThreadName(threadName);
- if (callback) {
- pool = MakeHolder<TThreadPoolBinder<TThreadPool_, THttpServer::ICallBack>>(callback, params);
- } else {
- pool = MakeHolder<TThreadPool_>(params);
- }
- if (elastic) {
- pool = MakeHolder<TElasticQueue>(std::move(pool));
+ if (options.UseFastElasticQueues) {
+ pool = MakeThreadPool<TFastElasticQueue>(callback, params);
+ } else {
+ pool = MakeThreadPool<TThreadPool_>(callback, params);
+ if (options.UseElasticQueues) {
+ pool = MakeHolder<TElasticQueue>(std::move(pool));
+ }
}
return pool;
diff --git a/library/cpp/http/server/options.h b/library/cpp/http/server/options.h
index 5976d58f32f..4656dd7082e 100644
--- a/library/cpp/http/server/options.h
+++ b/library/cpp/http/server/options.h
@@ -131,6 +131,11 @@ public:
return *this;
}
+ inline THttpServerOptions& EnableFastElasticQueues(bool enable) noexcept {
+ UseFastElasticQueues = enable;
+
+ return *this;
+ }
inline THttpServerOptions& SetThreadsName(const TString& listenThreadName, const TString& requestsThreadName, const TString& failRequestsThreadName) noexcept {
ListenThreadName = listenThreadName;
RequestsThreadName = requestsThreadName;
@@ -167,6 +172,7 @@ public:
ui64 MaxInputContentLength = sizeof(size_t) <= 4 ? 2_GB : 64_GB;
size_t MaxRequestsPerConnection = 0; // If keep-alive is enabled, request limit before connection is closed
bool UseElasticQueues = false;
+ bool UseFastElasticQueues = false;
TDuration PollTimeout; // timeout of TSocketPoller::WaitT call
TDuration ExpirationTimeout; // drop inactive connections after ExpirationTimeout (should be > 0)
diff --git a/library/cpp/threading/CMakeLists.txt b/library/cpp/threading/CMakeLists.txt
index 681ef6b24e3..f5410422081 100644
--- a/library/cpp/threading/CMakeLists.txt
+++ b/library/cpp/threading/CMakeLists.txt
@@ -7,6 +7,7 @@
add_subdirectory(atomic)
+add_subdirectory(bounded_queue)
add_subdirectory(chunk_queue)
add_subdirectory(equeue)
add_subdirectory(future)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..bd9b68ab62b
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,14 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-threading-bounded_queue INTERFACE)
+target_link_libraries(cpp-threading-bounded_queue INTERFACE
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt b/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..613a18db103
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-threading-bounded_queue INTERFACE)
+target_link_libraries(cpp-threading-bounded_queue INTERFACE
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..613a18db103
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-threading-bounded_queue INTERFACE)
+target_link_libraries(cpp-threading-bounded_queue INTERFACE
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.txt b/library/cpp/threading/bounded_queue/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..bd9b68ab62b
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,14 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-threading-bounded_queue INTERFACE)
+target_link_libraries(cpp-threading-bounded_queue INTERFACE
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/library/cpp/threading/bounded_queue/bounded_queue.h b/library/cpp/threading/bounded_queue/bounded_queue.h
new file mode 100644
index 00000000000..c5c67140865
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/bounded_queue.h
@@ -0,0 +1,89 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+
+//https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
+
+namespace NThreading {
+ template<typename T>
+ class TBoundedQueue {
+ public:
+ explicit TBoundedQueue(size_t size)
+ : Buffer_(new TCell[size])
+ , Mask_(size - 1)
+ {
+ Y_ENSURE(size >= 2 && (size & (size - 1)) == 0);
+
+ for (size_t i = 0; i < size; ++i) {
+ Buffer_[i].Sequence.store(i, std::memory_order_relaxed);
+ }
+ }
+
+ template <typename T_>
+ [[nodiscard]] bool Enqueue(T_&& data) noexcept {
+ TCell* cell;
+ size_t pos = EnqueuePos_.load(std::memory_order_relaxed);
+
+ for (;;) {
+ cell = &Buffer_[pos & Mask_];
+ size_t seq = cell->Sequence.load(std::memory_order_acquire);
+ intptr_t dif = (intptr_t)seq - (intptr_t)pos;
+
+ if (dif == 0) {
+ if (EnqueuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
+ break;
+ }
+ } else if (dif < 0) {
+ return false;
+ } else {
+ pos = EnqueuePos_.load(std::memory_order_relaxed);
+ }
+ }
+
+ static_assert(noexcept(cell->Data = std::forward<T_>(data)));
+ cell->Data = std::forward<T_>(data);
+ cell->Sequence.store(pos + 1, std::memory_order_release);
+
+ return true;
+ }
+
+ [[nodiscard]] bool Dequeue(T& data) noexcept {
+ TCell* cell;
+ size_t pos = DequeuePos_.load(std::memory_order_relaxed);
+
+ for (;;) {
+ cell = &Buffer_[pos & Mask_];
+ size_t seq = cell->Sequence.load(std::memory_order_acquire);
+ intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
+
+ if (dif == 0) {
+ if (DequeuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
+ break;
+ }
+ } else if (dif < 0) {
+ return false;
+ } else {
+ pos = DequeuePos_.load(std::memory_order_relaxed);
+ }
+ }
+
+ static_assert(noexcept(data = std::move(cell->Data)));
+ data = std::move(cell->Data);
+
+ cell->Sequence.store(pos + Mask_ + 1, std::memory_order_release);
+ return true;
+ }
+ private:
+ struct TCell {
+ std::atomic<size_t> Sequence;
+ T Data;
+ };
+
+ std::unique_ptr<TCell[]> Buffer_;
+ const size_t Mask_;
+
+ alignas(64) std::atomic<size_t> EnqueuePos_ = 0;
+ alignas(64) std::atomic<size_t> DequeuePos_ = 0;
+ };
+}
+
diff --git a/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp b/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp
new file mode 100644
index 00000000000..bb5b6eb787b
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp
@@ -0,0 +1,106 @@
+#include "bounded_queue.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/thread/factory.h>
+
+using namespace NThreading;
+
+Y_UNIT_TEST_SUITE(TBoundedQueueTest) {
+ Y_UNIT_TEST(QueueSize) {
+ const size_t queueSize = 16;
+ TBoundedQueue<size_t> boundedQueue(queueSize);
+
+ for (size_t i = 0; i < queueSize; ++i) {
+ UNIT_ASSERT(boundedQueue.Enqueue(i));
+ }
+ UNIT_ASSERT(!boundedQueue.Enqueue(0));
+ size_t tmp = 0;
+ UNIT_ASSERT(boundedQueue.Dequeue(tmp));
+ UNIT_ASSERT(boundedQueue.Enqueue(0));
+ UNIT_ASSERT(!boundedQueue.Enqueue(0));
+ }
+
+ Y_UNIT_TEST(Move) {
+ const size_t queueSize = 16;
+ TBoundedQueue<TString> boundedQueue(queueSize);
+
+ for (size_t i = 0; i < queueSize; ++i) {
+ TString v = "xxx";
+ UNIT_ASSERT(boundedQueue.Enqueue(std::move(v)));
+ UNIT_ASSERT(v.empty());
+ }
+
+ {
+ TString v = "xxx";
+ UNIT_ASSERT(!boundedQueue.Enqueue(std::move(v)));
+ UNIT_ASSERT(v == "xxx");
+ }
+
+ TString v;
+ UNIT_ASSERT(boundedQueue.Dequeue(v));
+ UNIT_ASSERT(v == "xxx");
+ }
+
+ Y_UNIT_TEST(MPMC) {
+ size_t queueSize = 16;
+ size_t producers = 10;
+ size_t consumers = 10;
+ size_t itemsCount = 10000;
+
+ TVector<THolder<IThreadFactory::IThread>> threads;
+ TBoundedQueue<std::pair<size_t, size_t>> boundedQueue(queueSize);
+
+ std::atomic<size_t> itemCounter = 0;
+ std::atomic<size_t> consumed = 0;
+
+ for (size_t i = 0; i < consumers; ++i) {
+ threads.push_back(SystemThreadFactory()->Run(
+ [&]() {
+ TVector<size_t> prevItems(producers);
+ for (;;) {
+ std::pair<size_t, size_t> item;
+ while (!boundedQueue.Dequeue(item)) {
+ ;
+ }
+
+ if (item.first >= producers) {
+ break;
+ }
+
+ UNIT_ASSERT(item.second > prevItems[item.first]);
+ prevItems[item.first] = item.second;
+ ++consumed;
+ }
+ })
+ );
+ }
+
+ for (size_t i = 0; i < producers ; ++i) {
+ threads.push_back(SystemThreadFactory()->Run(
+ [&, producerNum = i]() {
+ for (;;) {
+ size_t item = ++itemCounter;
+ if (item > itemsCount) {
+ break;
+ }
+
+ while (!boundedQueue.Enqueue(std::make_pair(producerNum, item))) {
+ ;
+ }
+ }
+
+ while (!boundedQueue.Enqueue(std::make_pair(producers, size_t(0)))) {
+ ;
+ }
+ })
+ );
+ }
+
+
+ for (auto& t : threads) {
+ t->Join();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(consumed.load(), itemsCount);
+ }
+}
diff --git a/library/cpp/threading/bounded_queue/ut/ya.make b/library/cpp/threading/bounded_queue/ut/ya.make
new file mode 100644
index 00000000000..5db0a227131
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/ut/ya.make
@@ -0,0 +1,8 @@
+UNITTEST_FOR(library/cpp/threading/bounded_queue)
+
+SRCS(
+ bounded_queue_ut.cpp
+)
+
+END()
+
diff --git a/library/cpp/threading/bounded_queue/ya.make b/library/cpp/threading/bounded_queue/ya.make
new file mode 100644
index 00000000000..4c5336f0311
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/ya.make
@@ -0,0 +1,9 @@
+LIBRARY()
+
+SRCS(
+ bounded_queue.h
+)
+
+END()
+
+RECURSE_FOR_TESTS(ut)
diff --git a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt
index 902b6d7a9ac..43ff0adc1c0 100644
--- a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt
@@ -12,6 +12,8 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ cpp-threading-bounded_queue
+ cpp-yt-threading
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt
index 5653d3d3f3f..805b2379439 100644
--- a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt
@@ -13,6 +13,7 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ cpp-threading-bounded_queue
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt
index 5653d3d3f3f..36f8a2632e3 100644
--- a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt
@@ -13,6 +13,8 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ cpp-threading-bounded_queue
+ cpp-yt-threading
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt
index 902b6d7a9ac..43ff0adc1c0 100644
--- a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt
@@ -12,6 +12,8 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ cpp-threading-bounded_queue
+ cpp-yt-threading
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp
index 8557f63ac06..2c7d2c7b1e8 100644
--- a/library/cpp/threading/equeue/equeue_ut.cpp
+++ b/library/cpp/threading/equeue/equeue_ut.cpp
@@ -1,4 +1,5 @@
#include "equeue.h"
+#include "fast.h"
#include <library/cpp/testing/unittest/registar.h>
@@ -9,18 +10,33 @@
Y_UNIT_TEST_SUITE(TElasticQueueTest) {
const size_t MaxQueueSize = 20;
const size_t ThreadCount = 10;
- const size_t N = 100000;
- static THolder<TElasticQueue> Queue;
+ template <typename T>
+ THolder<T> MakeQueue();
- struct TQueueSetup {
- TQueueSetup() {
- Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>()));
- Queue->Start(ThreadCount, MaxQueueSize);
- }
- ~TQueueSetup() {
- Queue->Stop();
- }
+ template <>
+ THolder<TElasticQueue> MakeQueue() {
+ return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>());
+ }
+
+ template <>
+ THolder<TFastElasticQueue> MakeQueue() {
+ return MakeHolder<TFastElasticQueue>();
+ }
+
+ template <typename T>
+ struct TEnv {
+ static inline THolder<T> Queue;
+
+ struct TQueueSetup {
+ TQueueSetup() {
+ Queue.Reset(MakeQueue<T>());
+ Queue->Start(ThreadCount, MaxQueueSize);
+ }
+ ~TQueueSetup() {
+ Queue->Stop();
+ }
+ };
};
struct TCounters {
@@ -37,7 +53,9 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
//fill test -- fill queue with "endless" jobs
TSystemEvent WaitEvent;
- Y_UNIT_TEST(FillTest) {
+
+ template <typename T>
+ void FillTest() {
Counters.Reset();
struct TWaitJob: public IObjectInQueue {
@@ -47,7 +65,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
}
} job;
- struct TLocalSetup: TQueueSetup {
+ struct TLocalSetup: TEnv<T>::TQueueSetup {
+ TLocalSetup() {
+ WaitEvent.Reset();
+ }
~TLocalSetup() {
WaitEvent.Signal();
}
@@ -56,19 +77,26 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
size_t enqueued = 0;
{
TLocalSetup setup;
- while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
+ while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
++enqueued;
}
UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize);
- UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount());
}
- UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount());
- UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size());
+ UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size());
UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued);
}
+ Y_UNIT_TEST(FillTest) {
+ FillTest<TElasticQueue>();
+ }
+
+ Y_UNIT_TEST(FillTestFast) {
+ FillTest<TFastElasticQueue>();
+ }
//concurrent test -- send many jobs from different threads
struct TJob: public IObjectInQueue {
@@ -78,9 +106,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
};
static TJob Job;
+ template <typename T>
static bool TryAdd() {
AtomicIncrement(Counters.Total);
- if (Queue->Add(&Job)) {
+ if (TEnv<T>::Queue->Add(&Job)) {
AtomicIncrement(Counters.Scheduled);
return true;
} else {
@@ -89,16 +118,18 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
}
}
+ const size_t N = 100000;
static size_t TryCounter;
- Y_UNIT_TEST(ConcurrentTest) {
+ template <typename T>
+ void ConcurrentTest() {
Counters.Reset();
TryCounter = 0;
struct TSender: public IThreadFactory::IThreadAble {
void DoExecute() override {
while ((size_t)AtomicIncrement(TryCounter) <= N) {
- if (!TryAdd()) {
+ if (!TryAdd<T>()) {
Sleep(TDuration::MicroSeconds(50));
}
}
@@ -106,7 +137,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
} sender;
{
- TQueueSetup setup;
+ typename TEnv<T>::TQueueSetup setup;
TVector< TAutoPtr<IThreadFactory::IThread> > senders;
for (size_t i = 0; i < ThreadCount; ++i) {
@@ -122,4 +153,12 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled);
UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded);
}
+
+ Y_UNIT_TEST(ConcurrentTest) {
+ ConcurrentTest<TElasticQueue>();
+ }
+
+ Y_UNIT_TEST(ConcurrentTestFast) {
+ ConcurrentTest<TFastElasticQueue>();
+ }
}
diff --git a/library/cpp/threading/equeue/fast.h b/library/cpp/threading/equeue/fast.h
new file mode 100644
index 00000000000..3f96e279fc9
--- /dev/null
+++ b/library/cpp/threading/equeue/fast.h
@@ -0,0 +1,167 @@
+#pragma once
+
+#include <util/thread/pool.h>
+
+#include <util/datetime/base.h>
+#include <util/thread/lfqueue.h>
+#include <util/system/thread.h>
+#include <util/generic/vector.h>
+#include <util/generic/scope.h>
+#include <util/stream/str.h>
+
+#include <library/cpp/threading/bounded_queue/bounded_queue.h>
+
+#if defined(_android_) || defined(_arm_)
+//by now library/cpp/yt/threading doesn't compile in targets like default-android-armv7a, fallback to ordinal elastic queue
+#include "equeue.h"
+ class TFastElasticQueue
+ : public TElasticQueue
+ {
+ public:
+ explicit TFastElasticQueue(const TParams& params = {})
+ : TElasticQueue(MakeHolder<TSimpleThreadPool>(params))
+ {
+ }
+ };
+#else
+
+#include <library/cpp/yt/threading/event_count.h>
+
+class TFastElasticQueue
+ : public TThreadPoolBase
+ , private IThreadFactory::IThreadAble
+{
+public:
+ explicit TFastElasticQueue(const TParams& params = {})
+ : TThreadPoolBase(params)
+ {
+ Y_ENSURE(!params.Blocking_);
+ }
+
+ ~TFastElasticQueue() {
+ Stop();
+ }
+
+ void Start(size_t threadCount, size_t maxQueueSize) override {
+ Y_ENSURE(Threads_.empty());
+ Y_ENSURE(maxQueueSize > 0);
+
+ Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events
+ MaxQueueSize_ = maxQueueSize;
+
+ try {
+ for (size_t i = 0; i < threadCount; ++i) {
+ Threads_.push_back(Pool()->Run(this));
+ }
+ } catch (...) {
+ Stop();
+ throw;
+ }
+
+ Stopped_ = false;
+ }
+
+ size_t ObjectCount() const {
+ //GuardCount_ can be temporary incremented above real object count in queue
+ return Min(GuardCount_.load(), MaxQueueSize_);
+ }
+
+ bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT {
+ if (Stopped_ || !obj) {
+ return false;
+ }
+
+ if (GuardCount_.fetch_add(1) >= MaxQueueSize_) {
+ GuardCount_.fetch_sub(1);
+ return false;
+ }
+
+ QueueSize_.fetch_add(1);
+
+ if (!Queue_->Enqueue(obj)) {
+ //Simultaneous Dequeue calls can return not in exact fifo order of items,
+ //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of
+ //the oldest enqueued item is not actually dequeued and ring buffer can't proceed.
+ GuardCount_.fetch_sub(1);
+ QueueSize_.fetch_sub(1);
+ return false;
+ }
+
+
+ Event_.NotifyOne();
+
+ return true;
+ }
+
+ size_t Size() const noexcept override {
+ return QueueSize_.load();
+ }
+
+ void Stop() noexcept override {
+ Stopped_ = true;
+
+ for (size_t i = 0; i < Threads_.size(); ++i) {
+ while (!Queue_->Enqueue(nullptr)) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
+
+ Event_.NotifyOne();
+ }
+
+ while (!Threads_.empty()) {
+ Threads_.back()->Join();
+ Threads_.pop_back();
+ }
+
+ Queue_.Reset();
+ }
+
+ void DoExecute() override {
+ TThread::SetCurrentThreadName(Params.ThreadName_.c_str());
+
+ while (true) {
+ IObjectInQueue* job = nullptr;
+
+ Event_.Await([&]() {
+ return Queue_->Dequeue(job);
+ });
+
+ if (!job) {
+ break;
+ }
+
+ QueueSize_.fetch_sub(1);
+
+ Y_DEFER {
+ GuardCount_.fetch_sub(1);
+ };
+
+ if (Params.Catching_) {
+ try {
+ try {
+ job->Process(nullptr);
+ } catch (...) {
+ Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
+ }
+ } catch (...) {
+ ;
+ }
+ } else {
+ job->Process(nullptr);
+ }
+ }
+ }
+private:
+ std::atomic<bool> Stopped_ = false;
+ size_t MaxQueueSize_ = 0;
+
+ alignas(64) std::atomic<size_t> GuardCount_ = 0;
+ alignas(64) std::atomic<size_t> QueueSize_ = 0;
+
+ TVector<THolder<IThreadFactory::IThread>> Threads_;
+
+ THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_;
+ NYT::NThreading::TEventCount Event_;
+};
+
+#endif
diff --git a/library/cpp/threading/equeue/ya.make b/library/cpp/threading/equeue/ya.make
index 445797aa121..95677366c93 100644
--- a/library/cpp/threading/equeue/ya.make
+++ b/library/cpp/threading/equeue/ya.make
@@ -3,14 +3,23 @@ LIBRARY()
SRCS(
equeue.h
equeue.cpp
+ fast.h
)
PEERDIR(
library/cpp/deprecated/atomic
+ library/cpp/threading/bounded_queue
)
+IF (NOT OS_ANDROID AND NOT ARCH_ARM)
+ PEERDIR(
+ library/cpp/yt/threading
+ )
+ENDIF()
+
END()
RECURSE_FOR_TESTS(
ut
)
+
diff --git a/library/cpp/yt/CMakeLists.darwin-x86_64.txt b/library/cpp/yt/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..eb757e83e8b
--- /dev/null
+++ b/library/cpp/yt/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(assert)
+add_subdirectory(coding)
+add_subdirectory(cpu_clock)
+add_subdirectory(exception)
+add_subdirectory(malloc)
+add_subdirectory(memory)
+add_subdirectory(misc)
+add_subdirectory(small_containers)
+add_subdirectory(string)
+add_subdirectory(system)
+add_subdirectory(threading)
+add_subdirectory(yson)
+add_subdirectory(yson_string)
diff --git a/library/cpp/yt/CMakeLists.linux-aarch64.txt b/library/cpp/yt/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..eb0165800fd
--- /dev/null
+++ b/library/cpp/yt/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,18 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(assert)
+add_subdirectory(coding)
+add_subdirectory(exception)
+add_subdirectory(malloc)
+add_subdirectory(memory)
+add_subdirectory(misc)
+add_subdirectory(small_containers)
+add_subdirectory(string)
+add_subdirectory(yson)
+add_subdirectory(yson_string)
diff --git a/library/cpp/yt/CMakeLists.linux-x86_64.txt b/library/cpp/yt/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..eb757e83e8b
--- /dev/null
+++ b/library/cpp/yt/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(assert)
+add_subdirectory(coding)
+add_subdirectory(cpu_clock)
+add_subdirectory(exception)
+add_subdirectory(malloc)
+add_subdirectory(memory)
+add_subdirectory(misc)
+add_subdirectory(small_containers)
+add_subdirectory(string)
+add_subdirectory(system)
+add_subdirectory(threading)
+add_subdirectory(yson)
+add_subdirectory(yson_string)
diff --git a/library/cpp/yt/CMakeLists.txt b/library/cpp/yt/CMakeLists.txt
index eb0165800fd..f8b31df0c11 100644
--- a/library/cpp/yt/CMakeLists.txt
+++ b/library/cpp/yt/CMakeLists.txt
@@ -6,13 +6,12 @@
# original buildsystem will not be accepted.
-add_subdirectory(assert)
-add_subdirectory(coding)
-add_subdirectory(exception)
-add_subdirectory(malloc)
-add_subdirectory(memory)
-add_subdirectory(misc)
-add_subdirectory(small_containers)
-add_subdirectory(string)
-add_subdirectory(yson)
-add_subdirectory(yson_string)
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/library/cpp/yt/CMakeLists.windows-x86_64.txt b/library/cpp/yt/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..eb757e83e8b
--- /dev/null
+++ b/library/cpp/yt/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(assert)
+add_subdirectory(coding)
+add_subdirectory(cpu_clock)
+add_subdirectory(exception)
+add_subdirectory(malloc)
+add_subdirectory(memory)
+add_subdirectory(misc)
+add_subdirectory(small_containers)
+add_subdirectory(string)
+add_subdirectory(system)
+add_subdirectory(threading)
+add_subdirectory(yson)
+add_subdirectory(yson_string)
diff --git a/library/cpp/yt/cpu_clock/CMakeLists.darwin-x86_64.txt b/library/cpp/yt/cpu_clock/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..b9afea23f77
--- /dev/null
+++ b/library/cpp/yt/cpu_clock/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-yt-cpu_clock)
+target_compile_options(cpp-yt-cpu_clock PRIVATE
+ -Wdeprecated-this-capture
+)
+target_link_libraries(cpp-yt-cpu_clock PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-yt-assert
+)
+target_sources(cpp-yt-cpu_clock PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/cpu_clock/clock.cpp
+)
diff --git a/library/cpp/yt/cpu_clock/CMakeLists.linux-x86_64.txt b/library/cpp/yt/cpu_clock/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..89fe774bc01
--- /dev/null
+++ b/library/cpp/yt/cpu_clock/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-yt-cpu_clock)
+target_compile_options(cpp-yt-cpu_clock PRIVATE
+ -Wdeprecated-this-capture
+)
+target_link_libraries(cpp-yt-cpu_clock PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-yt-assert
+)
+target_sources(cpp-yt-cpu_clock PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/cpu_clock/clock.cpp
+)
diff --git a/library/cpp/yt/cpu_clock/CMakeLists.txt b/library/cpp/yt/cpu_clock/CMakeLists.txt
new file mode 100644
index 00000000000..3bc235519c6
--- /dev/null
+++ b/library/cpp/yt/cpu_clock/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/library/cpp/yt/cpu_clock/CMakeLists.windows-x86_64.txt b/library/cpp/yt/cpu_clock/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..cbe906d57f1
--- /dev/null
+++ b/library/cpp/yt/cpu_clock/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,18 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-yt-cpu_clock)
+target_link_libraries(cpp-yt-cpu_clock PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-yt-assert
+)
+target_sources(cpp-yt-cpu_clock PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/cpu_clock/clock.cpp
+)
diff --git a/library/cpp/yt/cpu_clock/benchmark/benchmark.cpp b/library/cpp/yt/cpu_clock/benchmark/benchmark.cpp
new file mode 100644
index 00000000000..9d300b6726e
--- /dev/null
+++ b/library/cpp/yt/cpu_clock/benchmark/benchmark.cpp
@@ -0,0 +1,41 @@
+#include "benchmark/benchmark.h"
+#include <benchmark/benchmark.h>
+
+#include <library/cpp/yt/cpu_clock/clock.h>
+
+namespace NYT {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void BM_GetCpuInstant(benchmark::State& state)
+{
+ for (auto _ : state) {
+ benchmark::DoNotOptimize(GetCpuInstant());
+ }
+}
+
+BENCHMARK(BM_GetCpuInstant);
+
+void BM_GetCpuApproximateInstant(benchmark::State& state)
+{
+ for (auto _ : state) {
+ benchmark::DoNotOptimize(GetApproximateCpuInstant());
+ }
+}
+
+BENCHMARK(BM_GetCpuApproximateInstant);
+
+void BM_InstantNow(benchmark::State& state)
+{
+ for (auto _ : state) {
+ benchmark::DoNotOptimize(TInstant::Now());
+ }
+}
+
+BENCHMARK(BM_InstantNow);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT
diff --git a/library/cpp/yt/cpu_clock/benchmark/ya.make b/library/cpp/yt/cpu_clock/benchmark/ya.make
new file mode 100644
index 00000000000..4550bf5934e
--- /dev/null
+++ b/library/cpp/yt/cpu_clock/benchmark/ya.make
@@ -0,0 +1,11 @@
+G_BENCHMARK()
+
+SRCS(
+ benchmark.cpp
+)
+
+PEERDIR(
+ library/cpp/yt/cpu_clock
+)
+
+END()
diff --git a/library/cpp/yt/cpu_clock/unittests/clock_ut.cpp b/library/cpp/yt/cpu_clock/unittests/clock_ut.cpp
new file mode 100644
index 00000000000..bd9cb6d4be1
--- /dev/null
+++ b/library/cpp/yt/cpu_clock/unittests/clock_ut.cpp
@@ -0,0 +1,46 @@
+#include <gtest/gtest.h>
+
+#include <library/cpp/yt/cpu_clock/clock.h>
+
+namespace NYT {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+i64 DiffMS(T a, T b)
+{
+ return a >= b
+ ? static_cast<i64>(a.MilliSeconds()) - static_cast<i64>(b.MilliSeconds())
+ : DiffMS(b, a);
+}
+
+TEST(TTimingTest, GetInstant)
+{
+ GetInstant();
+
+ EXPECT_LE(DiffMS(GetInstant(), TInstant::Now()), 10);
+}
+
+TEST(TTimingTest, InstantVSCpuInstant)
+{
+ auto instant1 = TInstant::Now();
+ auto cpuInstant = InstantToCpuInstant(instant1);
+ auto instant2 = CpuInstantToInstant(cpuInstant);
+ EXPECT_LE(DiffMS(instant1, instant2), 10);
+}
+
+TEST(TTimingTest, DurationVSCpuDuration)
+{
+ auto cpuInstant1 = GetCpuInstant();
+ constexpr auto duration1 = TDuration::MilliSeconds(100);
+ Sleep(duration1);
+ auto cpuInstant2 = GetCpuInstant();
+ auto duration2 = CpuDurationToDuration(cpuInstant2 - cpuInstant1);
+ EXPECT_LE(DiffMS(duration1, duration2), 10);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT
diff --git a/library/cpp/yt/cpu_clock/unittests/ya.make b/library/cpp/yt/cpu_clock/unittests/ya.make
new file mode 100644
index 00000000000..921087c295b
--- /dev/null
+++ b/library/cpp/yt/cpu_clock/unittests/ya.make
@@ -0,0 +1,13 @@
+GTEST()
+
+INCLUDE(${ARCADIA_ROOT}/library/cpp/yt/ya_cpp.make.inc)
+
+SRCS(
+ clock_ut.cpp
+)
+
+PEERDIR(
+ library/cpp/yt/cpu_clock
+)
+
+END()
diff --git a/library/cpp/yt/system/CMakeLists.darwin-x86_64.txt b/library/cpp/yt/system/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..ad24a0da294
--- /dev/null
+++ b/library/cpp/yt/system/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-yt-system)
+target_compile_options(cpp-yt-system PRIVATE
+ -Wdeprecated-this-capture
+)
+target_link_libraries(cpp-yt-system PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+)
+target_sources(cpp-yt-system PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/system/thread_id.cpp
+)
diff --git a/library/cpp/yt/system/CMakeLists.linux-x86_64.txt b/library/cpp/yt/system/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..6dc2a074994
--- /dev/null
+++ b/library/cpp/yt/system/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-yt-system)
+target_compile_options(cpp-yt-system PRIVATE
+ -Wdeprecated-this-capture
+)
+target_link_libraries(cpp-yt-system PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+)
+target_sources(cpp-yt-system PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/system/thread_id.cpp
+)
diff --git a/library/cpp/yt/system/CMakeLists.txt b/library/cpp/yt/system/CMakeLists.txt
new file mode 100644
index 00000000000..3bc235519c6
--- /dev/null
+++ b/library/cpp/yt/system/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/library/cpp/yt/system/CMakeLists.windows-x86_64.txt b/library/cpp/yt/system/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..338956fa705
--- /dev/null
+++ b/library/cpp/yt/system/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-yt-system)
+target_link_libraries(cpp-yt-system PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+)
+target_sources(cpp-yt-system PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/system/thread_id.cpp
+)
diff --git a/library/cpp/yt/threading/CMakeLists.darwin-x86_64.txt b/library/cpp/yt/threading/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..7dbacb9da4c
--- /dev/null
+++ b/library/cpp/yt/threading/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,37 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-yt-threading)
+target_compile_options(cpp-yt-threading PRIVATE
+ -Wdeprecated-this-capture
+)
+target_link_libraries(cpp-yt-threading PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-yt-assert
+ cpp-yt-cpu_clock
+ cpp-yt-system
+ cpp-yt-memory
+)
+target_sources(cpp-yt-threading PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/at_fork.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/count_down_latch.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/event_count.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/fork_aware_spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/fork_aware_rw_spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/futex.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/notification_handle.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/public.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/recursive_spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/rw_spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/spin_lock_base.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/spin_wait.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/spin_wait_hook.cpp
+)
diff --git a/library/cpp/yt/threading/CMakeLists.linux-x86_64.txt b/library/cpp/yt/threading/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..644ee262f04
--- /dev/null
+++ b/library/cpp/yt/threading/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,38 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-yt-threading)
+target_compile_options(cpp-yt-threading PRIVATE
+ -Wdeprecated-this-capture
+)
+target_link_libraries(cpp-yt-threading PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-yt-assert
+ cpp-yt-cpu_clock
+ cpp-yt-system
+ cpp-yt-memory
+)
+target_sources(cpp-yt-threading PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/at_fork.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/count_down_latch.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/event_count.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/fork_aware_spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/fork_aware_rw_spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/futex.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/notification_handle.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/public.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/recursive_spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/rw_spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/spin_lock_base.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/spin_wait.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/spin_wait_hook.cpp
+)
diff --git a/library/cpp/yt/threading/CMakeLists.txt b/library/cpp/yt/threading/CMakeLists.txt
new file mode 100644
index 00000000000..3bc235519c6
--- /dev/null
+++ b/library/cpp/yt/threading/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/library/cpp/yt/threading/CMakeLists.windows-x86_64.txt b/library/cpp/yt/threading/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..eedf3ebee2f
--- /dev/null
+++ b/library/cpp/yt/threading/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,34 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-yt-threading)
+target_link_libraries(cpp-yt-threading PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-yt-assert
+ cpp-yt-cpu_clock
+ cpp-yt-system
+ cpp-yt-memory
+)
+target_sources(cpp-yt-threading PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/at_fork.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/count_down_latch.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/event_count.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/fork_aware_spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/fork_aware_rw_spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/futex.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/notification_handle.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/public.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/recursive_spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/rw_spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/spin_lock_base.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/spin_lock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/spin_wait.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/threading/spin_wait_hook.cpp
+)
diff --git a/library/cpp/yt/threading/unittests/count_down_latch_ut.cpp b/library/cpp/yt/threading/unittests/count_down_latch_ut.cpp
new file mode 100644
index 00000000000..894bdab22a7
--- /dev/null
+++ b/library/cpp/yt/threading/unittests/count_down_latch_ut.cpp
@@ -0,0 +1,78 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yt/threading/count_down_latch.h>
+
+#include <thread>
+
+namespace NYT::NThreading {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void WaitForLatch(const TCountDownLatch& latch)
+{
+ latch.Wait();
+ EXPECT_EQ(0, latch.GetCount());
+}
+
+TEST(TCountDownLatch, TwoThreads)
+{
+ TCountDownLatch latch(2);
+
+ std::thread t1(std::bind(&WaitForLatch, std::cref(latch)));
+ std::thread t2(std::bind(&WaitForLatch, std::cref(latch)));
+
+ EXPECT_EQ(2, latch.GetCount());
+ latch.CountDown();
+ EXPECT_EQ(1, latch.GetCount());
+ latch.CountDown();
+ EXPECT_EQ(0, latch.GetCount());
+
+ t1.join();
+ t2.join();
+}
+
+TEST(TCountDownLatch, TwoThreadsPredecremented)
+{
+ TCountDownLatch latch(2);
+
+ EXPECT_EQ(2, latch.GetCount());
+ latch.CountDown();
+ EXPECT_EQ(1, latch.GetCount());
+ latch.CountDown();
+ EXPECT_EQ(0, latch.GetCount());
+
+ std::thread t1(std::bind(&WaitForLatch, std::cref(latch)));
+ std::thread t2(std::bind(&WaitForLatch, std::cref(latch)));
+
+ t1.join();
+ t2.join();
+}
+
+TEST(TCountDownLatch, TwoThreadsTwoLatches)
+{
+ TCountDownLatch first(1);
+ TCountDownLatch second(1);
+
+ std::thread t1([&] () {
+ first.Wait();
+ second.CountDown();
+ EXPECT_EQ(0, first.GetCount());
+ EXPECT_EQ(0, second.GetCount());
+ });
+
+ std::thread t2([&] () {
+ first.CountDown();
+ second.Wait();
+ EXPECT_EQ(0, first.GetCount());
+ EXPECT_EQ(0, second.GetCount());
+ });
+
+ t1.join();
+ t2.join();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NThreading
diff --git a/library/cpp/yt/threading/unittests/recursive_spin_lock_ut.cpp b/library/cpp/yt/threading/unittests/recursive_spin_lock_ut.cpp
new file mode 100644
index 00000000000..9c2d8f16cbf
--- /dev/null
+++ b/library/cpp/yt/threading/unittests/recursive_spin_lock_ut.cpp
@@ -0,0 +1,88 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yt/threading/recursive_spin_lock.h>
+#include <library/cpp/yt/threading/event_count.h>
+
+#include <thread>
+
+namespace NYT::NThreading {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TRecursiveSpinLockTest, SingleThread)
+{
+ TRecursiveSpinLock lock;
+ EXPECT_FALSE(lock.IsLocked());
+ EXPECT_TRUE(lock.TryAcquire());
+ EXPECT_TRUE(lock.IsLocked());
+ EXPECT_TRUE(lock.TryAcquire());
+ EXPECT_TRUE(lock.IsLocked());
+ lock.Release();
+ EXPECT_TRUE(lock.IsLocked());
+ lock.Release();
+ EXPECT_FALSE(lock.IsLocked());
+ EXPECT_TRUE(lock.TryAcquire());
+ EXPECT_TRUE(lock.IsLocked());
+ lock.Release();
+ lock.Acquire();
+ lock.Release();
+}
+
+TEST(TRecursiveSpinLockTest, TwoThreads)
+{
+ TRecursiveSpinLock lock;
+ TEvent e1, e2, e3, e4, e5, e6, e7;
+
+ std::thread t1([&] {
+ e1.Wait();
+ EXPECT_TRUE(lock.IsLocked());
+ EXPECT_FALSE(lock.IsLockedByCurrentThread());
+ EXPECT_FALSE(lock.TryAcquire());
+ e2.NotifyOne();
+ e3.Wait();
+ EXPECT_TRUE(lock.IsLocked());
+ EXPECT_FALSE(lock.IsLockedByCurrentThread());
+ EXPECT_FALSE(lock.TryAcquire());
+ e4.NotifyOne();
+ e5.Wait();
+ EXPECT_FALSE(lock.IsLocked());
+ EXPECT_FALSE(lock.IsLockedByCurrentThread());
+ EXPECT_TRUE(lock.TryAcquire());
+ e6.NotifyOne();
+ e7.Wait();
+ lock.Release();
+ });
+
+ std::thread t2([&] {
+ EXPECT_FALSE(lock.IsLocked());
+ EXPECT_TRUE(lock.TryAcquire());
+ EXPECT_TRUE(lock.IsLockedByCurrentThread());
+ e1.NotifyOne();
+ e2.Wait();
+ EXPECT_TRUE(lock.TryAcquire());
+ EXPECT_TRUE(lock.IsLockedByCurrentThread());
+ e3.NotifyOne();
+ e4.Wait();
+ lock.Release();
+ lock.Release();
+ EXPECT_FALSE(lock.IsLocked());
+ e5.NotifyOne();
+ e6.Wait();
+ EXPECT_TRUE(lock.IsLocked());
+ EXPECT_FALSE(lock.IsLockedByCurrentThread());
+ e7.NotifyOne();
+ lock.Acquire();
+ lock.Acquire();
+ lock.Release();
+ lock.Release();
+ });
+
+ t1.join();
+ t2.join();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NThreading
diff --git a/library/cpp/yt/threading/unittests/spin_wait_ut.cpp b/library/cpp/yt/threading/unittests/spin_wait_ut.cpp
new file mode 100644
index 00000000000..8469634f346
--- /dev/null
+++ b/library/cpp/yt/threading/unittests/spin_wait_ut.cpp
@@ -0,0 +1,48 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yt/threading/spin_wait.h>
+#include <library/cpp/yt/threading/spin_wait_hook.h>
+
+#include <thread>
+#include <mutex>
+
+namespace NYT::NThreading {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+bool SpinWaitSlowPathHookInvoked;
+
+void SpinWaitSlowPathHook(
+ TCpuDuration cpuDelay,
+ const TSourceLocation& /*location*/,
+ ESpinLockActivityKind /*activityKind*/)
+{
+ SpinWaitSlowPathHookInvoked = true;
+ auto delay = CpuDurationToDuration(cpuDelay);
+ EXPECT_GE(delay, TDuration::Seconds(1));
+ EXPECT_LE(delay, TDuration::Seconds(5));
+}
+
+TEST(TSpinWaitTest, SlowPathHook)
+{
+ static std::once_flag registerFlag;
+ std::call_once(
+ registerFlag,
+ [] {
+ RegisterSpinWaitSlowPathHook(SpinWaitSlowPathHook);
+ });
+ SpinWaitSlowPathHookInvoked = false;
+ {
+ TSpinWait spinWait(__LOCATION__, ESpinLockActivityKind::ReadWrite);
+ for (int i = 0; i < 1'000'000; ++i) {
+ spinWait.Wait();
+ }
+ }
+ EXPECT_TRUE(SpinWaitSlowPathHookInvoked);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NThreading
diff --git a/library/cpp/yt/threading/unittests/ya.make b/library/cpp/yt/threading/unittests/ya.make
new file mode 100644
index 00000000000..ef9b5d29951
--- /dev/null
+++ b/library/cpp/yt/threading/unittests/ya.make
@@ -0,0 +1,17 @@
+GTEST()
+
+INCLUDE(${ARCADIA_ROOT}/library/cpp/yt/ya_cpp.make.inc)
+
+SRCS(
+ count_down_latch_ut.cpp
+ recursive_spin_lock_ut.cpp
+ spin_wait_ut.cpp
+)
+
+PEERDIR(
+ library/cpp/yt/assert
+ library/cpp/yt/threading
+ library/cpp/testing/gtest
+)
+
+END()