diff options
author | trofimenkov <trofimenkov@yandex-team.ru> | 2022-02-10 16:49:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:30 +0300 |
commit | 30cebc2cfa79af3b577760a113e203a79450e6b6 (patch) | |
tree | 49327bf3c28fab534b04b312a39179e70f7c2763 /util | |
parent | a2d2743094c8d255cda4011b317235874db4d01c (diff) | |
download | ydb-30cebc2cfa79af3b577760a113e203a79450e6b6.tar.gz |
Restoring authorship annotation for <trofimenkov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'util')
-rw-r--r-- | util/generic/hash.h | 16 | ||||
-rw-r--r-- | util/generic/maybe.h | 36 | ||||
-rw-r--r-- | util/memory/tempbuf.cpp | 12 | ||||
-rw-r--r-- | util/memory/tempbuf_ut.cpp | 30 | ||||
-rw-r--r-- | util/system/mutex.cpp | 2 | ||||
-rw-r--r-- | util/thread/pool.cpp | 172 | ||||
-rw-r--r-- | util/thread/pool.h | 142 | ||||
-rw-r--r-- | util/thread/pool_ut.cpp | 176 | ||||
-rw-r--r-- | util/ysaveload_ut.cpp | 94 |
9 files changed, 340 insertions, 340 deletions
diff --git a/util/generic/hash.h b/util/generic/hash.h index e46db21fa9..e02b8aff18 100644 --- a/util/generic/hash.h +++ b/util/generic/hash.h @@ -1651,31 +1651,31 @@ public: const T& at(const TheKey& key) const { using namespace ::NPrivate; const_iterator it = find(key); - + if (Y_UNLIKELY(it == end())) { ::NPrivate::ThrowKeyNotFoundInHashTableException(MapKeyToString(key)); } return it->second; - } - + } + template <class TheKey> T& at(const TheKey& key) { using namespace ::NPrivate; iterator it = find(key); - + if (Y_UNLIKELY(it == end())) { ::NPrivate::ThrowKeyNotFoundInHashTableException(MapKeyToString(key)); } - + return it->second; - } - + } + template <class TKey> size_type count(const TKey& key) const { return rep.count(key); } - + template <class TKey> std::pair<iterator, iterator> equal_range(const TKey& key) { return rep.equal_range(key); diff --git a/util/generic/maybe.h b/util/generic/maybe.h index 34d21aebcd..0abcc302f8 100644 --- a/util/generic/maybe.h +++ b/util/generic/maybe.h @@ -7,7 +7,7 @@ #include <util/system/align.h> #include <util/stream/output.h> -#include <util/ysaveload.h> +#include <util/ysaveload.h> namespace NMaybe { struct TPolicyUndefinedExcept { @@ -387,31 +387,31 @@ public: } void Save(IOutputStream* out) const { - const bool defined = Defined(); + const bool defined = Defined(); - ::Save<bool>(out, defined); + ::Save<bool>(out, defined); - if (defined) { + if (defined) { ::Save(out, *Data()); - } - } - + } + } + void Load(IInputStream* in) { - bool defined; + bool defined; - ::Load(in, defined); + ::Load(in, defined); - if (defined) { - if (!Defined()) { - ConstructInPlace(); - } + if (defined) { + if (!Defined()) { + ConstructInPlace(); + } ::Load(in, *Data()); - } else { - Clear(); - } - } - + } else { + Clear(); + } + } + void Swap(TMaybe& other) { if (this->Defined_ == other.Defined_) { if (this->Defined_) { diff --git a/util/memory/tempbuf.cpp b/util/memory/tempbuf.cpp index 09a2d0f140..886c57a5a2 100644 --- a/util/memory/tempbuf.cpp +++ b/util/memory/tempbuf.cpp @@ -198,9 +198,9 @@ TTempBuf::TTempBuf(const TTempBuf&) noexcept = default; TTempBuf::TTempBuf(TTempBuf&& b) noexcept : Impl_(std::move(b.Impl_)) -{ -} - +{ +} + TTempBuf::~TTempBuf() = default; TTempBuf& TTempBuf::operator=(const TTempBuf& b) noexcept { @@ -271,9 +271,9 @@ void TTempBuf::Append(const void* data, size_t len) { } bool TTempBuf::IsNull() const noexcept { - return !Impl_; -} - + return !Impl_; +} + #if 0 #include <util/datetime/cputimer.h> diff --git a/util/memory/tempbuf_ut.cpp b/util/memory/tempbuf_ut.cpp index d6bcf9d546..ec3ebb168a 100644 --- a/util/memory/tempbuf_ut.cpp +++ b/util/memory/tempbuf_ut.cpp @@ -1,14 +1,14 @@ #include "tempbuf.h" #include <utility> - + #include <library/cpp/testing/unittest/registar.h> class TTempBufTest: public TTestBase { UNIT_TEST_SUITE(TTempBufTest); UNIT_TEST(TestCreate); UNIT_TEST(TestOps); - UNIT_TEST(TestMoveCtor); + UNIT_TEST(TestMoveCtor); UNIT_TEST(TestAppend); UNIT_TEST(TestProceed); UNIT_TEST_SUITE_END(); @@ -16,7 +16,7 @@ class TTempBufTest: public TTestBase { public: void TestCreate(); void TestOps(); - void TestMoveCtor(); + void TestMoveCtor(); void TestProceed(); void TestAppend() { @@ -64,19 +64,19 @@ void TTempBufTest::TestOps() { UNIT_ASSERT(tmp.Size() >= 201); UNIT_ASSERT_EQUAL(tmp.Filled(), 0); } - -void TTempBufTest::TestMoveCtor() { - TTempBuf src; - UNIT_ASSERT(!src.IsNull()); - - src.Proceed(10); - + +void TTempBufTest::TestMoveCtor() { + TTempBuf src; + UNIT_ASSERT(!src.IsNull()); + + src.Proceed(10); + TTempBuf dst(std::move(src)); - - UNIT_ASSERT(src.IsNull()); - UNIT_ASSERT(!dst.IsNull()); - UNIT_ASSERT_EQUAL(dst.Filled(), 10); -} + + UNIT_ASSERT(src.IsNull()); + UNIT_ASSERT(!dst.IsNull()); + UNIT_ASSERT_EQUAL(dst.Filled(), 10); +} void TTempBufTest::TestProceed() { TTempBuf src; diff --git a/util/system/mutex.cpp b/util/system/mutex.cpp index 4041402db9..d97adef5c2 100644 --- a/util/system/mutex.cpp +++ b/util/system/mutex.cpp @@ -126,7 +126,7 @@ TMutex::TMutex() } TMutex::TMutex(TMutex&&) = default; - + TMutex::~TMutex() = default; void TMutex::Acquire() noexcept { diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp index 05fad02e9b..2e2edf9488 100644 --- a/util/thread/pool.cpp +++ b/util/thread/pool.cpp @@ -14,52 +14,52 @@ #include <util/generic/fastqueue.h> #include <util/stream/output.h> -#include <util/string/builder.h> +#include <util/string/builder.h> #include <util/system/event.h> #include <util/system/mutex.h> #include <util/system/atomic.h> #include <util/system/condvar.h> -#include <util/system/thread.h> +#include <util/system/thread.h> #include <util/datetime/base.h> #include "factory.h" #include "pool.h" -namespace { - class TThreadNamer { - public: - TThreadNamer(const IThreadPool::TParams& params) - : ThreadName(params.ThreadName_) - , EnumerateThreads(params.EnumerateThreads_) - { - } - +namespace { + class TThreadNamer { + public: + TThreadNamer(const IThreadPool::TParams& params) + : ThreadName(params.ThreadName_) + , EnumerateThreads(params.EnumerateThreads_) + { + } + explicit operator bool() const { - return !ThreadName.empty(); - } - - void SetCurrentThreadName() { - if (EnumerateThreads) { - Set(TStringBuilder() << ThreadName << (Index++)); - } else { - Set(ThreadName); - } - } - - private: - void Set(const TString& name) { - TThread::SetCurrentThreadName(name.c_str()); - } - - private: - TString ThreadName; - bool EnumerateThreads = false; - std::atomic<ui64> Index{0}; - }; -} - + return !ThreadName.empty(); + } + + void SetCurrentThreadName() { + if (EnumerateThreads) { + Set(TStringBuilder() << ThreadName << (Index++)); + } else { + Set(ThreadName); + } + } + + private: + void Set(const TString& name) { + TThread::SetCurrentThreadName(name.c_str()); + } + + private: + TString ThreadName; + bool EnumerateThreads = false; + std::atomic<ui64> Index{0}; + }; +} + TThreadFactoryHolder::TThreadFactoryHolder() noexcept : Pool_(SystemThreadFactory()) { @@ -71,11 +71,11 @@ class TThreadPool::TImpl: public TIntrusiveListItem<TImpl>, public IThreadFactor using TThreadRef = THolder<IThreadFactory::IThread>; public: - inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params) + inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params) : Parent_(parent) - , Blocking(params.Blocking_) - , Catching(params.Catching_) - , Namer(params) + , Blocking(params.Blocking_) + , Catching(params.Catching_) + , Namer(params) , ShouldTerminate(1) , MaxQueueSize(0) , ThreadCountExpected(0) @@ -204,10 +204,10 @@ private: void DoExecute() override { THolder<TTsr> tsr(new TTsr(Parent_)); - if (Namer) { - Namer.SetCurrentThreadName(); - } - + if (Namer) { + Namer.SetCurrentThreadName(); + } + while (true) { IObjectInQueue* job = nullptr; @@ -227,18 +227,18 @@ private: QueuePopCond.Signal(); - if (Catching) { + if (Catching) { try { - try { - job->Process(*tsr); - } catch (...) { - Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; - } + try { + job->Process(*tsr); + } catch (...) { + Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; + } } catch (...) { // ¯\_(ツ)_/¯ } - } else { - job->Process(*tsr); + } else { + job->Process(*tsr); } } @@ -254,9 +254,9 @@ private: private: TThreadPool* Parent_; - const bool Blocking; - const bool Catching; - TThreadNamer Namer; + const bool Blocking; + const bool Catching; + TThreadNamer Namer; mutable TMutex QueueMutex; mutable TMutex StopMutex; TCondVar QueuePushCond; @@ -360,7 +360,7 @@ bool TThreadPool::Add(IObjectInQueue* obj) { } void TThreadPool::Start(size_t thrnum, size_t maxque) { - Impl_.Reset(new TImpl(this, thrnum, maxque, Params)); + Impl_.Reset(new TImpl(this, thrnum, maxque, Params)); } void TThreadPool::Stop() noexcept { @@ -387,27 +387,27 @@ public: void DoExecute() noexcept override { THolder<TThread> This(this); - if (Impl_->Namer) { - Impl_->Namer.SetCurrentThreadName(); - } - + if (Impl_->Namer) { + Impl_->Namer.SetCurrentThreadName(); + } + { TTsr tsr(Impl_->Parent_); IObjectInQueue* obj; while ((obj = Impl_->WaitForJob()) != nullptr) { - if (Impl_->Catching) { + if (Impl_->Catching) { try { - try { - obj->Process(tsr); - } catch (...) { - Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl; - } + try { + obj->Process(tsr); + } catch (...) { + Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl; + } } catch (...) { - // ¯\_(ツ)_/¯ + // ¯\_(ツ)_/¯ } - } else { - obj->Process(tsr); + } else { + obj->Process(tsr); } } } @@ -418,10 +418,10 @@ public: THolder<IThreadFactory::IThread> Thread_; }; - inline TImpl(TAdaptiveThreadPool* parent, const TParams& params) + inline TImpl(TAdaptiveThreadPool* parent, const TParams& params) : Parent_(parent) - , Catching(params.Catching_) - , Namer(params) + , Catching(params.Catching_) + , Namer(params) , ThrCount_(0) , AllDone_(false) , Obj_(nullptr) @@ -534,8 +534,8 @@ private: private: TAdaptiveThreadPool* Parent_; - const bool Catching; - TThreadNamer Namer; + const bool Catching; + TThreadNamer Namer; TAtomic ThrCount_; TMutex Mutex_; TCondVar CondReady_; @@ -547,22 +547,22 @@ private: TDuration IdleTime_; }; -TThreadPoolBase::TThreadPoolBase(const TParams& params) - : TThreadFactoryHolder(params.Factory_) - , Params(params) +TThreadPoolBase::TThreadPoolBase(const TParams& params) + : TThreadFactoryHolder(params.Factory_) + , Params(params) { } -#define DEFINE_THREAD_POOL_CTORS(type) \ +#define DEFINE_THREAD_POOL_CTORS(type) \ type::type(const TParams& params) \ : TThreadPoolBase(params) \ { \ } - -DEFINE_THREAD_POOL_CTORS(TThreadPool) -DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool) -DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool) - + +DEFINE_THREAD_POOL_CTORS(TThreadPool) +DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool) +DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool) + TAdaptiveThreadPool::~TAdaptiveThreadPool() = default; bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) { @@ -574,7 +574,7 @@ bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) { } void TAdaptiveThreadPool::Start(size_t, size_t) { - Impl_.Reset(new TImpl(this, Params)); + Impl_.Reset(new TImpl(this, Params)); } void TAdaptiveThreadPool::Stop() noexcept { @@ -614,9 +614,9 @@ void TSimpleThreadPool::Start(size_t thrnum, size_t maxque) { TAdaptiveThreadPool* adaptive(nullptr); if (thrnum) { - tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params)); + tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params)); } else { - adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params); + adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params); tmp.Reset(adaptive); } @@ -760,10 +760,10 @@ IThread* IThreadPool::DoCreate() { return new TPoolThread(this); } -THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) { +THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) { THolder<IThreadPool> queue; if (threadsCount > 1) { - queue.Reset(new TThreadPool(params)); + queue.Reset(new TThreadPool(params)); } else { queue.Reset(new TFakeThreadPool()); } diff --git a/util/thread/pool.h b/util/thread/pool.h index d1ea3a67cb..79e37050e4 100644 --- a/util/thread/pool.h +++ b/util/thread/pool.h @@ -80,66 +80,66 @@ IObjectInQueue* MakeThrFuncObj(T&& func) { return new TThrFuncObj<std::remove_cv_t<std::remove_reference_t<T>>>(std::forward<T>(func)); } -struct TThreadPoolParams { - bool Catching_ = true; - bool Blocking_ = false; - IThreadFactory* Factory_ = SystemThreadFactory(); - TString ThreadName_; - bool EnumerateThreads_ = false; - - using TSelf = TThreadPoolParams; - - TThreadPoolParams() { - } - - TThreadPoolParams(IThreadFactory* factory) - : Factory_(factory) - { - } - - TThreadPoolParams(const TString& name) { - SetThreadName(name); - } - - TThreadPoolParams(const char* name) { - SetThreadName(name); - } - - TSelf& SetCatching(bool val) { - Catching_ = val; - return *this; - } - - TSelf& SetBlocking(bool val) { - Blocking_ = val; - return *this; - } - - TSelf& SetFactory(IThreadFactory* factory) { - Factory_ = factory; - return *this; - } - - TSelf& SetThreadName(const TString& name) { - ThreadName_ = name; - EnumerateThreads_ = false; - return *this; - } - - TSelf& SetThreadNamePrefix(const TString& prefix) { - ThreadName_ = prefix; - EnumerateThreads_ = true; - return *this; - } -}; - +struct TThreadPoolParams { + bool Catching_ = true; + bool Blocking_ = false; + IThreadFactory* Factory_ = SystemThreadFactory(); + TString ThreadName_; + bool EnumerateThreads_ = false; + + using TSelf = TThreadPoolParams; + + TThreadPoolParams() { + } + + TThreadPoolParams(IThreadFactory* factory) + : Factory_(factory) + { + } + + TThreadPoolParams(const TString& name) { + SetThreadName(name); + } + + TThreadPoolParams(const char* name) { + SetThreadName(name); + } + + TSelf& SetCatching(bool val) { + Catching_ = val; + return *this; + } + + TSelf& SetBlocking(bool val) { + Blocking_ = val; + return *this; + } + + TSelf& SetFactory(IThreadFactory* factory) { + Factory_ = factory; + return *this; + } + + TSelf& SetThreadName(const TString& name) { + ThreadName_ = name; + EnumerateThreads_ = false; + return *this; + } + + TSelf& SetThreadNamePrefix(const TString& prefix) { + ThreadName_ = prefix; + EnumerateThreads_ = true; + return *this; + } +}; + /** * A queue processed simultaneously by several threads */ class IThreadPool: public IThreadFactory, public TNonCopyable { public: - using TParams = TThreadPoolParams; - + using TParams = TThreadPoolParams; + ~IThreadPool() override = default; /** @@ -255,18 +255,18 @@ public: } }; -class TThreadPoolBase: public IThreadPool, public TThreadFactoryHolder { -public: - TThreadPoolBase(const TParams& params); - -protected: - TParams Params; -}; - +class TThreadPoolBase: public IThreadPool, public TThreadFactoryHolder { +public: + TThreadPoolBase(const TParams& params); + +protected: + TParams Params; +}; + /** queue processed by fixed size thread pool */ -class TThreadPool: public TThreadPoolBase { +class TThreadPool: public TThreadPoolBase { public: - TThreadPool(const TParams& params = {}); + TThreadPool(const TParams& params = {}); ~TThreadPool() override; bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT; @@ -290,9 +290,9 @@ private: * Always create new thread for new task, when all existing threads are busy. * Maybe dangerous, number of threads is not limited. */ -class TAdaptiveThreadPool: public TThreadPoolBase { +class TAdaptiveThreadPool: public TThreadPoolBase { public: - TAdaptiveThreadPool(const TParams& params = {}); + TAdaptiveThreadPool(const TParams& params = {}); ~TAdaptiveThreadPool() override; /** @@ -308,15 +308,15 @@ public: void Stop() noexcept override; size_t Size() const noexcept override; -private: +private: class TImpl; THolder<TImpl> Impl_; }; /** Behave like TThreadPool or TAdaptiveThreadPool, choosen by thrnum parameter of Start() */ -class TSimpleThreadPool: public TThreadPoolBase { +class TSimpleThreadPool: public TThreadPoolBase { public: - TSimpleThreadPool(const TParams& params = {}); + TSimpleThreadPool(const TParams& params = {}); ~TSimpleThreadPool() override; bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT; @@ -387,4 +387,4 @@ inline void Delete(THolder<IThreadPool> q) { * Creates and starts TThreadPool if threadsCount > 1, or TFakeThreadPool otherwise * You could specify blocking and catching modes for TThreadPool only */ -THolder<IThreadPool> CreateThreadPool(size_t threadCount, size_t queueSizeLimit = 0, const IThreadPool::TParams& params = {}); +THolder<IThreadPool> CreateThreadPool(size_t threadCount, size_t queueSizeLimit = 0, const IThreadPool::TParams& params = {}); diff --git a/util/thread/pool_ut.cpp b/util/thread/pool_ut.cpp index 893770d0c4..189e4865e1 100644 --- a/util/thread/pool_ut.cpp +++ b/util/thread/pool_ut.cpp @@ -5,9 +5,9 @@ #include <util/stream/output.h> #include <util/random/fast.h> #include <util/system/spinlock.h> -#include <util/system/thread.h> -#include <util/system/mutex.h> -#include <util/system/condvar.h> +#include <util/system/thread.h> +#include <util/system/mutex.h> +#include <util/system/condvar.h> struct TThreadPoolTest { TSpinLock Lock; @@ -99,7 +99,7 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) { Y_UNIT_TEST(TestTThreadPoolBlocking) { TThreadPoolTest t; - TThreadPool q(TThreadPool::TParams().SetBlocking(true)); + TThreadPool q(TThreadPool::TParams().SetBlocking(true)); t.TestAnyQueue(&q, 100); } @@ -132,32 +132,32 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) { ); UNIT_ASSERT_VALUES_EQUAL(added, false); } - + Y_UNIT_TEST(TestSafeAddFuncThrows) { TFailAddQueue queue; UNIT_CHECK_GENERATED_EXCEPTION(queue.SafeAddFunc([] {}), TThreadPoolException); } Y_UNIT_TEST(TestFunctionNotCopied) { - struct TFailOnCopy { - TFailOnCopy() { - } - - TFailOnCopy(TFailOnCopy&&) { - } - - TFailOnCopy(const TFailOnCopy&) { + struct TFailOnCopy { + TFailOnCopy() { + } + + TFailOnCopy(TFailOnCopy&&) { + } + + TFailOnCopy(const TFailOnCopy&) { UNIT_FAIL("Don't copy std::function inside TThreadPool"); - } - }; - - TThreadPool queue(TThreadPool::TParams().SetBlocking(false).SetCatching(true)); - queue.Start(2); - + } + }; + + TThreadPool queue(TThreadPool::TParams().SetBlocking(false).SetCatching(true)); + queue.Start(2); + queue.SafeAddFunc([data = TFailOnCopy()]() {}); - - queue.Stop(); - } + + queue.Stop(); + } Y_UNIT_TEST(TestInfoGetters) { TThreadPool queue; @@ -178,80 +178,80 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) { queue.Stop(); } - + void TestFixedThreadName(IThreadPool& pool, const TString& expectedName) { - pool.Start(1); - TString name; - pool.SafeAddFunc([&name]() { - name = TThread::CurrentThreadName(); - }); - pool.Stop(); + pool.Start(1); + TString name; + pool.SafeAddFunc([&name]() { + name = TThread::CurrentThreadName(); + }); + pool.Stop(); if (TThread::CanGetCurrentThreadName()) { UNIT_ASSERT_EQUAL(name, expectedName); UNIT_ASSERT_UNEQUAL(TThread::CurrentThreadName(), expectedName); } - } - - Y_UNIT_TEST(TestFixedThreadName) { - const TString expectedName = "HelloWorld"; - { - TThreadPool pool(TThreadPool::TParams().SetBlocking(true).SetCatching(false).SetThreadName(expectedName)); - TestFixedThreadName(pool, expectedName); - } - { - TAdaptiveThreadPool pool(TThreadPool::TParams().SetThreadName(expectedName)); - TestFixedThreadName(pool, expectedName); - } - } - + } + + Y_UNIT_TEST(TestFixedThreadName) { + const TString expectedName = "HelloWorld"; + { + TThreadPool pool(TThreadPool::TParams().SetBlocking(true).SetCatching(false).SetThreadName(expectedName)); + TestFixedThreadName(pool, expectedName); + } + { + TAdaptiveThreadPool pool(TThreadPool::TParams().SetThreadName(expectedName)); + TestFixedThreadName(pool, expectedName); + } + } + void TestEnumeratedThreadName(IThreadPool& pool, const THashSet<TString>& expectedNames) { - pool.Start(expectedNames.size()); - TMutex lock; - TCondVar allReady; - size_t readyCount = 0; - THashSet<TString> names; - for (size_t i = 0; i < expectedNames.size(); ++i) { - pool.SafeAddFunc([&]() { + pool.Start(expectedNames.size()); + TMutex lock; + TCondVar allReady; + size_t readyCount = 0; + THashSet<TString> names; + for (size_t i = 0; i < expectedNames.size(); ++i) { + pool.SafeAddFunc([&]() { with_lock (lock) { - if (++readyCount == expectedNames.size()) { - allReady.BroadCast(); - } else { - while (readyCount != expectedNames.size()) { - allReady.WaitI(lock); - } - } - names.insert(TThread::CurrentThreadName()); - } - }); - } - pool.Stop(); + if (++readyCount == expectedNames.size()) { + allReady.BroadCast(); + } else { + while (readyCount != expectedNames.size()) { + allReady.WaitI(lock); + } + } + names.insert(TThread::CurrentThreadName()); + } + }); + } + pool.Stop(); if (TThread::CanGetCurrentThreadName()) { UNIT_ASSERT_EQUAL(names, expectedNames); } - } - - Y_UNIT_TEST(TestEnumeratedThreadName) { - const TString namePrefix = "HelloWorld"; - const THashSet<TString> expectedNames = { - "HelloWorld0", - "HelloWorld1", - "HelloWorld2", - "HelloWorld3", - "HelloWorld4", - "HelloWorld5", - "HelloWorld6", - "HelloWorld7", - "HelloWorld8", - "HelloWorld9", - "HelloWorld10", - }; - { - TThreadPool pool(TThreadPool::TParams().SetBlocking(true).SetCatching(false).SetThreadNamePrefix(namePrefix)); - TestEnumeratedThreadName(pool, expectedNames); - } - { - TAdaptiveThreadPool pool(TThreadPool::TParams().SetThreadNamePrefix(namePrefix)); - TestEnumeratedThreadName(pool, expectedNames); - } - } + } + + Y_UNIT_TEST(TestEnumeratedThreadName) { + const TString namePrefix = "HelloWorld"; + const THashSet<TString> expectedNames = { + "HelloWorld0", + "HelloWorld1", + "HelloWorld2", + "HelloWorld3", + "HelloWorld4", + "HelloWorld5", + "HelloWorld6", + "HelloWorld7", + "HelloWorld8", + "HelloWorld9", + "HelloWorld10", + }; + { + TThreadPool pool(TThreadPool::TParams().SetBlocking(true).SetCatching(false).SetThreadNamePrefix(namePrefix)); + TestEnumeratedThreadName(pool, expectedNames); + } + { + TAdaptiveThreadPool pool(TThreadPool::TParams().SetThreadNamePrefix(namePrefix)); + TestEnumeratedThreadName(pool, expectedNames); + } + } } diff --git a/util/ysaveload_ut.cpp b/util/ysaveload_ut.cpp index 723c68f391..f1a1c27ae5 100644 --- a/util/ysaveload_ut.cpp +++ b/util/ysaveload_ut.cpp @@ -14,7 +14,7 @@ #include <util/generic/vector.h> #include <util/generic/buffer.h> #include <util/generic/hash_set.h> -#include <util/generic/maybe.h> +#include <util/generic/maybe.h> #include <util/generic/variant.h> static inline char* AllocateFromPool(TMemoryPool& pool, size_t len) { @@ -190,26 +190,26 @@ private: Save(&S_, deq); } - { - TMaybe<size_t> h(10); - Save(&S_, h); - } - - { - TMaybe<size_t> h(20); - Save(&S_, h); - } - - { - TMaybe<size_t> h; - Save(&S_, h); - } - - { - TMaybe<size_t> h; - Save(&S_, h); - } - + { + TMaybe<size_t> h(10); + Save(&S_, h); + } + + { + TMaybe<size_t> h(20); + Save(&S_, h); + } + + { + TMaybe<size_t> h; + Save(&S_, h); + } + + { + TMaybe<size_t> h; + Save(&S_, h); + } + { THashMultiMap<TString, int> mm; @@ -333,32 +333,32 @@ private: UNIT_ASSERT_EQUAL(deq[2], 4); UNIT_ASSERT_EQUAL(deq[3], 5); } - - { - TMaybe<size_t> h(5); - Load(&S_, h); - UNIT_ASSERT_EQUAL(*h, 10); - } - - { - TMaybe<size_t> h; - Load(&S_, h); - UNIT_ASSERT_EQUAL(*h, 20); - } - - { - TMaybe<size_t> h; - UNIT_ASSERT(!h); - Load(&S_, h); - UNIT_ASSERT(!h); - } - - { - TMaybe<size_t> h(7); - UNIT_ASSERT(!!h); - Load(&S_, h); - UNIT_ASSERT(!h); - } + + { + TMaybe<size_t> h(5); + Load(&S_, h); + UNIT_ASSERT_EQUAL(*h, 10); + } + + { + TMaybe<size_t> h; + Load(&S_, h); + UNIT_ASSERT_EQUAL(*h, 20); + } + + { + TMaybe<size_t> h; + UNIT_ASSERT(!h); + Load(&S_, h); + UNIT_ASSERT(!h); + } + + { + TMaybe<size_t> h(7); + UNIT_ASSERT(!!h); + Load(&S_, h); + UNIT_ASSERT(!h); + } { THashMultiMap<TString, int> mm; |