diff options
author | kulikov <kulikov@yandex-team.ru> | 2022-02-10 16:49:34 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:34 +0300 |
commit | 65e5266709e7ff94b14ae128309e229de714b0df (patch) | |
tree | d4901f06e56d95f5e5d36bd1806bcc144d03bf41 /library/cpp | |
parent | 0041d99876ae3dccc3f0fa8787131d85ddfd486b (diff) | |
download | ydb-65e5266709e7ff94b14ae128309e229de714b0df.tar.gz |
Restoring authorship annotation for <kulikov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp')
27 files changed, 1002 insertions, 1002 deletions
diff --git a/library/cpp/coroutine/engine/cont_poller.h b/library/cpp/coroutine/engine/cont_poller.h index b638b2df1a..d158bbe9f5 100644 --- a/library/cpp/coroutine/engine/cont_poller.h +++ b/library/cpp/coroutine/engine/cont_poller.h @@ -143,13 +143,13 @@ namespace NCoro { auto* lst = Lists_.Get(event->Fd()); const ui16 oldFlags = Flags(*lst); lst->PushFront(event); - ui16 newFlags = Flags(*lst); + ui16 newFlags = Flags(*lst); if (newFlags != oldFlags) { - if (oldFlags) { - newFlags |= CONT_POLL_MODIFY; - } - + if (oldFlags) { + newFlags |= CONT_POLL_MODIFY; + } + P_->Set(lst, event->Fd(), newFlags); } } @@ -158,13 +158,13 @@ namespace NCoro { auto* lst = Lists_.Get(event->Fd()); const ui16 oldFlags = Flags(*lst); event->Unlink(); - ui16 newFlags = Flags(*lst); + ui16 newFlags = Flags(*lst); if (newFlags != oldFlags) { - if (newFlags) { - newFlags |= CONT_POLL_MODIFY; - } - + if (newFlags) { + newFlags |= CONT_POLL_MODIFY; + } + P_->Set(lst, event->Fd(), newFlags); } } @@ -174,9 +174,9 @@ namespace NCoro { P_->Wait(events, deadLine); } - EContPoller PollEngine() const { - return P_->PollEngine(); - } + EContPoller PollEngine() const { + return P_->PollEngine(); + } private: static ui16 Flags(TIntrusiveList<IPollEvent>& lst) noexcept { ui16 ret = 0; diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp index 8b372496a2..a7012eb8e2 100644 --- a/library/cpp/coroutine/engine/coroutine_ut.cpp +++ b/library/cpp/coroutine/engine/coroutine_ut.cpp @@ -8,16 +8,16 @@ #include <util/system/pipe.h> #include <util/system/env.h> #include <util/system/info.h> -#include <util/system/thread.h> +#include <util/system/thread.h> #include <util/generic/xrange.h> -#include <util/generic/serialized_enum.h> +#include <util/generic/serialized_enum.h> // TODO (velavokr): BALANCER-1345 add more tests on pollers class TCoroTest: public TTestBase { UNIT_TEST_SUITE(TCoroTest); UNIT_TEST(TestSimpleX1); - UNIT_TEST(TestSimpleX1MultiThread); + UNIT_TEST(TestSimpleX1MultiThread); UNIT_TEST(TestSimpleX2); UNIT_TEST(TestSimpleX3); UNIT_TEST(TestMemFun); @@ -40,10 +40,10 @@ class TCoroTest: public TTestBase { UNIT_TEST(TestLegacyCancelYieldRaceBug) UNIT_TEST(TestJoinRescheduleBug); UNIT_TEST(TestEventQueue) - UNIT_TEST(TestNestedExecutor) - UNIT_TEST(TestComputeCoroutineYield) - UNIT_TEST(TestPollEngines); - UNIT_TEST(TestUserEvent); + UNIT_TEST(TestNestedExecutor) + UNIT_TEST(TestComputeCoroutineYield) + UNIT_TEST(TestPollEngines); + UNIT_TEST(TestUserEvent); UNIT_TEST(TestPause); UNIT_TEST(TestOverrideTime); UNIT_TEST_SUITE_END(); @@ -51,7 +51,7 @@ class TCoroTest: public TTestBase { public: void TestException(); void TestSimpleX1(); - void TestSimpleX1MultiThread(); + void TestSimpleX1MultiThread(); void TestSimpleX2(); void TestSimpleX3(); void TestMemFun(); @@ -72,10 +72,10 @@ public: void TestLegacyCancelYieldRaceBug(); void TestJoinRescheduleBug(); void TestEventQueue(); - void TestNestedExecutor(); - void TestComputeCoroutineYield(); - void TestPollEngines(); - void TestUserEvent(); + void TestNestedExecutor(); + void TestComputeCoroutineYield(); + void TestPollEngines(); + void TestUserEvent(); void TestPause(); void TestOverrideTime(); }; @@ -130,54 +130,54 @@ static int i0; static void CoRun(TCont* c, void* /*run*/) { while (i0 < 100000) { ++i0; - UNIT_ASSERT(RunningCont() == c); + UNIT_ASSERT(RunningCont() == c); c->Yield(); - UNIT_ASSERT(RunningCont() == c); + UNIT_ASSERT(RunningCont() == c); } } static void CoMain(TCont* c, void* /*arg*/) { for (volatile size_t i2 = 0; i2 < 10; ++i2) { - UNIT_ASSERT(RunningCont() == c); + UNIT_ASSERT(RunningCont() == c); c->Executor()->Create(CoRun, nullptr, "run"); - UNIT_ASSERT(RunningCont() == c); + UNIT_ASSERT(RunningCont() == c); } } void TCoroTest::TestSimpleX1() { i0 = 0; TContExecutor e(32000); - - UNIT_ASSERT(RunningCont() == nullptr); - + + UNIT_ASSERT(RunningCont() == nullptr); + e.Execute(CoMain); UNIT_ASSERT_VALUES_EQUAL(i0, 100000); - - UNIT_ASSERT(RunningCont() == nullptr); -} - -void TCoroTest::TestSimpleX1MultiThread() { - TVector<THolder<TThread>> threads; - const size_t nThreads = 0; - TAtomic c = 0; - for (size_t i = 0; i < nThreads; ++i) { - threads.push_back(MakeHolder<TThread>([&]() { - TestSimpleX1(); - AtomicIncrement(c); - })); - } - - for (auto& t : threads) { - t->Start(); - } - - for (auto& t: threads) { - t->Join(); - } - - UNIT_ASSERT_EQUAL(c, nThreads); -} - + + UNIT_ASSERT(RunningCont() == nullptr); +} + +void TCoroTest::TestSimpleX1MultiThread() { + TVector<THolder<TThread>> threads; + const size_t nThreads = 0; + TAtomic c = 0; + for (size_t i = 0; i < nThreads; ++i) { + threads.push_back(MakeHolder<TThread>([&]() { + TestSimpleX1(); + AtomicIncrement(c); + })); + } + + for (auto& t : threads) { + t->Start(); + } + + for (auto& t: threads) { + t->Join(); + } + + UNIT_ASSERT_EQUAL(c, nThreads); +} + struct TTestObject { int i = 0; int j = 0; @@ -863,85 +863,85 @@ void TCoroTest::TestEventQueue() { }, &queue); } -void TCoroTest::TestNestedExecutor() { -#ifndef _win_ - //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr - TContExecutor exec(32000); - UNIT_ASSERT(!RunningCont()); - - exec.Execute([](TCont* cont, void*) { - UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont); - - TContExecutor exec2(32000); - exec2.Execute([](TCont* cont2, void*) { - UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2); - TContExecutor exec3(32000); - exec3.Execute([](TCont* cont3, void*) { - UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3); - }); - - UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2); - }); - - UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont); - }); - - UNIT_ASSERT(!RunningCont()); -#endif -} - -void TCoroTest::TestComputeCoroutineYield() { -//if we have busy (e.g., on cpu) coroutine, when it yields, io must flow - TContExecutor exec(32000); - exec.SetFailOnError(true); - - TPipe in, out; - TPipe::Pipe(in, out); - SetNonBlock(in.GetHandle()); - size_t lastRead = 42; - - auto compute = [&](TCont* cont) { - for (size_t i = 0; i < 10; ++i) { - write(out.GetHandle(), &i, sizeof i); - Sleep(TDuration::MilliSeconds(10)); - cont->Yield(); - UNIT_ASSERT(lastRead == i); - } - }; - - auto io = [&](TCont* cont) { - for (size_t i = 0; i < 10; ++i) { - NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead); - } - }; - - exec.Create(compute, "compute"); - exec.Create(io, "io"); - - exec.Execute(); -} - -void TCoroTest::TestPollEngines() { - bool defaultChecked = false; - for (auto engine : GetEnumAllValues<EContPoller>()) { - auto poller = IPollerFace::Construct(engine); - if (!poller) { - continue; - } - - TContExecutor exec(32000, IPollerFace::Construct(engine)); - - if (engine == EContPoller::Default) { - defaultChecked = true; - UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Combined); - } else { - UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine); - } - } - - UNIT_ASSERT(defaultChecked); -} - +void TCoroTest::TestNestedExecutor() { +#ifndef _win_ + //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr + TContExecutor exec(32000); + UNIT_ASSERT(!RunningCont()); + + exec.Execute([](TCont* cont, void*) { + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont); + + TContExecutor exec2(32000); + exec2.Execute([](TCont* cont2, void*) { + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2); + TContExecutor exec3(32000); + exec3.Execute([](TCont* cont3, void*) { + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3); + }); + + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2); + }); + + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont); + }); + + UNIT_ASSERT(!RunningCont()); +#endif +} + +void TCoroTest::TestComputeCoroutineYield() { +//if we have busy (e.g., on cpu) coroutine, when it yields, io must flow + TContExecutor exec(32000); + exec.SetFailOnError(true); + + TPipe in, out; + TPipe::Pipe(in, out); + SetNonBlock(in.GetHandle()); + size_t lastRead = 42; + + auto compute = [&](TCont* cont) { + for (size_t i = 0; i < 10; ++i) { + write(out.GetHandle(), &i, sizeof i); + Sleep(TDuration::MilliSeconds(10)); + cont->Yield(); + UNIT_ASSERT(lastRead == i); + } + }; + + auto io = [&](TCont* cont) { + for (size_t i = 0; i < 10; ++i) { + NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead); + } + }; + + exec.Create(compute, "compute"); + exec.Create(io, "io"); + + exec.Execute(); +} + +void TCoroTest::TestPollEngines() { + bool defaultChecked = false; + for (auto engine : GetEnumAllValues<EContPoller>()) { + auto poller = IPollerFace::Construct(engine); + if (!poller) { + continue; + } + + TContExecutor exec(32000, IPollerFace::Construct(engine)); + + if (engine == EContPoller::Default) { + defaultChecked = true; + UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Combined); + } else { + UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine); + } + } + + UNIT_ASSERT(defaultChecked); +} + void TCoroTest::TestPause() { TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing()}; @@ -959,28 +959,28 @@ void TCoroTest::TestPause() { UNIT_ASSERT_EQUAL(i, 2); } -void TCoroTest::TestUserEvent() { - TContExecutor exec(32000); - - struct TUserEvent : public IUserEvent { - bool Called = false; - void Execute() override { - Called = true; - } - } event; - - auto f = [&](TCont* cont) { - UNIT_ASSERT(!event.Called); - exec.ScheduleUserEvent(&event); - UNIT_ASSERT(!event.Called); - cont->Yield(); - UNIT_ASSERT(event.Called); - }; - - exec.Execute(f); - - UNIT_ASSERT(event.Called); -} +void TCoroTest::TestUserEvent() { + TContExecutor exec(32000); + + struct TUserEvent : public IUserEvent { + bool Called = false; + void Execute() override { + Called = true; + } + } event; + + auto f = [&](TCont* cont) { + UNIT_ASSERT(!event.Called); + exec.ScheduleUserEvent(&event); + UNIT_ASSERT(!event.Called); + cont->Yield(); + UNIT_ASSERT(event.Called); + }; + + exec.Execute(f); + + UNIT_ASSERT(event.Called); +} void TCoroTest::TestOverrideTime() { class TTime: public NCoro::ITime { diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp index 7ae6f74051..9231d2b1ba 100644 --- a/library/cpp/coroutine/engine/impl.cpp +++ b/library/cpp/coroutine/engine/impl.cpp @@ -3,8 +3,8 @@ #include "stack/stack_allocator.h" #include "stack/stack_guards.h" -#include <util/generic/scope.h> -#include <util/thread/singleton.h> +#include <util/generic/scope.h> +#include <util/thread/singleton.h> #include <util/stream/format.h> #include <util/stream/output.h> #include <util/system/yassert.h> @@ -157,47 +157,47 @@ void TContExecutor::WaitForIO() { // Waking a coroutine puts it into ReadyNext_ list const auto next = WaitQueue_.WakeTimedout(now); - if (!UserEvents_.Empty()) { - TIntrusiveList<IUserEvent> userEvents; - userEvents.Swap(UserEvents_); - do { - userEvents.PopFront()->Execute(); - } while (!userEvents.Empty()); - } - + if (!UserEvents_.Empty()) { + TIntrusiveList<IUserEvent> userEvents; + userEvents.Swap(UserEvents_); + do { + userEvents.PopFront()->Execute(); + } while (!userEvents.Empty()); + } + // Polling will return as soon as there is an event to process or a timeout. // If there are woken coroutines we do not want to sleep in the poller // yet still we want to check for new io // to prevent ourselves from locking out of io by constantly waking coroutines. - if (ReadyNext_.Empty()) { + if (ReadyNext_.Empty()) { if (EnterPollerCallback_) { EnterPollerCallback_->OnEnterPoller(); } - Poll(next); + Poll(next); if (EnterPollerCallback_) { EnterPollerCallback_->OnExitPoller(); } - } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) { + } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) { if (EnterPollerCallback_) { EnterPollerCallback_->OnEnterPoller(); } - Poll(now); + Poll(now); if (EnterPollerCallback_) { EnterPollerCallback_->OnExitPoller(); } - } + } Ready_.Append(ReadyNext_); } } -void TContExecutor::Poll(TInstant deadline) { - Poller_.Wait(PollerEvents_, deadline); - LastPoll_ = Now(); - - // Waking a coroutine puts it into ReadyNext_ list - for (auto event : PollerEvents_) { +void TContExecutor::Poll(TInstant deadline) { + Poller_.Wait(PollerEvents_, deadline); + LastPoll_ = Now(); + + // Waking a coroutine puts it into ReadyNext_ list + for (auto event : PollerEvents_) { auto* lst = (NCoro::TPollEventList*)event.Data; const int status = event.Status; @@ -276,36 +276,36 @@ void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept { Ready_.PushBack(cont); } -namespace { +namespace { inline TContExecutor*& ThisThreadExecutor() { struct TThisThreadExecutorHolder { - TContExecutor* Executor = nullptr; + TContExecutor* Executor = nullptr; }; return FastTlsSingletonWithPriority<TThisThreadExecutorHolder, 0>()->Executor; } -} - +} + void TContExecutor::DeleteScheduled() noexcept { ToDelete_.ForEach([this](TCont* c) { Release(c); }); } -TCont* RunningCont() { +TCont* RunningCont() { TContExecutor* thisThreadExecutor = ThisThreadExecutor(); - return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr; -} - + return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr; +} + void TContExecutor::RunScheduler() noexcept { try { - TContExecutor* const prev = ThisThreadExecutor(); + TContExecutor* const prev = ThisThreadExecutor(); ThisThreadExecutor() = this; TCont* caller = Current_; TExceptionSafeContext* context = caller ? caller->Trampoline_.Context() : &SchedContext_; - Y_DEFER { - ThisThreadExecutor() = prev; - }; - + Y_DEFER { + ThisThreadExecutor() = prev; + }; + while (true) { if (ScheduleCallback_ && Current_) { ScheduleCallback_->OnUnschedule(*this); @@ -345,7 +345,7 @@ void TContExecutor::RunScheduler() noexcept { } } } catch (...) { - TBackTrace::FromCurrentException().PrintTo(Cerr); + TBackTrace::FromCurrentException().PrintTo(Cerr); Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str()); } } diff --git a/library/cpp/coroutine/engine/impl.h b/library/cpp/coroutine/engine/impl.h index 283a96ecf1..a055012f3f 100644 --- a/library/cpp/coroutine/engine/impl.h +++ b/library/cpp/coroutine/engine/impl.h @@ -121,9 +121,9 @@ private: bool Scheduled_ = false; }; -TCont* RunningCont(); - +TCont* RunningCont(); + template <class Functor> static void ContHelperFunc(TCont* cont, void* arg) { (*((Functor*)(arg)))(cont); @@ -134,15 +134,15 @@ static void ContHelperMemberFunc(TCont* c, void* arg) { ((reinterpret_cast<T*>(arg))->*M)(c); } -class IUserEvent - : public TIntrusiveListItem<IUserEvent> -{ -public: - virtual ~IUserEvent() = default; - - virtual void Execute() = 0; -}; - +class IUserEvent + : public TIntrusiveListItem<IUserEvent> +{ +public: + virtual ~IUserEvent() = default; + + virtual void Execute() = 0; +}; + /// Central coroutine class. /// Note, coroutines are single-threaded, and all methods must be called from the single thread class TContExecutor { @@ -249,22 +249,22 @@ public: return FailOnError_; } - void RegisterInWaitQueue(NCoro::TContPollEvent* event) { - WaitQueue_.Register(event); - } - + void RegisterInWaitQueue(NCoro::TContPollEvent* event) { + WaitQueue_.Register(event); + } + void ScheduleIoWait(TFdEvent* event) { - RegisterInWaitQueue(event); + RegisterInWaitQueue(event); Poller_.Schedule(event); } void ScheduleIoWait(TTimerEvent* event) noexcept { - RegisterInWaitQueue(event); + RegisterInWaitQueue(event); } - void ScheduleUserEvent(IUserEvent* event) { - UserEvents_.PushBack(event); - } + void ScheduleUserEvent(IUserEvent* event) { + UserEvents_.PushBack(event); + } void Pause(); TInstant Now(); @@ -285,7 +285,7 @@ private: void WaitForIO(); - void Poll(TInstant deadline); + void Poll(TInstant deadline); private: NCoro::IScheduleCallback* const ScheduleCallback_ = nullptr; @@ -300,11 +300,11 @@ private: TContList ReadyNext_; NCoro::TEventWaitQueue WaitQueue_; NCoro::TContPoller Poller_; - NCoro::TContPoller::TEvents PollerEvents_; - TInstant LastPoll_; - - TIntrusiveList<IUserEvent> UserEvents_; + NCoro::TContPoller::TEvents PollerEvents_; + TInstant LastPoll_; + TIntrusiveList<IUserEvent> UserEvents_; + size_t Allocated_ = 0; TCont* Current_ = nullptr; bool FailOnError_ = false; diff --git a/library/cpp/coroutine/engine/poller.cpp b/library/cpp/coroutine/engine/poller.cpp index 61164fa56b..722ef1e3a6 100644 --- a/library/cpp/coroutine/engine/poller.cpp +++ b/library/cpp/coroutine/engine/poller.cpp @@ -46,11 +46,11 @@ namespace { template <class T> class TVirtualize: public IPollerFace { public: - TVirtualize(EContPoller pollerEngine) - : PollerEngine_(pollerEngine) - { - } - + TVirtualize(EContPoller pollerEngine) + : PollerEngine_(pollerEngine) + { + } + void Set(const TChange& c) override { P_.Set(c); } @@ -59,12 +59,12 @@ namespace { P_.Wait(events, deadLine); } - EContPoller PollEngine() const override { - return PollerEngine_; - } + EContPoller PollEngine() const override { + return PollerEngine_; + } private: T P_; - const EContPoller PollerEngine_; + const EContPoller PollerEngine_; }; @@ -366,21 +366,21 @@ THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) { THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) { switch (poller) { case EContPoller::Default: - case EContPoller::Combined: - return MakeHolder<TVirtualize<TCombinedPoller>>(EContPoller::Combined); + case EContPoller::Combined: + return MakeHolder<TVirtualize<TCombinedPoller>>(EContPoller::Combined); case EContPoller::Select: - return MakeHolder<TVirtualize<TPoller<TGenericPoller<TSelectPoller<TWithoutLocking>>>>>(poller); + return MakeHolder<TVirtualize<TPoller<TGenericPoller<TSelectPoller<TWithoutLocking>>>>>(poller); case EContPoller::Poll: - return MakeHolder<TVirtualize<TPollPoller>>(poller); + return MakeHolder<TVirtualize<TPollPoller>>(poller); case EContPoller::Epoll: #if defined(HAVE_EPOLL_POLLER) - return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller); + return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller); #else return nullptr; #endif case EContPoller::Kqueue: #if defined(HAVE_KQUEUE_POLLER) - return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller); + return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller); #else return nullptr; #endif diff --git a/library/cpp/coroutine/engine/poller.h b/library/cpp/coroutine/engine/poller.h index 8ea012c0fc..ff60266d73 100644 --- a/library/cpp/coroutine/engine/poller.h +++ b/library/cpp/coroutine/engine/poller.h @@ -8,7 +8,7 @@ enum class EContPoller { Default /* "default" */, - Combined /* "combined" */, + Combined /* "combined" */, Select /* "select" */, Poll /* "poll" */, Epoll /* "epoll" */, @@ -42,7 +42,7 @@ public: virtual void Set(const TChange& change) = 0; virtual void Wait(TEvents& events, TInstant deadLine) = 0; - virtual EContPoller PollEngine() const = 0; + virtual EContPoller PollEngine() const = 0; static THolder<IPollerFace> Default(); static THolder<IPollerFace> Construct(TStringBuf name); diff --git a/library/cpp/coroutine/engine/sockpool.h b/library/cpp/coroutine/engine/sockpool.h index 1ebb7e7b38..2afd5c709a 100644 --- a/library/cpp/coroutine/engine/sockpool.h +++ b/library/cpp/coroutine/engine/sockpool.h @@ -93,7 +93,7 @@ public: : Impl_(nullptr) { } - + TPooledSocket(TImpl* impl) : Impl_(impl) { @@ -178,7 +178,7 @@ public: } else { ret = AllocateMore(conn); } - + ret.Impl_->Touch(); return ret; @@ -189,10 +189,10 @@ public: alive->Touch(); socket = TPooledSocket(alive); return true; - } + } return false; } - + private: TPooledSocket::TImpl* GetImpl() { TGuard<TMutex> guard(Mutex_); diff --git a/library/cpp/coroutine/engine/trampoline.cpp b/library/cpp/coroutine/engine/trampoline.cpp index 10ea69ddc3..6857670e1e 100644 --- a/library/cpp/coroutine/engine/trampoline.cpp +++ b/library/cpp/coroutine/engine/trampoline.cpp @@ -38,9 +38,9 @@ TTrampoline::TTrampoline(NStack::IAllocator& allocator, ui32 stackSize, TFunc f, return Stack_.Get(); } - const char* TTrampoline::ContName() const noexcept { - return Cont_->Name(); - } + const char* TTrampoline::ContName() const noexcept { + return Cont_->Name(); + } void TTrampoline::DoRunNaked() { DoRun(); diff --git a/library/cpp/coroutine/engine/trampoline.h b/library/cpp/coroutine/engine/trampoline.h index 37b61cf015..30cc079ab0 100644 --- a/library/cpp/coroutine/engine/trampoline.h +++ b/library/cpp/coroutine/engine/trampoline.h @@ -39,8 +39,8 @@ namespace NCoro { } void SwitchTo(TExceptionSafeContext* ctx) noexcept { - Y_VERIFY(Stack_.LowerCanaryOk(), "Stack overflow (%s)", ContName()); - Y_VERIFY(Stack_.UpperCanaryOk(), "Stack override (%s)", ContName()); + Y_VERIFY(Stack_.LowerCanaryOk(), "Stack overflow (%s)", ContName()); + Y_VERIFY(Stack_.UpperCanaryOk(), "Stack override (%s)", ContName()); Ctx_.SwitchTo(ctx); } @@ -49,8 +49,8 @@ namespace NCoro { void DoRunNaked() override; private: - const char* ContName() const noexcept; - private: + const char* ContName() const noexcept; + private: NStack::TStackHolder Stack_; const TContClosure Clo_; TExceptionSafeContext Ctx_; diff --git a/library/cpp/http/io/compression.h b/library/cpp/http/io/compression.h index f16c4a18eb..30eccdaca5 100644 --- a/library/cpp/http/io/compression.h +++ b/library/cpp/http/io/compression.h @@ -48,25 +48,25 @@ private: THashMap<TStringBuf, TCodec> Codecs_; TVector<TStringBuf> BestCodecs_; }; - -namespace NHttp { - template <typename F> - TString ChooseBestCompressionScheme(F accepted, TArrayRef<const TStringBuf> available) { - if (available.empty()) { - return "identity"; - } - - if (accepted("*")) { - return TString(available[0]); - } - - for (const auto& coding : available) { - TString s(coding); - if (accepted(s)) { - return s; - } - } - - return "identity"; - } -} + +namespace NHttp { + template <typename F> + TString ChooseBestCompressionScheme(F accepted, TArrayRef<const TStringBuf> available) { + if (available.empty()) { + return "identity"; + } + + if (accepted("*")) { + return TString(available[0]); + } + + for (const auto& coding : available) { + TString s(coding); + if (accepted(s)) { + return s; + } + } + + return "identity"; + } +} diff --git a/library/cpp/http/io/compression_ut.cpp b/library/cpp/http/io/compression_ut.cpp index 2f3d131f8c..d8a2d11a08 100644 --- a/library/cpp/http/io/compression_ut.cpp +++ b/library/cpp/http/io/compression_ut.cpp @@ -5,7 +5,7 @@ #include <library/cpp/testing/unittest/tests_data.h> #include <util/stream/zlib.h> -#include <util/generic/hash_set.h> +#include <util/generic/hash_set.h> Y_UNIT_TEST_SUITE(THttpCompressionTest) { static const TString DATA = "I'm a teapot"; @@ -43,18 +43,18 @@ Y_UNIT_TEST_SUITE(THttpCompressionTest) { auto decodedStream = (*decoder)(&buffer); UNIT_ASSERT_EQUAL(decodedStream->ReadAll(), DATA); } - - Y_UNIT_TEST(TestChooseBestCompressionScheme) { - THashSet<TString> accepted; - - auto checkAccepted = [&accepted](const TString& v) { - return accepted.contains(v); - }; - - UNIT_ASSERT_VALUES_EQUAL("identity", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); - accepted.insert("deflate"); - UNIT_ASSERT_VALUES_EQUAL("deflate", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); - accepted.insert("*"); - UNIT_ASSERT_VALUES_EQUAL("gzip", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); - } + + Y_UNIT_TEST(TestChooseBestCompressionScheme) { + THashSet<TString> accepted; + + auto checkAccepted = [&accepted](const TString& v) { + return accepted.contains(v); + }; + + UNIT_ASSERT_VALUES_EQUAL("identity", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); + accepted.insert("deflate"); + UNIT_ASSERT_VALUES_EQUAL("deflate", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); + accepted.insert("*"); + UNIT_ASSERT_VALUES_EQUAL("gzip", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); + } } // THttpCompressionTest suite diff --git a/library/cpp/http/io/stream.cpp b/library/cpp/http/io/stream.cpp index 6689be684f..728d1a89c1 100644 --- a/library/cpp/http/io/stream.cpp +++ b/library/cpp/http/io/stream.cpp @@ -286,7 +286,7 @@ private: TParsedHeaders p; size_t pos = FirstLine_.rfind(' '); - // In HTTP/1.1 Keep-Alive is turned on by default + // In HTTP/1.1 Keep-Alive is turned on by default if (pos != TString::npos && strcmp(FirstLine_.c_str() + pos + 1, "HTTP/1.1") == 0) { p.KeepAlive = true; //request } else if (strnicmp(FirstLine_.data(), "HTTP/1.1", 8) == 0) { @@ -428,12 +428,12 @@ bool THttpInput::AcceptEncoding(const TString& coding) const { } TString THttpInput::BestCompressionScheme(TArrayRef<const TStringBuf> codings) const { - return NHttp::ChooseBestCompressionScheme( - [this](const TString& coding) { - return AcceptEncoding(coding); - }, - codings - ); + return NHttp::ChooseBestCompressionScheme( + [this](const TString& coding) { + return AcceptEncoding(coding); + }, + codings + ); } TString THttpInput::BestCompressionScheme() const { diff --git a/library/cpp/http/io/stream_ut.cpp b/library/cpp/http/io/stream_ut.cpp index 1ea35df675..1d78c82e0e 100644 --- a/library/cpp/http/io/stream_ut.cpp +++ b/library/cpp/http/io/stream_ut.cpp @@ -179,63 +179,63 @@ Y_UNIT_TEST_SUITE(THttpStreamTest) { } Y_UNIT_TEST(TestKeepAlive) { - { + { TString s = "GET / HTTP/1.0\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(!in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(!in.IsKeepAlive()); + } + + { TString s = "GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(in.IsKeepAlive()); + } + + { TString s = "GET / HTTP/1.1\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(in.IsKeepAlive()); + } + + { TString s = "GET / HTTP/1.1\r\nConnection: close\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(!in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(!in.IsKeepAlive()); + } + + { TString s = "HTTP/1.0 200 Ok\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(!in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(!in.IsKeepAlive()); + } + + { TString s = "HTTP/1.0 200 Ok\r\nConnection: keep-alive\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(in.IsKeepAlive()); + } + + { TString s = "HTTP/1.1 200 Ok\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(in.IsKeepAlive()); + } + + { TString s = "HTTP/1.1 200 Ok\r\nConnection: close\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(!in.IsKeepAlive()); - } - } - + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(!in.IsKeepAlive()); + } + } + Y_UNIT_TEST(TestMinRequest) { TString res = "qqqqqq"; TPortManager pm; diff --git a/library/cpp/http/server/conn.cpp b/library/cpp/http/server/conn.cpp index 38a76c4c30..37f82f9f7b 100644 --- a/library/cpp/http/server/conn.cpp +++ b/library/cpp/http/server/conn.cpp @@ -5,11 +5,11 @@ class THttpServerConn::TImpl { public: - inline TImpl(const TSocket& s, size_t outputBufferSize) + inline TImpl(const TSocket& s, size_t outputBufferSize) : S_(s) , SI_(S_) , SO_(S_) - , BO_(&SO_, outputBufferSize) + , BO_(&SO_, outputBufferSize) , HI_(&SI_) , HO_(&BO_, &HI_) { @@ -44,15 +44,15 @@ private: }; THttpServerConn::THttpServerConn(const TSocket& s) - : THttpServerConn(s, s.MaximumTransferUnit()) -{ -} - -THttpServerConn::THttpServerConn(const TSocket& s, size_t outputBufferSize) - : Impl_(new TImpl(s, outputBufferSize)) + : THttpServerConn(s, s.MaximumTransferUnit()) { } +THttpServerConn::THttpServerConn(const TSocket& s, size_t outputBufferSize) + : Impl_(new TImpl(s, outputBufferSize)) +{ +} + THttpServerConn::~THttpServerConn() { } diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index 128583bdd7..3d21108b02 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -1,5 +1,5 @@ #include "http.h" -#include "http_ex.h" +#include "http_ex.h" #include <library/cpp/threading/equeue/equeue.h> @@ -243,17 +243,17 @@ public: } void AddRequest(TAutoPtr<TClientRequest> req, bool fail) { - struct TFailRequest: public THttpClientRequestEx { + struct TFailRequest: public THttpClientRequestEx { inline TFailRequest(TAutoPtr<TClientRequest> parent) { Conn_.Reset(parent->Conn_.Release()); HttpConn_.Reset(parent->HttpConn_.Release()); } bool Reply(void*) override { - if (!ProcessHeaders()) { + if (!ProcessHeaders()) { return true; - } - + } + ProcessFailRequest(0); return true; } @@ -558,11 +558,11 @@ TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv, { SetNoDelay(Socket_, true); - const TDuration& clientTimeout = HttpServ_->Options().ClientTimeout; - if (clientTimeout != TDuration::Zero()) { + const TDuration& clientTimeout = HttpServ_->Options().ClientTimeout; + if (clientTimeout != TDuration::Zero()) { SetSocketTimeout(Socket_, (long)clientTimeout.Seconds(), clientTimeout.MilliSecondsOfSecond()); - } - + } + HttpServ_->IncreaseConnections(); } @@ -679,16 +679,16 @@ void TClientRequest::ResetConnection() { void TClientRequest::Process(void* ThreadSpecificResource) { THolder<TClientRequest> this_(this); - auto* serverImpl = Conn_->HttpServ_; - + auto* serverImpl = Conn_->HttpServ_; + try { if (!HttpConn_) { - const size_t outputBufferSize = HttpServ()->Options().OutputBufferSize; - if (outputBufferSize) { - HttpConn_.Reset(new THttpServerConn(Socket(), outputBufferSize)); - } else { - HttpConn_.Reset(new THttpServerConn(Socket())); - } + const size_t outputBufferSize = HttpServ()->Options().OutputBufferSize; + if (outputBufferSize) { + HttpConn_.Reset(new THttpServerConn(Socket(), outputBufferSize)); + } else { + HttpConn_.Reset(new THttpServerConn(Socket())); + } auto maxRequestsPerConnection = HttpServ()->Options().MaxRequestsPerConnection; HttpConn_->Output()->EnableKeepAlive(HttpServ()->Options().KeepAliveEnabled && (!maxRequestsPerConnection || Conn_->ReceivedRequests < maxRequestsPerConnection)); @@ -715,7 +715,7 @@ void TClientRequest::Process(void* ThreadSpecificResource) { return; } } catch (...) { - serverImpl->Cb_->OnException(); + serverImpl->Cb_->OnException(); throw; } diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp index cc62bb988e..42580906f6 100644 --- a/library/cpp/http/server/http_ut.cpp +++ b/library/cpp/http/server/http_ut.cpp @@ -456,31 +456,31 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { server.Stop(); } - class TReleaseConnectionServer: public THttpServer::ICallBack { - class TRequest: public THttpClientRequestEx { - public: - bool Reply(void* /*tsr*/) override { - Output() << "HTTP/1.1 200 Ok\r\n\r\n"; - Output() << "reply"; - Output().Finish(); - - ReleaseConnection(); - - throw yexception() << "some error"; - - return true; - } - }; - - public: - TClientRequest* CreateClient() override { - return new TRequest(); - } - - void OnException() override { - ExceptionMessage = CurrentExceptionMessage(); - } - + class TReleaseConnectionServer: public THttpServer::ICallBack { + class TRequest: public THttpClientRequestEx { + public: + bool Reply(void* /*tsr*/) override { + Output() << "HTTP/1.1 200 Ok\r\n\r\n"; + Output() << "reply"; + Output().Finish(); + + ReleaseConnection(); + + throw yexception() << "some error"; + + return true; + } + }; + + public: + TClientRequest* CreateClient() override { + return new TRequest(); + } + + void OnException() override { + ExceptionMessage = CurrentExceptionMessage(); + } + TString ExceptionMessage; }; @@ -495,7 +495,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { } }; - public: + public: TClientRequest* CreateClient() override { return new TRequest(); } @@ -504,9 +504,9 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { ExceptionMessage = CurrentExceptionMessage(); } - TString ExceptionMessage; - }; - + TString ExceptionMessage; + }; + class TListenerSockAddrReplyServer: public THttpServer::ICallBack { class TRequest: public TClientRequest { public: @@ -542,22 +542,22 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { }; Y_UNIT_TEST(TTestReleaseConnection) { - TPortManager pm; - const ui16 port = pm.GetPort(); - - TReleaseConnectionServer serverImpl; - THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true)); - UNIT_ASSERT(server.Start()); - - TTestRequest r(port, "request"); - r.KeepAliveConnection = true; - - UNIT_ASSERT_C(r.Execute() == "reply", "diff echo response for request:\n" + r.GetDescription()); - - server.Stop(); - - UNIT_ASSERT_STRINGS_EQUAL(serverImpl.ExceptionMessage, "(yexception) some error"); - }; + TPortManager pm; + const ui16 port = pm.GetPort(); + + TReleaseConnectionServer serverImpl; + THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true)); + UNIT_ASSERT(server.Start()); + + TTestRequest r(port, "request"); + r.KeepAliveConnection = true; + + UNIT_ASSERT_C(r.Execute() == "reply", "diff echo response for request:\n" + r.GetDescription()); + + server.Stop(); + + UNIT_ASSERT_STRINGS_EQUAL(serverImpl.ExceptionMessage, "(yexception) some error"); + }; THttpInput SendRequest(TSocket& socket, ui16 port) { TSocketInput si(socket); diff --git a/library/cpp/http/server/options.h b/library/cpp/http/server/options.h index 38eda0e5e7..602649e430 100644 --- a/library/cpp/http/server/options.h +++ b/library/cpp/http/server/options.h @@ -6,7 +6,7 @@ #include <util/generic/size_literals.h> #include <util/generic/string.h> #include <util/generic/vector.h> -#include <util/datetime/base.h> +#include <util/datetime/base.h> class THttpServerOptions { public: @@ -93,10 +93,10 @@ public: return *this; } - + inline THttpServerOptions& SetClientTimeout(const TDuration& timeout) noexcept { ClientTimeout = timeout; - + return *this; } @@ -107,11 +107,11 @@ public: } inline THttpServerOptions& SetOutputBufferSize(size_t val) noexcept { - OutputBufferSize = val; - - return *this; - } - + OutputBufferSize = val; + + return *this; + } + inline THttpServerOptions& SetMaxInputContentLength(ui64 val) noexcept { MaxInputContentLength = val; @@ -162,7 +162,7 @@ public: ui32 MaxConnections = 100; int ListenBacklog = SOMAXCONN; TDuration ClientTimeout; - size_t OutputBufferSize = 0; + size_t OutputBufferSize = 0; ui64 MaxInputContentLength = sizeof(size_t) <= 4 ? 2_GB : 64_GB; size_t MaxRequestsPerConnection = 0; // If keep-alive is enabled, request limit before connection is closed bool UseElasticQueues = false; diff --git a/library/cpp/http/server/response.h b/library/cpp/http/server/response.h index a75cb85605..eed4afc7b6 100644 --- a/library/cpp/http/server/response.h +++ b/library/cpp/http/server/response.h @@ -34,10 +34,10 @@ public: THttpResponse& AddMultipleHeaders(const THttpHeaders& headers); - const THttpHeaders& GetHeaders() const { - return Headers; - } - + const THttpHeaders& GetHeaders() const { + return Headers; + } + THttpResponse& SetContentType(const TStringBuf& contentType); /** diff --git a/library/cpp/http/server/response_ut.cpp b/library/cpp/http/server/response_ut.cpp index 73e2112ad3..8a142fb1ba 100644 --- a/library/cpp/http/server/response_ut.cpp +++ b/library/cpp/http/server/response_ut.cpp @@ -46,25 +46,25 @@ Y_UNIT_TEST_SUITE(TestHttpResponse) { EXPECTED); } - Y_UNIT_TEST(TestGetHeaders) { - THttpResponse resp(HTTP_FORBIDDEN); - - THttpHeaders headers; - headers.AddHeader(THttpInputHeader("X-Header-1", "ValueOne")); - headers.AddHeader(THttpInputHeader("X-Header-2", "ValueTwo")); - headers.AddHeader(THttpInputHeader("X-Header-3", "ValueThree")); - resp.AddMultipleHeaders(headers); - resp.AddHeader("X-Header-4", "ValueFour"); - - const THttpHeaders& gotHeaders = resp.GetHeaders(); - UNIT_ASSERT_VALUES_EQUAL(gotHeaders.Count(), 4); - UNIT_ASSERT(gotHeaders.HasHeader("X-Header-1")); - UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-1")->Value(), "ValueOne"); - UNIT_ASSERT(gotHeaders.HasHeader("X-Header-4")); - UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-4")->Value(), "ValueFour"); - } - - + Y_UNIT_TEST(TestGetHeaders) { + THttpResponse resp(HTTP_FORBIDDEN); + + THttpHeaders headers; + headers.AddHeader(THttpInputHeader("X-Header-1", "ValueOne")); + headers.AddHeader(THttpInputHeader("X-Header-2", "ValueTwo")); + headers.AddHeader(THttpInputHeader("X-Header-3", "ValueThree")); + resp.AddMultipleHeaders(headers); + resp.AddHeader("X-Header-4", "ValueFour"); + + const THttpHeaders& gotHeaders = resp.GetHeaders(); + UNIT_ASSERT_VALUES_EQUAL(gotHeaders.Count(), 4); + UNIT_ASSERT(gotHeaders.HasHeader("X-Header-1")); + UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-1")->Value(), "ValueOne"); + UNIT_ASSERT(gotHeaders.HasHeader("X-Header-4")); + UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-4")->Value(), "ValueFour"); + } + + Y_UNIT_TEST(TestSetContent) { const char* EXPECTED = "HTTP/1.1 200 Ok\r\n" "Content-Length: 10\r\n" diff --git a/library/cpp/streams/brotli/brotli.cpp b/library/cpp/streams/brotli/brotli.cpp index 38052cb688..a5536eff99 100644 --- a/library/cpp/streams/brotli/brotli.cpp +++ b/library/cpp/streams/brotli/brotli.cpp @@ -139,7 +139,7 @@ public: ui8* outBuffer = static_cast<ui8*>(buffer); size_t availableOut = size; - size_t decompressedSize = 0; + size_t decompressedSize = 0; BrotliDecoderResult result; do { @@ -163,7 +163,7 @@ public: &outBuffer, nullptr); - decompressedSize = size - availableOut; + decompressedSize = size - availableOut; SubstreamFinished_ = (result == BROTLI_DECODER_RESULT_SUCCESS); if (result == BROTLI_DECODER_RESULT_ERROR) { @@ -175,7 +175,7 @@ public: "Buffer passed to read in Brotli decoder is too small"); break; } - } while (decompressedSize == 0 && result == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && !InputExhausted_); + } while (decompressedSize == 0 && result == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && !InputExhausted_); if (!SubstreamFinished_ && decompressedSize == 0) { ythrow yexception() << "Input stream is incomplete"; diff --git a/library/cpp/streams/brotli/brotli_ut.cpp b/library/cpp/streams/brotli/brotli_ut.cpp index aeb2e284dc..fb4372c4a5 100644 --- a/library/cpp/streams/brotli/brotli_ut.cpp +++ b/library/cpp/streams/brotli/brotli_ut.cpp @@ -41,23 +41,23 @@ Y_UNIT_TEST_SUITE(TBrotliTestSuite) { TestCase("hello world"); } - Y_UNIT_TEST(TestFlush) { - TStringStream ss; - TBrotliCompress compressStream(&ss); - TBrotliDecompress decompressStream(&ss); - - for (size_t i = 0; i < 3; ++i) { - TString s = GenerateRandomString(1 << 15); - compressStream.Write(s.data(), s.size()); - compressStream.Flush(); - - TString r(s.size(), '*'); - decompressStream.Load((char*)r.data(), r.size()); - - UNIT_ASSERT_VALUES_EQUAL(s, r); - } - } - + Y_UNIT_TEST(TestFlush) { + TStringStream ss; + TBrotliCompress compressStream(&ss); + TBrotliDecompress decompressStream(&ss); + + for (size_t i = 0; i < 3; ++i) { + TString s = GenerateRandomString(1 << 15); + compressStream.Write(s.data(), s.size()); + compressStream.Flush(); + + TString r(s.size(), '*'); + decompressStream.Load((char*)r.data(), r.size()); + + UNIT_ASSERT_VALUES_EQUAL(s, r); + } + } + Y_UNIT_TEST(TestSeveralStreams) { auto s1 = GenerateRandomString(1 << 15); auto s2 = GenerateRandomString(1 << 15); diff --git a/library/cpp/threading/equeue/equeue.cpp b/library/cpp/threading/equeue/equeue.cpp index 54a848e912..aaec19daa6 100644 --- a/library/cpp/threading/equeue/equeue.cpp +++ b/library/cpp/threading/equeue/equeue.cpp @@ -1,80 +1,80 @@ -#include "equeue.h" - -TElasticQueue::TElasticQueue(THolder<IThreadPool> slaveQueue) - : SlaveQueue_(std::move(slaveQueue)) -{ -} - -size_t TElasticQueue::ObjectCount() const { - return (size_t)AtomicGet(ObjectCount_); -} - -bool TElasticQueue::TryIncCounter() { - if ((size_t)AtomicIncrement(GuardCount_) > MaxQueueSize_) { - AtomicDecrement(GuardCount_); - return false; - } - - return true; -} - - - -class TElasticQueue::TDecrementingWrapper: TNonCopyable, public IObjectInQueue { -public: - TDecrementingWrapper(IObjectInQueue* realObject, TElasticQueue* queue) - : RealObject_(realObject) - , Queue_(queue) - { - AtomicIncrement(Queue_->ObjectCount_); - } - +#include "equeue.h" + +TElasticQueue::TElasticQueue(THolder<IThreadPool> slaveQueue) + : SlaveQueue_(std::move(slaveQueue)) +{ +} + +size_t TElasticQueue::ObjectCount() const { + return (size_t)AtomicGet(ObjectCount_); +} + +bool TElasticQueue::TryIncCounter() { + if ((size_t)AtomicIncrement(GuardCount_) > MaxQueueSize_) { + AtomicDecrement(GuardCount_); + return false; + } + + return true; +} + + + +class TElasticQueue::TDecrementingWrapper: TNonCopyable, public IObjectInQueue { +public: + TDecrementingWrapper(IObjectInQueue* realObject, TElasticQueue* queue) + : RealObject_(realObject) + , Queue_(queue) + { + AtomicIncrement(Queue_->ObjectCount_); + } + ~TDecrementingWrapper() override { - AtomicDecrement(Queue_->ObjectCount_); - AtomicDecrement(Queue_->GuardCount_); - } -private: - void Process(void *tsr) override { - THolder<TDecrementingWrapper> self(this); - RealObject_->Process(tsr); - } -private: - IObjectInQueue* const RealObject_; - TElasticQueue* const Queue_; -}; - - - -bool TElasticQueue::Add(IObjectInQueue* obj) { - if (!TryIncCounter()) { - return false; - } - - THolder<TDecrementingWrapper> wrapper; - try { - wrapper.Reset(new TDecrementingWrapper(obj, this)); - } catch (...) { - AtomicDecrement(GuardCount_); - throw; - } - - if (SlaveQueue_->Add(wrapper.Get())) { + AtomicDecrement(Queue_->ObjectCount_); + AtomicDecrement(Queue_->GuardCount_); + } +private: + void Process(void *tsr) override { + THolder<TDecrementingWrapper> self(this); + RealObject_->Process(tsr); + } +private: + IObjectInQueue* const RealObject_; + TElasticQueue* const Queue_; +}; + + + +bool TElasticQueue::Add(IObjectInQueue* obj) { + if (!TryIncCounter()) { + return false; + } + + THolder<TDecrementingWrapper> wrapper; + try { + wrapper.Reset(new TDecrementingWrapper(obj, this)); + } catch (...) { + AtomicDecrement(GuardCount_); + throw; + } + + if (SlaveQueue_->Add(wrapper.Get())) { Y_UNUSED(wrapper.Release()); - return true; - } else { - return false; - } -} - -void TElasticQueue::Start(size_t threadCount, size_t maxQueueSize) { - MaxQueueSize_ = maxQueueSize; - SlaveQueue_->Start(threadCount, maxQueueSize); -} - + return true; + } else { + return false; + } +} + +void TElasticQueue::Start(size_t threadCount, size_t maxQueueSize) { + MaxQueueSize_ = maxQueueSize; + SlaveQueue_->Start(threadCount, maxQueueSize); +} + void TElasticQueue::Stop() noexcept { - return SlaveQueue_->Stop(); -} - + return SlaveQueue_->Stop(); +} + size_t TElasticQueue::Size() const noexcept { - return SlaveQueue_->Size(); -} + return SlaveQueue_->Size(); +} diff --git a/library/cpp/threading/equeue/equeue.h b/library/cpp/threading/equeue/equeue.h index 40dd342585..403e993713 100644 --- a/library/cpp/threading/equeue/equeue.h +++ b/library/cpp/threading/equeue/equeue.h @@ -1,28 +1,28 @@ -#pragma once - +#pragma once + #include <util/thread/pool.h> -#include <util/system/atomic.h> -#include <util/generic/ptr.h> - -//actual queue limit will be (maxQueueSize - numBusyThreads) or 0 +#include <util/system/atomic.h> +#include <util/generic/ptr.h> + +//actual queue limit will be (maxQueueSize - numBusyThreads) or 0 class TElasticQueue: public IThreadPool { -public: - explicit TElasticQueue(THolder<IThreadPool> slaveQueue); - - bool Add(IObjectInQueue* obj) override; - size_t Size() const noexcept override; - - void Start(size_t threadCount, size_t maxQueueSize) override; - void Stop() noexcept override; - - size_t ObjectCount() const; -private: - class TDecrementingWrapper; - - bool TryIncCounter(); -private: +public: + explicit TElasticQueue(THolder<IThreadPool> slaveQueue); + + bool Add(IObjectInQueue* obj) override; + size_t Size() const noexcept override; + + void Start(size_t threadCount, size_t maxQueueSize) override; + void Stop() noexcept override; + + size_t ObjectCount() const; +private: + class TDecrementingWrapper; + + bool TryIncCounter(); +private: THolder<IThreadPool> SlaveQueue_; - size_t MaxQueueSize_ = 0; - TAtomic ObjectCount_ = 0; - TAtomic GuardCount_ = 0; -}; + size_t MaxQueueSize_ = 0; + TAtomic ObjectCount_ = 0; + TAtomic GuardCount_ = 0; +}; diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp index 9cf2aced44..defa1a0e82 100644 --- a/library/cpp/threading/equeue/equeue_ut.cpp +++ b/library/cpp/threading/equeue/equeue_ut.cpp @@ -1,125 +1,125 @@ -#include "equeue.h" - +#include "equeue.h" + #include <library/cpp/testing/unittest/registar.h> - -#include <util/system/event.h> -#include <util/datetime/base.h> -#include <util/generic/vector.h> - + +#include <util/system/event.h> +#include <util/datetime/base.h> +#include <util/generic/vector.h> + Y_UNIT_TEST_SUITE(TElasticQueueTest) { - const size_t MaxQueueSize = 20; - const size_t ThreadCount = 10; - const size_t N = 100000; - - static THolder<TElasticQueue> Queue; - - struct TQueueSetup { - TQueueSetup() { + const size_t MaxQueueSize = 20; + const size_t ThreadCount = 10; + const size_t N = 100000; + + static THolder<TElasticQueue> Queue; + + struct TQueueSetup { + TQueueSetup() { Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>())); - Queue->Start(ThreadCount, MaxQueueSize); - } - ~TQueueSetup() { - Queue->Stop(); - } - }; - - struct TCounters { - void Reset() { - Processed = Scheduled = Discarded = Total = 0; - } - - TAtomic Processed; - TAtomic Scheduled; - TAtomic Discarded; - TAtomic Total; - }; - static TCounters Counters; - -//fill test -- fill queue with "endless" jobs + Queue->Start(ThreadCount, MaxQueueSize); + } + ~TQueueSetup() { + Queue->Stop(); + } + }; + + struct TCounters { + void Reset() { + Processed = Scheduled = Discarded = Total = 0; + } + + TAtomic Processed; + TAtomic Scheduled; + TAtomic Discarded; + TAtomic Total; + }; + static TCounters Counters; + +//fill test -- fill queue with "endless" jobs TSystemEvent WaitEvent; Y_UNIT_TEST(FillTest) { - Counters.Reset(); - - struct TWaitJob: public IObjectInQueue { + Counters.Reset(); + + struct TWaitJob: public IObjectInQueue { void Process(void*) override { - WaitEvent.Wait(); - AtomicIncrement(Counters.Processed); - } - } job; - - struct TLocalSetup: TQueueSetup { - ~TLocalSetup() { - WaitEvent.Signal(); - } - }; - - size_t enqueued = 0; - { - TLocalSetup setup; - while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) { - ++enqueued; - } - - UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize); - UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount()); - } - - UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount()); - UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size()); - UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued); - } - - -//concurrent test -- send many jobs from different threads - struct TJob: public IObjectInQueue { + WaitEvent.Wait(); + AtomicIncrement(Counters.Processed); + } + } job; + + struct TLocalSetup: TQueueSetup { + ~TLocalSetup() { + WaitEvent.Signal(); + } + }; + + size_t enqueued = 0; + { + TLocalSetup setup; + while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) { + ++enqueued; + } + + UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize); + UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount()); + } + + UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount()); + UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size()); + UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued); + } + + +//concurrent test -- send many jobs from different threads + struct TJob: public IObjectInQueue { void Process(void*) override { - AtomicIncrement(Counters.Processed); - }; - }; - static TJob Job; - - static bool TryAdd() { - AtomicIncrement(Counters.Total); - if (Queue->Add(&Job)) { - AtomicIncrement(Counters.Scheduled); - return true; - } else { - AtomicIncrement(Counters.Discarded); - return false; - } - } - - static size_t TryCounter; - + AtomicIncrement(Counters.Processed); + }; + }; + static TJob Job; + + static bool TryAdd() { + AtomicIncrement(Counters.Total); + if (Queue->Add(&Job)) { + AtomicIncrement(Counters.Scheduled); + return true; + } else { + AtomicIncrement(Counters.Discarded); + return false; + } + } + + static size_t TryCounter; + Y_UNIT_TEST(ConcurrentTest) { - Counters.Reset(); - TryCounter = 0; - + Counters.Reset(); + TryCounter = 0; + struct TSender: public IThreadFactory::IThreadAble { void DoExecute() override { - while ((size_t)AtomicIncrement(TryCounter) <= N) { - if (!TryAdd()) { - Sleep(TDuration::MicroSeconds(50)); - } - } - } - } sender; - - { - TQueueSetup setup; - + while ((size_t)AtomicIncrement(TryCounter) <= N) { + if (!TryAdd()) { + Sleep(TDuration::MicroSeconds(50)); + } + } + } + } sender; + + { + TQueueSetup setup; + TVector< TAutoPtr<IThreadFactory::IThread> > senders; - for (size_t i = 0; i < ThreadCount; ++i) { + for (size_t i = 0; i < ThreadCount; ++i) { senders.push_back(::SystemThreadFactory()->Run(&sender)); - } - - for (size_t i = 0; i < senders.size(); ++i) { - senders[i]->Join(); - } - } - - UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Total, N); - UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled); - UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded); - } -} + } + + for (size_t i = 0; i < senders.size(); ++i) { + senders[i]->Join(); + } + } + + UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Total, N); + UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled); + UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded); + } +} diff --git a/library/cpp/threading/task_scheduler/task_scheduler.cpp b/library/cpp/threading/task_scheduler/task_scheduler.cpp index 174dde4bf7..407761eea7 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler.cpp +++ b/library/cpp/threading/task_scheduler/task_scheduler.cpp @@ -1,246 +1,246 @@ #include "task_scheduler.h" - -#include <util/system/thread.h> -#include <util/string/cast.h> + +#include <util/system/thread.h> +#include <util/string/cast.h> #include <util/stream/output.h> - -TTaskScheduler::ITask::~ITask() {} -TTaskScheduler::IRepeatedTask::~IRepeatedTask() {} - - - -class TTaskScheduler::TWorkerThread + +TTaskScheduler::ITask::~ITask() {} +TTaskScheduler::IRepeatedTask::~IRepeatedTask() {} + + + +class TTaskScheduler::TWorkerThread : public ISimpleThread -{ -public: - TWorkerThread(TTaskScheduler& state) - : Scheduler_(state) - { - } - +{ +public: + TWorkerThread(TTaskScheduler& state) + : Scheduler_(state) + { + } + TString DebugState = "?"; TString DebugId = ""; -private: +private: void* ThreadProc() noexcept override { - Scheduler_.WorkerFunc(this); - return nullptr; - } -private: - TTaskScheduler& Scheduler_; -}; - - - -TTaskScheduler::TTaskScheduler(size_t threadCount, size_t maxTaskCount) - : MaxTaskCount_(maxTaskCount) -{ - for (size_t i = 0; i < threadCount; ++i) { - Workers_.push_back(new TWorkerThread(*this)); - Workers_.back()->DebugId = ToString(i); - } -} - -TTaskScheduler::~TTaskScheduler() { - try { - Stop(); - } catch (...) { + Scheduler_.WorkerFunc(this); + return nullptr; + } +private: + TTaskScheduler& Scheduler_; +}; + + + +TTaskScheduler::TTaskScheduler(size_t threadCount, size_t maxTaskCount) + : MaxTaskCount_(maxTaskCount) +{ + for (size_t i = 0; i < threadCount; ++i) { + Workers_.push_back(new TWorkerThread(*this)); + Workers_.back()->DebugId = ToString(i); + } +} + +TTaskScheduler::~TTaskScheduler() { + try { + Stop(); + } catch (...) { Cdbg << "task scheduled destruction error: " << CurrentExceptionMessage(); - } -} - -void TTaskScheduler::Start() { - for (auto& w : Workers_) { - w->Start(); - } -} - -void TTaskScheduler::Stop() { - with_lock (Lock_) { - IsStopped_ = true; - CondVar_.BroadCast(); - } - - for (auto& w: Workers_) { - w->Join(); - } - - Workers_.clear(); - Queue_.clear(); -} - -size_t TTaskScheduler::GetTaskCount() const { - return static_cast<size_t>(AtomicGet(TaskCounter_)); -} - -namespace { - class TTaskWrapper - : public TTaskScheduler::ITask - , TNonCopyable - { - public: - TTaskWrapper(TTaskScheduler::ITaskRef task, TAtomic& counter) - : Task_(task) - , Counter_(counter) - { - AtomicIncrement(Counter_); - } - + } +} + +void TTaskScheduler::Start() { + for (auto& w : Workers_) { + w->Start(); + } +} + +void TTaskScheduler::Stop() { + with_lock (Lock_) { + IsStopped_ = true; + CondVar_.BroadCast(); + } + + for (auto& w: Workers_) { + w->Join(); + } + + Workers_.clear(); + Queue_.clear(); +} + +size_t TTaskScheduler::GetTaskCount() const { + return static_cast<size_t>(AtomicGet(TaskCounter_)); +} + +namespace { + class TTaskWrapper + : public TTaskScheduler::ITask + , TNonCopyable + { + public: + TTaskWrapper(TTaskScheduler::ITaskRef task, TAtomic& counter) + : Task_(task) + , Counter_(counter) + { + AtomicIncrement(Counter_); + } + ~TTaskWrapper() override { - AtomicDecrement(Counter_); - } - private: - TInstant Process() override { - return Task_->Process(); - } - private: - TTaskScheduler::ITaskRef Task_; - TAtomic& Counter_; - }; -} - -bool TTaskScheduler::Add(ITaskRef task, TInstant expire) { - with_lock (Lock_) { - if (!IsStopped_ && Workers_.size() > 0 && GetTaskCount() + 1 <= MaxTaskCount_) { - ITaskRef newTask = new TTaskWrapper(task, TaskCounter_); + AtomicDecrement(Counter_); + } + private: + TInstant Process() override { + return Task_->Process(); + } + private: + TTaskScheduler::ITaskRef Task_; + TAtomic& Counter_; + }; +} + +bool TTaskScheduler::Add(ITaskRef task, TInstant expire) { + with_lock (Lock_) { + if (!IsStopped_ && Workers_.size() > 0 && GetTaskCount() + 1 <= MaxTaskCount_) { + ITaskRef newTask = new TTaskWrapper(task, TaskCounter_); Queue_.insert(std::make_pair(expire, TTaskHolder(newTask))); - - if (!Queue_.begin()->second.WaitingWorker) { - CondVar_.Signal(); - } - return true; - } - } - - return false; -} - -namespace { - class TRepeatedTask - : public TTaskScheduler::ITask - { - public: - TRepeatedTask(TTaskScheduler::IRepeatedTaskRef task, TDuration period, TInstant deadline) - : Task_(task) - , Period_(period) - , Deadline_(deadline) - { - } - private: - TInstant Process() final { - Deadline_ += Period_; - if (Task_->Process()) { - return Deadline_; - } else { - return TInstant::Max(); - } - } - private: - TTaskScheduler::IRepeatedTaskRef Task_; - TDuration Period_; - TInstant Deadline_; - }; -} - -bool TTaskScheduler::Add(IRepeatedTaskRef task, TDuration period) { - const TInstant deadline = Now() + period; - ITaskRef t = new TRepeatedTask(task, period, deadline); - return Add(t, deadline); -} - - -const bool debugOutput = false; - + + if (!Queue_.begin()->second.WaitingWorker) { + CondVar_.Signal(); + } + return true; + } + } + + return false; +} + +namespace { + class TRepeatedTask + : public TTaskScheduler::ITask + { + public: + TRepeatedTask(TTaskScheduler::IRepeatedTaskRef task, TDuration period, TInstant deadline) + : Task_(task) + , Period_(period) + , Deadline_(deadline) + { + } + private: + TInstant Process() final { + Deadline_ += Period_; + if (Task_->Process()) { + return Deadline_; + } else { + return TInstant::Max(); + } + } + private: + TTaskScheduler::IRepeatedTaskRef Task_; + TDuration Period_; + TInstant Deadline_; + }; +} + +bool TTaskScheduler::Add(IRepeatedTaskRef task, TDuration period) { + const TInstant deadline = Now() + period; + ITaskRef t = new TRepeatedTask(task, period, deadline); + return Add(t, deadline); +} + + +const bool debugOutput = false; + void TTaskScheduler::ChangeDebugState(TWorkerThread* thread, const TString& state) { - if (!debugOutput) { + if (!debugOutput) { Y_UNUSED(thread); Y_UNUSED(state); - return; - } - - thread->DebugState = state; - - TStringStream ss; - ss << Now() << " " << thread->DebugId << ":\t"; - for (auto& w : Workers_) { - ss << w->DebugState << " "; - } - ss << " [" << Queue_.size() << "] [" << TaskCounter_ << "]" << Endl; + return; + } + + thread->DebugState = state; + + TStringStream ss; + ss << Now() << " " << thread->DebugId << ":\t"; + for (auto& w : Workers_) { + ss << w->DebugState << " "; + } + ss << " [" << Queue_.size() << "] [" << TaskCounter_ << "]" << Endl; Cerr << ss.Str(); -} - -bool TTaskScheduler::Wait(TWorkerThread* thread, TQueueIterator& toWait) { - ChangeDebugState(thread, "w"); - toWait->second.WaitingWorker = thread; - return !CondVar_.WaitD(Lock_, toWait->first); -} - -void TTaskScheduler::ChooseFromQueue(TQueueIterator& toWait) { - for (TQueueIterator it = Queue_.begin(); it != Queue_.end(); ++it) { - if (!it->second.WaitingWorker) { - if (toWait == Queue_.end()) { - toWait = it; - } else if (it->first < toWait->first) { - toWait->second.WaitingWorker = nullptr; - toWait = it; - } - break; - } - } -} - -void TTaskScheduler::WorkerFunc(TWorkerThread* thread) { +} + +bool TTaskScheduler::Wait(TWorkerThread* thread, TQueueIterator& toWait) { + ChangeDebugState(thread, "w"); + toWait->second.WaitingWorker = thread; + return !CondVar_.WaitD(Lock_, toWait->first); +} + +void TTaskScheduler::ChooseFromQueue(TQueueIterator& toWait) { + for (TQueueIterator it = Queue_.begin(); it != Queue_.end(); ++it) { + if (!it->second.WaitingWorker) { + if (toWait == Queue_.end()) { + toWait = it; + } else if (it->first < toWait->first) { + toWait->second.WaitingWorker = nullptr; + toWait = it; + } + break; + } + } +} + +void TTaskScheduler::WorkerFunc(TWorkerThread* thread) { TThread::SetCurrentThreadName("TaskSchedWorker"); - TQueueIterator toWait = Queue_.end(); - ITaskRef toDo; - - for (;;) { - TInstant repeat = TInstant::Max(); - - if (!!toDo) { - try { - repeat = toDo->Process(); - } catch (...) { + TQueueIterator toWait = Queue_.end(); + ITaskRef toDo; + + for (;;) { + TInstant repeat = TInstant::Max(); + + if (!!toDo) { + try { + repeat = toDo->Process(); + } catch (...) { Cdbg << "task scheduler error: " << CurrentExceptionMessage(); - } - } - - - with_lock (Lock_) { - ChangeDebugState(thread, "f"); - - if (IsStopped_) { - ChangeDebugState(thread, "s"); - return ; - } - - if (!!toDo) { - if (repeat < TInstant::Max()) { + } + } + + + with_lock (Lock_) { + ChangeDebugState(thread, "f"); + + if (IsStopped_) { + ChangeDebugState(thread, "s"); + return ; + } + + if (!!toDo) { + if (repeat < TInstant::Max()) { Queue_.insert(std::make_pair(repeat, TTaskHolder(toDo))); - } - } - - toDo = nullptr; - - ChooseFromQueue(toWait); - - if (toWait != Queue_.end()) { - if (toWait->first <= Now() || Wait(thread, toWait)) { - - toDo = toWait->second.Task; - Queue_.erase(toWait); - toWait = Queue_.end(); - - if (!Queue_.empty() && !Queue_.begin()->second.WaitingWorker && Workers_.size() > 1) { - CondVar_.Signal(); - } - - ChangeDebugState(thread, "p"); - } - } else { - ChangeDebugState(thread, "e"); - CondVar_.WaitI(Lock_); - } - } - } -} + } + } + + toDo = nullptr; + + ChooseFromQueue(toWait); + + if (toWait != Queue_.end()) { + if (toWait->first <= Now() || Wait(thread, toWait)) { + + toDo = toWait->second.Task; + Queue_.erase(toWait); + toWait = Queue_.end(); + + if (!Queue_.empty() && !Queue_.begin()->second.WaitingWorker && Workers_.size() > 1) { + CondVar_.Signal(); + } + + ChangeDebugState(thread, "p"); + } + } else { + ChangeDebugState(thread, "e"); + CondVar_.WaitI(Lock_); + } + } + } +} diff --git a/library/cpp/threading/task_scheduler/task_scheduler.h b/library/cpp/threading/task_scheduler/task_scheduler.h index df4da941a8..166946a2aa 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler.h +++ b/library/cpp/threading/task_scheduler/task_scheduler.h @@ -1,86 +1,86 @@ -#pragma once - -#include <util/generic/vector.h> -#include <util/generic/ptr.h> -#include <util/generic/map.h> - -#include <util/datetime/base.h> - -#include <util/system/condvar.h> -#include <util/system/mutex.h> - -class TTaskScheduler { -public: - class ITask; - using ITaskRef = TIntrusivePtr<ITask>; - - class IRepeatedTask; - using IRepeatedTaskRef = TIntrusivePtr<IRepeatedTask>; -public: - explicit TTaskScheduler(size_t threadCount = 1, size_t maxTaskCount = Max<size_t>()); - ~TTaskScheduler(); - - void Start(); - void Stop(); - - bool Add(ITaskRef task, TInstant expire); - bool Add(IRepeatedTaskRef task, TDuration period); - - size_t GetTaskCount() const; -private: - class TWorkerThread; - - struct TTaskHolder { - explicit TTaskHolder(ITaskRef& task) - : Task(task) - { - } - public: - ITaskRef Task; - TWorkerThread* WaitingWorker = nullptr; - }; - +#pragma once + +#include <util/generic/vector.h> +#include <util/generic/ptr.h> +#include <util/generic/map.h> + +#include <util/datetime/base.h> + +#include <util/system/condvar.h> +#include <util/system/mutex.h> + +class TTaskScheduler { +public: + class ITask; + using ITaskRef = TIntrusivePtr<ITask>; + + class IRepeatedTask; + using IRepeatedTaskRef = TIntrusivePtr<IRepeatedTask>; +public: + explicit TTaskScheduler(size_t threadCount = 1, size_t maxTaskCount = Max<size_t>()); + ~TTaskScheduler(); + + void Start(); + void Stop(); + + bool Add(ITaskRef task, TInstant expire); + bool Add(IRepeatedTaskRef task, TDuration period); + + size_t GetTaskCount() const; +private: + class TWorkerThread; + + struct TTaskHolder { + explicit TTaskHolder(ITaskRef& task) + : Task(task) + { + } + public: + ITaskRef Task; + TWorkerThread* WaitingWorker = nullptr; + }; + using TQueueType = TMultiMap<TInstant, TTaskHolder>; using TQueueIterator = TQueueType::iterator; -private: +private: void ChangeDebugState(TWorkerThread* thread, const TString& state); - void ChooseFromQueue(TQueueIterator& toWait); - bool Wait(TWorkerThread* thread, TQueueIterator& toWait); - - void WorkerFunc(TWorkerThread* thread); -private: - bool IsStopped_ = false; - - TAtomic TaskCounter_ = 0; + void ChooseFromQueue(TQueueIterator& toWait); + bool Wait(TWorkerThread* thread, TQueueIterator& toWait); + + void WorkerFunc(TWorkerThread* thread); +private: + bool IsStopped_ = false; + + TAtomic TaskCounter_ = 0; TQueueType Queue_; - - TCondVar CondVar_; - TMutex Lock_; - + + TCondVar CondVar_; + TMutex Lock_; + TVector<TAutoPtr<TWorkerThread>> Workers_; - - const size_t MaxTaskCount_; -}; - -class TTaskScheduler::ITask - : public TAtomicRefCount<ITask> -{ -public: - virtual ~ITask(); - - virtual TInstant Process() {//returns time to repeat this task - return TInstant::Max(); - } -}; - -class TTaskScheduler::IRepeatedTask - : public TAtomicRefCount<IRepeatedTask> -{ -public: - virtual ~IRepeatedTask(); - - virtual bool Process() {//returns if to repeat task again - return false; - } -}; - + + const size_t MaxTaskCount_; +}; + +class TTaskScheduler::ITask + : public TAtomicRefCount<ITask> +{ +public: + virtual ~ITask(); + + virtual TInstant Process() {//returns time to repeat this task + return TInstant::Max(); + } +}; + +class TTaskScheduler::IRepeatedTask + : public TAtomicRefCount<IRepeatedTask> +{ +public: + virtual ~IRepeatedTask(); + + virtual bool Process() {//returns if to repeat task again + return false; + } +}; + diff --git a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp index 3b5203194a..cb2daa718b 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp +++ b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp @@ -2,7 +2,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <util/stream/output.h> -#include <util/system/atomic.h> +#include <util/system/atomic.h> #include <util/generic/vector.h> #include "task_scheduler.h" @@ -12,13 +12,13 @@ class TTaskSchedulerTest: public TTestBase { UNIT_TEST(Test); UNIT_TEST_SUITE_END(); - class TCheckTask: public TTaskScheduler::IRepeatedTask { + class TCheckTask: public TTaskScheduler::IRepeatedTask { public: TCheckTask(const TDuration& delay) : Start_(Now()) , Delay_(delay) { - AtomicIncrement(ScheduledTaskCounter_); + AtomicIncrement(ScheduledTaskCounter_); } ~TCheckTask() override { @@ -28,28 +28,28 @@ class TTaskSchedulerTest: public TTestBase { const TDuration delay = Now() - Start_; if (delay < Delay_) { - AtomicIncrement(BadTimeoutCounter_); + AtomicIncrement(BadTimeoutCounter_); } - AtomicIncrement(ExecutedTaskCounter_); + AtomicIncrement(ExecutedTaskCounter_); return false; } static bool AllTaskExecuted() { - return AtomicGet(ScheduledTaskCounter_) == AtomicGet(ExecutedTaskCounter_); + return AtomicGet(ScheduledTaskCounter_) == AtomicGet(ExecutedTaskCounter_); } static size_t BadTimeoutCount() { - return AtomicGet(BadTimeoutCounter_); + return AtomicGet(BadTimeoutCounter_); } private: TInstant Start_; TDuration Delay_; - static TAtomic BadTimeoutCounter_; - static TAtomic ScheduledTaskCounter_; - static TAtomic ExecutedTaskCounter_; + static TAtomic BadTimeoutCounter_; + static TAtomic ScheduledTaskCounter_; + static TAtomic ExecutedTaskCounter_; }; public: @@ -72,15 +72,15 @@ class TTaskSchedulerTest: public TTestBase { void ScheduleCheckTask(size_t delay) { TDuration d = TDuration::MicroSeconds(delay); - Scheduler_.Add(new TCheckTask(d), d); + Scheduler_.Add(new TCheckTask(d), d); } private: - TTaskScheduler Scheduler_; + TTaskScheduler Scheduler_; }; -TAtomic TTaskSchedulerTest::TCheckTask::BadTimeoutCounter_ = 0; -TAtomic TTaskSchedulerTest::TCheckTask::ScheduledTaskCounter_ = 0; -TAtomic TTaskSchedulerTest::TCheckTask::ExecutedTaskCounter_ = 0; +TAtomic TTaskSchedulerTest::TCheckTask::BadTimeoutCounter_ = 0; +TAtomic TTaskSchedulerTest::TCheckTask::ScheduledTaskCounter_ = 0; +TAtomic TTaskSchedulerTest::TCheckTask::ExecutedTaskCounter_ = 0; UNIT_TEST_SUITE_REGISTRATION(TTaskSchedulerTest); |