diff options
author | kulikov <kulikov@yandex-team.ru> | 2022-02-10 16:49:34 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:34 +0300 |
commit | c707901605d7b7c6cba0998cd52e1ae619c97762 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 | |
parent | 65e5266709e7ff94b14ae128309e229de714b0df (diff) | |
download | ydb-c707901605d7b7c6cba0998cd52e1ae619c97762.tar.gz |
Restoring authorship annotation for <kulikov@yandex-team.ru>. Commit 2 of 2.
42 files changed, 1193 insertions, 1193 deletions
diff --git a/library/cpp/coroutine/engine/cont_poller.h b/library/cpp/coroutine/engine/cont_poller.h index d158bbe9f5..b638b2df1a 100644 --- a/library/cpp/coroutine/engine/cont_poller.h +++ b/library/cpp/coroutine/engine/cont_poller.h @@ -143,13 +143,13 @@ namespace NCoro { auto* lst = Lists_.Get(event->Fd()); const ui16 oldFlags = Flags(*lst); lst->PushFront(event); - ui16 newFlags = Flags(*lst); + ui16 newFlags = Flags(*lst); if (newFlags != oldFlags) { - if (oldFlags) { - newFlags |= CONT_POLL_MODIFY; - } - + if (oldFlags) { + newFlags |= CONT_POLL_MODIFY; + } + P_->Set(lst, event->Fd(), newFlags); } } @@ -158,13 +158,13 @@ namespace NCoro { auto* lst = Lists_.Get(event->Fd()); const ui16 oldFlags = Flags(*lst); event->Unlink(); - ui16 newFlags = Flags(*lst); + ui16 newFlags = Flags(*lst); if (newFlags != oldFlags) { - if (newFlags) { - newFlags |= CONT_POLL_MODIFY; - } - + if (newFlags) { + newFlags |= CONT_POLL_MODIFY; + } + P_->Set(lst, event->Fd(), newFlags); } } @@ -174,9 +174,9 @@ namespace NCoro { P_->Wait(events, deadLine); } - EContPoller PollEngine() const { - return P_->PollEngine(); - } + EContPoller PollEngine() const { + return P_->PollEngine(); + } private: static ui16 Flags(TIntrusiveList<IPollEvent>& lst) noexcept { ui16 ret = 0; diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp index a7012eb8e2..8b372496a2 100644 --- a/library/cpp/coroutine/engine/coroutine_ut.cpp +++ b/library/cpp/coroutine/engine/coroutine_ut.cpp @@ -8,16 +8,16 @@ #include <util/system/pipe.h> #include <util/system/env.h> #include <util/system/info.h> -#include <util/system/thread.h> +#include <util/system/thread.h> #include <util/generic/xrange.h> -#include <util/generic/serialized_enum.h> +#include <util/generic/serialized_enum.h> // TODO (velavokr): BALANCER-1345 add more tests on pollers class TCoroTest: public TTestBase { UNIT_TEST_SUITE(TCoroTest); UNIT_TEST(TestSimpleX1); - UNIT_TEST(TestSimpleX1MultiThread); + UNIT_TEST(TestSimpleX1MultiThread); UNIT_TEST(TestSimpleX2); UNIT_TEST(TestSimpleX3); UNIT_TEST(TestMemFun); @@ -40,10 +40,10 @@ class TCoroTest: public TTestBase { UNIT_TEST(TestLegacyCancelYieldRaceBug) UNIT_TEST(TestJoinRescheduleBug); UNIT_TEST(TestEventQueue) - UNIT_TEST(TestNestedExecutor) - UNIT_TEST(TestComputeCoroutineYield) - UNIT_TEST(TestPollEngines); - UNIT_TEST(TestUserEvent); + UNIT_TEST(TestNestedExecutor) + UNIT_TEST(TestComputeCoroutineYield) + UNIT_TEST(TestPollEngines); + UNIT_TEST(TestUserEvent); UNIT_TEST(TestPause); UNIT_TEST(TestOverrideTime); UNIT_TEST_SUITE_END(); @@ -51,7 +51,7 @@ class TCoroTest: public TTestBase { public: void TestException(); void TestSimpleX1(); - void TestSimpleX1MultiThread(); + void TestSimpleX1MultiThread(); void TestSimpleX2(); void TestSimpleX3(); void TestMemFun(); @@ -72,10 +72,10 @@ public: void TestLegacyCancelYieldRaceBug(); void TestJoinRescheduleBug(); void TestEventQueue(); - void TestNestedExecutor(); - void TestComputeCoroutineYield(); - void TestPollEngines(); - void TestUserEvent(); + void TestNestedExecutor(); + void TestComputeCoroutineYield(); + void TestPollEngines(); + void TestUserEvent(); void TestPause(); void TestOverrideTime(); }; @@ -130,54 +130,54 @@ static int i0; static void CoRun(TCont* c, void* /*run*/) { while (i0 < 100000) { ++i0; - UNIT_ASSERT(RunningCont() == c); + UNIT_ASSERT(RunningCont() == c); c->Yield(); - UNIT_ASSERT(RunningCont() == c); + UNIT_ASSERT(RunningCont() == c); } } static void CoMain(TCont* c, void* /*arg*/) { for (volatile size_t i2 = 0; i2 < 10; ++i2) { - UNIT_ASSERT(RunningCont() == c); + UNIT_ASSERT(RunningCont() == c); c->Executor()->Create(CoRun, nullptr, "run"); - UNIT_ASSERT(RunningCont() == c); + UNIT_ASSERT(RunningCont() == c); } } void TCoroTest::TestSimpleX1() { i0 = 0; TContExecutor e(32000); - - UNIT_ASSERT(RunningCont() == nullptr); - + + UNIT_ASSERT(RunningCont() == nullptr); + e.Execute(CoMain); UNIT_ASSERT_VALUES_EQUAL(i0, 100000); - - UNIT_ASSERT(RunningCont() == nullptr); -} - -void TCoroTest::TestSimpleX1MultiThread() { - TVector<THolder<TThread>> threads; - const size_t nThreads = 0; - TAtomic c = 0; - for (size_t i = 0; i < nThreads; ++i) { - threads.push_back(MakeHolder<TThread>([&]() { - TestSimpleX1(); - AtomicIncrement(c); - })); - } - - for (auto& t : threads) { - t->Start(); - } - - for (auto& t: threads) { - t->Join(); - } - - UNIT_ASSERT_EQUAL(c, nThreads); -} - + + UNIT_ASSERT(RunningCont() == nullptr); +} + +void TCoroTest::TestSimpleX1MultiThread() { + TVector<THolder<TThread>> threads; + const size_t nThreads = 0; + TAtomic c = 0; + for (size_t i = 0; i < nThreads; ++i) { + threads.push_back(MakeHolder<TThread>([&]() { + TestSimpleX1(); + AtomicIncrement(c); + })); + } + + for (auto& t : threads) { + t->Start(); + } + + for (auto& t: threads) { + t->Join(); + } + + UNIT_ASSERT_EQUAL(c, nThreads); +} + struct TTestObject { int i = 0; int j = 0; @@ -863,85 +863,85 @@ void TCoroTest::TestEventQueue() { }, &queue); } -void TCoroTest::TestNestedExecutor() { -#ifndef _win_ - //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr - TContExecutor exec(32000); - UNIT_ASSERT(!RunningCont()); - - exec.Execute([](TCont* cont, void*) { - UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont); - - TContExecutor exec2(32000); - exec2.Execute([](TCont* cont2, void*) { - UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2); - TContExecutor exec3(32000); - exec3.Execute([](TCont* cont3, void*) { - UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3); - }); - - UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2); - }); - - UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont); - }); - - UNIT_ASSERT(!RunningCont()); -#endif -} - -void TCoroTest::TestComputeCoroutineYield() { -//if we have busy (e.g., on cpu) coroutine, when it yields, io must flow - TContExecutor exec(32000); - exec.SetFailOnError(true); - - TPipe in, out; - TPipe::Pipe(in, out); - SetNonBlock(in.GetHandle()); - size_t lastRead = 42; - - auto compute = [&](TCont* cont) { - for (size_t i = 0; i < 10; ++i) { - write(out.GetHandle(), &i, sizeof i); - Sleep(TDuration::MilliSeconds(10)); - cont->Yield(); - UNIT_ASSERT(lastRead == i); - } - }; - - auto io = [&](TCont* cont) { - for (size_t i = 0; i < 10; ++i) { - NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead); - } - }; - - exec.Create(compute, "compute"); - exec.Create(io, "io"); - - exec.Execute(); -} - -void TCoroTest::TestPollEngines() { - bool defaultChecked = false; - for (auto engine : GetEnumAllValues<EContPoller>()) { - auto poller = IPollerFace::Construct(engine); - if (!poller) { - continue; - } - - TContExecutor exec(32000, IPollerFace::Construct(engine)); - - if (engine == EContPoller::Default) { - defaultChecked = true; - UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Combined); - } else { - UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine); - } - } - - UNIT_ASSERT(defaultChecked); -} - +void TCoroTest::TestNestedExecutor() { +#ifndef _win_ + //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr + TContExecutor exec(32000); + UNIT_ASSERT(!RunningCont()); + + exec.Execute([](TCont* cont, void*) { + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont); + + TContExecutor exec2(32000); + exec2.Execute([](TCont* cont2, void*) { + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2); + TContExecutor exec3(32000); + exec3.Execute([](TCont* cont3, void*) { + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3); + }); + + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2); + }); + + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont); + }); + + UNIT_ASSERT(!RunningCont()); +#endif +} + +void TCoroTest::TestComputeCoroutineYield() { +//if we have busy (e.g., on cpu) coroutine, when it yields, io must flow + TContExecutor exec(32000); + exec.SetFailOnError(true); + + TPipe in, out; + TPipe::Pipe(in, out); + SetNonBlock(in.GetHandle()); + size_t lastRead = 42; + + auto compute = [&](TCont* cont) { + for (size_t i = 0; i < 10; ++i) { + write(out.GetHandle(), &i, sizeof i); + Sleep(TDuration::MilliSeconds(10)); + cont->Yield(); + UNIT_ASSERT(lastRead == i); + } + }; + + auto io = [&](TCont* cont) { + for (size_t i = 0; i < 10; ++i) { + NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead); + } + }; + + exec.Create(compute, "compute"); + exec.Create(io, "io"); + + exec.Execute(); +} + +void TCoroTest::TestPollEngines() { + bool defaultChecked = false; + for (auto engine : GetEnumAllValues<EContPoller>()) { + auto poller = IPollerFace::Construct(engine); + if (!poller) { + continue; + } + + TContExecutor exec(32000, IPollerFace::Construct(engine)); + + if (engine == EContPoller::Default) { + defaultChecked = true; + UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Combined); + } else { + UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine); + } + } + + UNIT_ASSERT(defaultChecked); +} + void TCoroTest::TestPause() { TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing()}; @@ -959,28 +959,28 @@ void TCoroTest::TestPause() { UNIT_ASSERT_EQUAL(i, 2); } -void TCoroTest::TestUserEvent() { - TContExecutor exec(32000); - - struct TUserEvent : public IUserEvent { - bool Called = false; - void Execute() override { - Called = true; - } - } event; - - auto f = [&](TCont* cont) { - UNIT_ASSERT(!event.Called); - exec.ScheduleUserEvent(&event); - UNIT_ASSERT(!event.Called); - cont->Yield(); - UNIT_ASSERT(event.Called); - }; - - exec.Execute(f); - - UNIT_ASSERT(event.Called); -} +void TCoroTest::TestUserEvent() { + TContExecutor exec(32000); + + struct TUserEvent : public IUserEvent { + bool Called = false; + void Execute() override { + Called = true; + } + } event; + + auto f = [&](TCont* cont) { + UNIT_ASSERT(!event.Called); + exec.ScheduleUserEvent(&event); + UNIT_ASSERT(!event.Called); + cont->Yield(); + UNIT_ASSERT(event.Called); + }; + + exec.Execute(f); + + UNIT_ASSERT(event.Called); +} void TCoroTest::TestOverrideTime() { class TTime: public NCoro::ITime { diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp index 9231d2b1ba..7ae6f74051 100644 --- a/library/cpp/coroutine/engine/impl.cpp +++ b/library/cpp/coroutine/engine/impl.cpp @@ -3,8 +3,8 @@ #include "stack/stack_allocator.h" #include "stack/stack_guards.h" -#include <util/generic/scope.h> -#include <util/thread/singleton.h> +#include <util/generic/scope.h> +#include <util/thread/singleton.h> #include <util/stream/format.h> #include <util/stream/output.h> #include <util/system/yassert.h> @@ -157,47 +157,47 @@ void TContExecutor::WaitForIO() { // Waking a coroutine puts it into ReadyNext_ list const auto next = WaitQueue_.WakeTimedout(now); - if (!UserEvents_.Empty()) { - TIntrusiveList<IUserEvent> userEvents; - userEvents.Swap(UserEvents_); - do { - userEvents.PopFront()->Execute(); - } while (!userEvents.Empty()); - } - + if (!UserEvents_.Empty()) { + TIntrusiveList<IUserEvent> userEvents; + userEvents.Swap(UserEvents_); + do { + userEvents.PopFront()->Execute(); + } while (!userEvents.Empty()); + } + // Polling will return as soon as there is an event to process or a timeout. // If there are woken coroutines we do not want to sleep in the poller // yet still we want to check for new io // to prevent ourselves from locking out of io by constantly waking coroutines. - if (ReadyNext_.Empty()) { + if (ReadyNext_.Empty()) { if (EnterPollerCallback_) { EnterPollerCallback_->OnEnterPoller(); } - Poll(next); + Poll(next); if (EnterPollerCallback_) { EnterPollerCallback_->OnExitPoller(); } - } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) { + } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) { if (EnterPollerCallback_) { EnterPollerCallback_->OnEnterPoller(); } - Poll(now); + Poll(now); if (EnterPollerCallback_) { EnterPollerCallback_->OnExitPoller(); } - } + } Ready_.Append(ReadyNext_); } } -void TContExecutor::Poll(TInstant deadline) { - Poller_.Wait(PollerEvents_, deadline); - LastPoll_ = Now(); - - // Waking a coroutine puts it into ReadyNext_ list - for (auto event : PollerEvents_) { +void TContExecutor::Poll(TInstant deadline) { + Poller_.Wait(PollerEvents_, deadline); + LastPoll_ = Now(); + + // Waking a coroutine puts it into ReadyNext_ list + for (auto event : PollerEvents_) { auto* lst = (NCoro::TPollEventList*)event.Data; const int status = event.Status; @@ -276,36 +276,36 @@ void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept { Ready_.PushBack(cont); } -namespace { +namespace { inline TContExecutor*& ThisThreadExecutor() { struct TThisThreadExecutorHolder { - TContExecutor* Executor = nullptr; + TContExecutor* Executor = nullptr; }; return FastTlsSingletonWithPriority<TThisThreadExecutorHolder, 0>()->Executor; } -} - +} + void TContExecutor::DeleteScheduled() noexcept { ToDelete_.ForEach([this](TCont* c) { Release(c); }); } -TCont* RunningCont() { +TCont* RunningCont() { TContExecutor* thisThreadExecutor = ThisThreadExecutor(); - return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr; -} - + return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr; +} + void TContExecutor::RunScheduler() noexcept { try { - TContExecutor* const prev = ThisThreadExecutor(); + TContExecutor* const prev = ThisThreadExecutor(); ThisThreadExecutor() = this; TCont* caller = Current_; TExceptionSafeContext* context = caller ? caller->Trampoline_.Context() : &SchedContext_; - Y_DEFER { - ThisThreadExecutor() = prev; - }; - + Y_DEFER { + ThisThreadExecutor() = prev; + }; + while (true) { if (ScheduleCallback_ && Current_) { ScheduleCallback_->OnUnschedule(*this); @@ -345,7 +345,7 @@ void TContExecutor::RunScheduler() noexcept { } } } catch (...) { - TBackTrace::FromCurrentException().PrintTo(Cerr); + TBackTrace::FromCurrentException().PrintTo(Cerr); Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str()); } } diff --git a/library/cpp/coroutine/engine/impl.h b/library/cpp/coroutine/engine/impl.h index a055012f3f..283a96ecf1 100644 --- a/library/cpp/coroutine/engine/impl.h +++ b/library/cpp/coroutine/engine/impl.h @@ -121,9 +121,9 @@ private: bool Scheduled_ = false; }; -TCont* RunningCont(); +TCont* RunningCont(); + - template <class Functor> static void ContHelperFunc(TCont* cont, void* arg) { (*((Functor*)(arg)))(cont); @@ -134,15 +134,15 @@ static void ContHelperMemberFunc(TCont* c, void* arg) { ((reinterpret_cast<T*>(arg))->*M)(c); } -class IUserEvent - : public TIntrusiveListItem<IUserEvent> -{ -public: - virtual ~IUserEvent() = default; - - virtual void Execute() = 0; -}; - +class IUserEvent + : public TIntrusiveListItem<IUserEvent> +{ +public: + virtual ~IUserEvent() = default; + + virtual void Execute() = 0; +}; + /// Central coroutine class. /// Note, coroutines are single-threaded, and all methods must be called from the single thread class TContExecutor { @@ -249,22 +249,22 @@ public: return FailOnError_; } - void RegisterInWaitQueue(NCoro::TContPollEvent* event) { - WaitQueue_.Register(event); - } - + void RegisterInWaitQueue(NCoro::TContPollEvent* event) { + WaitQueue_.Register(event); + } + void ScheduleIoWait(TFdEvent* event) { - RegisterInWaitQueue(event); + RegisterInWaitQueue(event); Poller_.Schedule(event); } void ScheduleIoWait(TTimerEvent* event) noexcept { - RegisterInWaitQueue(event); + RegisterInWaitQueue(event); } - void ScheduleUserEvent(IUserEvent* event) { - UserEvents_.PushBack(event); - } + void ScheduleUserEvent(IUserEvent* event) { + UserEvents_.PushBack(event); + } void Pause(); TInstant Now(); @@ -285,7 +285,7 @@ private: void WaitForIO(); - void Poll(TInstant deadline); + void Poll(TInstant deadline); private: NCoro::IScheduleCallback* const ScheduleCallback_ = nullptr; @@ -300,11 +300,11 @@ private: TContList ReadyNext_; NCoro::TEventWaitQueue WaitQueue_; NCoro::TContPoller Poller_; - NCoro::TContPoller::TEvents PollerEvents_; - TInstant LastPoll_; + NCoro::TContPoller::TEvents PollerEvents_; + TInstant LastPoll_; + + TIntrusiveList<IUserEvent> UserEvents_; - TIntrusiveList<IUserEvent> UserEvents_; - size_t Allocated_ = 0; TCont* Current_ = nullptr; bool FailOnError_ = false; diff --git a/library/cpp/coroutine/engine/poller.cpp b/library/cpp/coroutine/engine/poller.cpp index 722ef1e3a6..61164fa56b 100644 --- a/library/cpp/coroutine/engine/poller.cpp +++ b/library/cpp/coroutine/engine/poller.cpp @@ -46,11 +46,11 @@ namespace { template <class T> class TVirtualize: public IPollerFace { public: - TVirtualize(EContPoller pollerEngine) - : PollerEngine_(pollerEngine) - { - } - + TVirtualize(EContPoller pollerEngine) + : PollerEngine_(pollerEngine) + { + } + void Set(const TChange& c) override { P_.Set(c); } @@ -59,12 +59,12 @@ namespace { P_.Wait(events, deadLine); } - EContPoller PollEngine() const override { - return PollerEngine_; - } + EContPoller PollEngine() const override { + return PollerEngine_; + } private: T P_; - const EContPoller PollerEngine_; + const EContPoller PollerEngine_; }; @@ -366,21 +366,21 @@ THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) { THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) { switch (poller) { case EContPoller::Default: - case EContPoller::Combined: - return MakeHolder<TVirtualize<TCombinedPoller>>(EContPoller::Combined); + case EContPoller::Combined: + return MakeHolder<TVirtualize<TCombinedPoller>>(EContPoller::Combined); case EContPoller::Select: - return MakeHolder<TVirtualize<TPoller<TGenericPoller<TSelectPoller<TWithoutLocking>>>>>(poller); + return MakeHolder<TVirtualize<TPoller<TGenericPoller<TSelectPoller<TWithoutLocking>>>>>(poller); case EContPoller::Poll: - return MakeHolder<TVirtualize<TPollPoller>>(poller); + return MakeHolder<TVirtualize<TPollPoller>>(poller); case EContPoller::Epoll: #if defined(HAVE_EPOLL_POLLER) - return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller); + return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller); #else return nullptr; #endif case EContPoller::Kqueue: #if defined(HAVE_KQUEUE_POLLER) - return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller); + return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller); #else return nullptr; #endif diff --git a/library/cpp/coroutine/engine/poller.h b/library/cpp/coroutine/engine/poller.h index ff60266d73..8ea012c0fc 100644 --- a/library/cpp/coroutine/engine/poller.h +++ b/library/cpp/coroutine/engine/poller.h @@ -8,7 +8,7 @@ enum class EContPoller { Default /* "default" */, - Combined /* "combined" */, + Combined /* "combined" */, Select /* "select" */, Poll /* "poll" */, Epoll /* "epoll" */, @@ -42,7 +42,7 @@ public: virtual void Set(const TChange& change) = 0; virtual void Wait(TEvents& events, TInstant deadLine) = 0; - virtual EContPoller PollEngine() const = 0; + virtual EContPoller PollEngine() const = 0; static THolder<IPollerFace> Default(); static THolder<IPollerFace> Construct(TStringBuf name); diff --git a/library/cpp/coroutine/engine/sockpool.h b/library/cpp/coroutine/engine/sockpool.h index 2afd5c709a..1ebb7e7b38 100644 --- a/library/cpp/coroutine/engine/sockpool.h +++ b/library/cpp/coroutine/engine/sockpool.h @@ -93,7 +93,7 @@ public: : Impl_(nullptr) { } - + TPooledSocket(TImpl* impl) : Impl_(impl) { @@ -178,7 +178,7 @@ public: } else { ret = AllocateMore(conn); } - + ret.Impl_->Touch(); return ret; @@ -189,10 +189,10 @@ public: alive->Touch(); socket = TPooledSocket(alive); return true; - } + } return false; } - + private: TPooledSocket::TImpl* GetImpl() { TGuard<TMutex> guard(Mutex_); diff --git a/library/cpp/coroutine/engine/trampoline.cpp b/library/cpp/coroutine/engine/trampoline.cpp index 6857670e1e..10ea69ddc3 100644 --- a/library/cpp/coroutine/engine/trampoline.cpp +++ b/library/cpp/coroutine/engine/trampoline.cpp @@ -38,9 +38,9 @@ TTrampoline::TTrampoline(NStack::IAllocator& allocator, ui32 stackSize, TFunc f, return Stack_.Get(); } - const char* TTrampoline::ContName() const noexcept { - return Cont_->Name(); - } + const char* TTrampoline::ContName() const noexcept { + return Cont_->Name(); + } void TTrampoline::DoRunNaked() { DoRun(); diff --git a/library/cpp/coroutine/engine/trampoline.h b/library/cpp/coroutine/engine/trampoline.h index 30cc079ab0..37b61cf015 100644 --- a/library/cpp/coroutine/engine/trampoline.h +++ b/library/cpp/coroutine/engine/trampoline.h @@ -39,8 +39,8 @@ namespace NCoro { } void SwitchTo(TExceptionSafeContext* ctx) noexcept { - Y_VERIFY(Stack_.LowerCanaryOk(), "Stack overflow (%s)", ContName()); - Y_VERIFY(Stack_.UpperCanaryOk(), "Stack override (%s)", ContName()); + Y_VERIFY(Stack_.LowerCanaryOk(), "Stack overflow (%s)", ContName()); + Y_VERIFY(Stack_.UpperCanaryOk(), "Stack override (%s)", ContName()); Ctx_.SwitchTo(ctx); } @@ -49,8 +49,8 @@ namespace NCoro { void DoRunNaked() override; private: - const char* ContName() const noexcept; - private: + const char* ContName() const noexcept; + private: NStack::TStackHolder Stack_; const TContClosure Clo_; TExceptionSafeContext Ctx_; diff --git a/library/cpp/http/io/compression.h b/library/cpp/http/io/compression.h index 30eccdaca5..f16c4a18eb 100644 --- a/library/cpp/http/io/compression.h +++ b/library/cpp/http/io/compression.h @@ -48,25 +48,25 @@ private: THashMap<TStringBuf, TCodec> Codecs_; TVector<TStringBuf> BestCodecs_; }; - -namespace NHttp { - template <typename F> - TString ChooseBestCompressionScheme(F accepted, TArrayRef<const TStringBuf> available) { - if (available.empty()) { - return "identity"; - } - - if (accepted("*")) { - return TString(available[0]); - } - - for (const auto& coding : available) { - TString s(coding); - if (accepted(s)) { - return s; - } - } - - return "identity"; - } -} + +namespace NHttp { + template <typename F> + TString ChooseBestCompressionScheme(F accepted, TArrayRef<const TStringBuf> available) { + if (available.empty()) { + return "identity"; + } + + if (accepted("*")) { + return TString(available[0]); + } + + for (const auto& coding : available) { + TString s(coding); + if (accepted(s)) { + return s; + } + } + + return "identity"; + } +} diff --git a/library/cpp/http/io/compression_ut.cpp b/library/cpp/http/io/compression_ut.cpp index d8a2d11a08..2f3d131f8c 100644 --- a/library/cpp/http/io/compression_ut.cpp +++ b/library/cpp/http/io/compression_ut.cpp @@ -5,7 +5,7 @@ #include <library/cpp/testing/unittest/tests_data.h> #include <util/stream/zlib.h> -#include <util/generic/hash_set.h> +#include <util/generic/hash_set.h> Y_UNIT_TEST_SUITE(THttpCompressionTest) { static const TString DATA = "I'm a teapot"; @@ -43,18 +43,18 @@ Y_UNIT_TEST_SUITE(THttpCompressionTest) { auto decodedStream = (*decoder)(&buffer); UNIT_ASSERT_EQUAL(decodedStream->ReadAll(), DATA); } - - Y_UNIT_TEST(TestChooseBestCompressionScheme) { - THashSet<TString> accepted; - - auto checkAccepted = [&accepted](const TString& v) { - return accepted.contains(v); - }; - - UNIT_ASSERT_VALUES_EQUAL("identity", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); - accepted.insert("deflate"); - UNIT_ASSERT_VALUES_EQUAL("deflate", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); - accepted.insert("*"); - UNIT_ASSERT_VALUES_EQUAL("gzip", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); - } + + Y_UNIT_TEST(TestChooseBestCompressionScheme) { + THashSet<TString> accepted; + + auto checkAccepted = [&accepted](const TString& v) { + return accepted.contains(v); + }; + + UNIT_ASSERT_VALUES_EQUAL("identity", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); + accepted.insert("deflate"); + UNIT_ASSERT_VALUES_EQUAL("deflate", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); + accepted.insert("*"); + UNIT_ASSERT_VALUES_EQUAL("gzip", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"})); + } } // THttpCompressionTest suite diff --git a/library/cpp/http/io/stream.cpp b/library/cpp/http/io/stream.cpp index 728d1a89c1..6689be684f 100644 --- a/library/cpp/http/io/stream.cpp +++ b/library/cpp/http/io/stream.cpp @@ -286,7 +286,7 @@ private: TParsedHeaders p; size_t pos = FirstLine_.rfind(' '); - // In HTTP/1.1 Keep-Alive is turned on by default + // In HTTP/1.1 Keep-Alive is turned on by default if (pos != TString::npos && strcmp(FirstLine_.c_str() + pos + 1, "HTTP/1.1") == 0) { p.KeepAlive = true; //request } else if (strnicmp(FirstLine_.data(), "HTTP/1.1", 8) == 0) { @@ -428,12 +428,12 @@ bool THttpInput::AcceptEncoding(const TString& coding) const { } TString THttpInput::BestCompressionScheme(TArrayRef<const TStringBuf> codings) const { - return NHttp::ChooseBestCompressionScheme( - [this](const TString& coding) { - return AcceptEncoding(coding); - }, - codings - ); + return NHttp::ChooseBestCompressionScheme( + [this](const TString& coding) { + return AcceptEncoding(coding); + }, + codings + ); } TString THttpInput::BestCompressionScheme() const { diff --git a/library/cpp/http/io/stream_ut.cpp b/library/cpp/http/io/stream_ut.cpp index 1d78c82e0e..1ea35df675 100644 --- a/library/cpp/http/io/stream_ut.cpp +++ b/library/cpp/http/io/stream_ut.cpp @@ -179,63 +179,63 @@ Y_UNIT_TEST_SUITE(THttpStreamTest) { } Y_UNIT_TEST(TestKeepAlive) { - { + { TString s = "GET / HTTP/1.0\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(!in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(!in.IsKeepAlive()); + } + + { TString s = "GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(in.IsKeepAlive()); + } + + { TString s = "GET / HTTP/1.1\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(in.IsKeepAlive()); + } + + { TString s = "GET / HTTP/1.1\r\nConnection: close\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(!in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(!in.IsKeepAlive()); + } + + { TString s = "HTTP/1.0 200 Ok\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(!in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(!in.IsKeepAlive()); + } + + { TString s = "HTTP/1.0 200 Ok\r\nConnection: keep-alive\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(in.IsKeepAlive()); + } + + { TString s = "HTTP/1.1 200 Ok\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(in.IsKeepAlive()); - } - - { + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(in.IsKeepAlive()); + } + + { TString s = "HTTP/1.1 200 Ok\r\nConnection: close\r\n\r\n"; - TStringInput si(s); - THttpInput in(&si); - UNIT_ASSERT(!in.IsKeepAlive()); - } - } - + TStringInput si(s); + THttpInput in(&si); + UNIT_ASSERT(!in.IsKeepAlive()); + } + } + Y_UNIT_TEST(TestMinRequest) { TString res = "qqqqqq"; TPortManager pm; diff --git a/library/cpp/http/server/conn.cpp b/library/cpp/http/server/conn.cpp index 37f82f9f7b..38a76c4c30 100644 --- a/library/cpp/http/server/conn.cpp +++ b/library/cpp/http/server/conn.cpp @@ -5,11 +5,11 @@ class THttpServerConn::TImpl { public: - inline TImpl(const TSocket& s, size_t outputBufferSize) + inline TImpl(const TSocket& s, size_t outputBufferSize) : S_(s) , SI_(S_) , SO_(S_) - , BO_(&SO_, outputBufferSize) + , BO_(&SO_, outputBufferSize) , HI_(&SI_) , HO_(&BO_, &HI_) { @@ -44,15 +44,15 @@ private: }; THttpServerConn::THttpServerConn(const TSocket& s) - : THttpServerConn(s, s.MaximumTransferUnit()) + : THttpServerConn(s, s.MaximumTransferUnit()) +{ +} + +THttpServerConn::THttpServerConn(const TSocket& s, size_t outputBufferSize) + : Impl_(new TImpl(s, outputBufferSize)) { } -THttpServerConn::THttpServerConn(const TSocket& s, size_t outputBufferSize) - : Impl_(new TImpl(s, outputBufferSize)) -{ -} - THttpServerConn::~THttpServerConn() { } diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp index 3d21108b02..128583bdd7 100644 --- a/library/cpp/http/server/http.cpp +++ b/library/cpp/http/server/http.cpp @@ -1,5 +1,5 @@ #include "http.h" -#include "http_ex.h" +#include "http_ex.h" #include <library/cpp/threading/equeue/equeue.h> @@ -243,17 +243,17 @@ public: } void AddRequest(TAutoPtr<TClientRequest> req, bool fail) { - struct TFailRequest: public THttpClientRequestEx { + struct TFailRequest: public THttpClientRequestEx { inline TFailRequest(TAutoPtr<TClientRequest> parent) { Conn_.Reset(parent->Conn_.Release()); HttpConn_.Reset(parent->HttpConn_.Release()); } bool Reply(void*) override { - if (!ProcessHeaders()) { + if (!ProcessHeaders()) { return true; - } - + } + ProcessFailRequest(0); return true; } @@ -558,11 +558,11 @@ TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv, { SetNoDelay(Socket_, true); - const TDuration& clientTimeout = HttpServ_->Options().ClientTimeout; - if (clientTimeout != TDuration::Zero()) { + const TDuration& clientTimeout = HttpServ_->Options().ClientTimeout; + if (clientTimeout != TDuration::Zero()) { SetSocketTimeout(Socket_, (long)clientTimeout.Seconds(), clientTimeout.MilliSecondsOfSecond()); - } - + } + HttpServ_->IncreaseConnections(); } @@ -679,16 +679,16 @@ void TClientRequest::ResetConnection() { void TClientRequest::Process(void* ThreadSpecificResource) { THolder<TClientRequest> this_(this); - auto* serverImpl = Conn_->HttpServ_; - + auto* serverImpl = Conn_->HttpServ_; + try { if (!HttpConn_) { - const size_t outputBufferSize = HttpServ()->Options().OutputBufferSize; - if (outputBufferSize) { - HttpConn_.Reset(new THttpServerConn(Socket(), outputBufferSize)); - } else { - HttpConn_.Reset(new THttpServerConn(Socket())); - } + const size_t outputBufferSize = HttpServ()->Options().OutputBufferSize; + if (outputBufferSize) { + HttpConn_.Reset(new THttpServerConn(Socket(), outputBufferSize)); + } else { + HttpConn_.Reset(new THttpServerConn(Socket())); + } auto maxRequestsPerConnection = HttpServ()->Options().MaxRequestsPerConnection; HttpConn_->Output()->EnableKeepAlive(HttpServ()->Options().KeepAliveEnabled && (!maxRequestsPerConnection || Conn_->ReceivedRequests < maxRequestsPerConnection)); @@ -715,7 +715,7 @@ void TClientRequest::Process(void* ThreadSpecificResource) { return; } } catch (...) { - serverImpl->Cb_->OnException(); + serverImpl->Cb_->OnException(); throw; } diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp index 42580906f6..cc62bb988e 100644 --- a/library/cpp/http/server/http_ut.cpp +++ b/library/cpp/http/server/http_ut.cpp @@ -456,31 +456,31 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { server.Stop(); } - class TReleaseConnectionServer: public THttpServer::ICallBack { - class TRequest: public THttpClientRequestEx { - public: - bool Reply(void* /*tsr*/) override { - Output() << "HTTP/1.1 200 Ok\r\n\r\n"; - Output() << "reply"; - Output().Finish(); - - ReleaseConnection(); - - throw yexception() << "some error"; - - return true; - } - }; - - public: - TClientRequest* CreateClient() override { - return new TRequest(); - } - - void OnException() override { - ExceptionMessage = CurrentExceptionMessage(); - } - + class TReleaseConnectionServer: public THttpServer::ICallBack { + class TRequest: public THttpClientRequestEx { + public: + bool Reply(void* /*tsr*/) override { + Output() << "HTTP/1.1 200 Ok\r\n\r\n"; + Output() << "reply"; + Output().Finish(); + + ReleaseConnection(); + + throw yexception() << "some error"; + + return true; + } + }; + + public: + TClientRequest* CreateClient() override { + return new TRequest(); + } + + void OnException() override { + ExceptionMessage = CurrentExceptionMessage(); + } + TString ExceptionMessage; }; @@ -495,7 +495,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { } }; - public: + public: TClientRequest* CreateClient() override { return new TRequest(); } @@ -504,9 +504,9 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { ExceptionMessage = CurrentExceptionMessage(); } - TString ExceptionMessage; - }; - + TString ExceptionMessage; + }; + class TListenerSockAddrReplyServer: public THttpServer::ICallBack { class TRequest: public TClientRequest { public: @@ -542,22 +542,22 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { }; Y_UNIT_TEST(TTestReleaseConnection) { - TPortManager pm; - const ui16 port = pm.GetPort(); - - TReleaseConnectionServer serverImpl; - THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true)); - UNIT_ASSERT(server.Start()); - - TTestRequest r(port, "request"); - r.KeepAliveConnection = true; - - UNIT_ASSERT_C(r.Execute() == "reply", "diff echo response for request:\n" + r.GetDescription()); - - server.Stop(); - - UNIT_ASSERT_STRINGS_EQUAL(serverImpl.ExceptionMessage, "(yexception) some error"); - }; + TPortManager pm; + const ui16 port = pm.GetPort(); + + TReleaseConnectionServer serverImpl; + THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true)); + UNIT_ASSERT(server.Start()); + + TTestRequest r(port, "request"); + r.KeepAliveConnection = true; + + UNIT_ASSERT_C(r.Execute() == "reply", "diff echo response for request:\n" + r.GetDescription()); + + server.Stop(); + + UNIT_ASSERT_STRINGS_EQUAL(serverImpl.ExceptionMessage, "(yexception) some error"); + }; THttpInput SendRequest(TSocket& socket, ui16 port) { TSocketInput si(socket); diff --git a/library/cpp/http/server/options.h b/library/cpp/http/server/options.h index 602649e430..38eda0e5e7 100644 --- a/library/cpp/http/server/options.h +++ b/library/cpp/http/server/options.h @@ -6,7 +6,7 @@ #include <util/generic/size_literals.h> #include <util/generic/string.h> #include <util/generic/vector.h> -#include <util/datetime/base.h> +#include <util/datetime/base.h> class THttpServerOptions { public: @@ -93,10 +93,10 @@ public: return *this; } - + inline THttpServerOptions& SetClientTimeout(const TDuration& timeout) noexcept { ClientTimeout = timeout; - + return *this; } @@ -107,11 +107,11 @@ public: } inline THttpServerOptions& SetOutputBufferSize(size_t val) noexcept { - OutputBufferSize = val; - - return *this; - } - + OutputBufferSize = val; + + return *this; + } + inline THttpServerOptions& SetMaxInputContentLength(ui64 val) noexcept { MaxInputContentLength = val; @@ -162,7 +162,7 @@ public: ui32 MaxConnections = 100; int ListenBacklog = SOMAXCONN; TDuration ClientTimeout; - size_t OutputBufferSize = 0; + size_t OutputBufferSize = 0; ui64 MaxInputContentLength = sizeof(size_t) <= 4 ? 2_GB : 64_GB; size_t MaxRequestsPerConnection = 0; // If keep-alive is enabled, request limit before connection is closed bool UseElasticQueues = false; diff --git a/library/cpp/http/server/response.h b/library/cpp/http/server/response.h index eed4afc7b6..a75cb85605 100644 --- a/library/cpp/http/server/response.h +++ b/library/cpp/http/server/response.h @@ -34,10 +34,10 @@ public: THttpResponse& AddMultipleHeaders(const THttpHeaders& headers); - const THttpHeaders& GetHeaders() const { - return Headers; - } - + const THttpHeaders& GetHeaders() const { + return Headers; + } + THttpResponse& SetContentType(const TStringBuf& contentType); /** diff --git a/library/cpp/http/server/response_ut.cpp b/library/cpp/http/server/response_ut.cpp index 8a142fb1ba..73e2112ad3 100644 --- a/library/cpp/http/server/response_ut.cpp +++ b/library/cpp/http/server/response_ut.cpp @@ -46,25 +46,25 @@ Y_UNIT_TEST_SUITE(TestHttpResponse) { EXPECTED); } - Y_UNIT_TEST(TestGetHeaders) { - THttpResponse resp(HTTP_FORBIDDEN); - - THttpHeaders headers; - headers.AddHeader(THttpInputHeader("X-Header-1", "ValueOne")); - headers.AddHeader(THttpInputHeader("X-Header-2", "ValueTwo")); - headers.AddHeader(THttpInputHeader("X-Header-3", "ValueThree")); - resp.AddMultipleHeaders(headers); - resp.AddHeader("X-Header-4", "ValueFour"); - - const THttpHeaders& gotHeaders = resp.GetHeaders(); - UNIT_ASSERT_VALUES_EQUAL(gotHeaders.Count(), 4); - UNIT_ASSERT(gotHeaders.HasHeader("X-Header-1")); - UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-1")->Value(), "ValueOne"); - UNIT_ASSERT(gotHeaders.HasHeader("X-Header-4")); - UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-4")->Value(), "ValueFour"); - } - - + Y_UNIT_TEST(TestGetHeaders) { + THttpResponse resp(HTTP_FORBIDDEN); + + THttpHeaders headers; + headers.AddHeader(THttpInputHeader("X-Header-1", "ValueOne")); + headers.AddHeader(THttpInputHeader("X-Header-2", "ValueTwo")); + headers.AddHeader(THttpInputHeader("X-Header-3", "ValueThree")); + resp.AddMultipleHeaders(headers); + resp.AddHeader("X-Header-4", "ValueFour"); + + const THttpHeaders& gotHeaders = resp.GetHeaders(); + UNIT_ASSERT_VALUES_EQUAL(gotHeaders.Count(), 4); + UNIT_ASSERT(gotHeaders.HasHeader("X-Header-1")); + UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-1")->Value(), "ValueOne"); + UNIT_ASSERT(gotHeaders.HasHeader("X-Header-4")); + UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-4")->Value(), "ValueFour"); + } + + Y_UNIT_TEST(TestSetContent) { const char* EXPECTED = "HTTP/1.1 200 Ok\r\n" "Content-Length: 10\r\n" diff --git a/library/cpp/streams/brotli/brotli.cpp b/library/cpp/streams/brotli/brotli.cpp index a5536eff99..38052cb688 100644 --- a/library/cpp/streams/brotli/brotli.cpp +++ b/library/cpp/streams/brotli/brotli.cpp @@ -139,7 +139,7 @@ public: ui8* outBuffer = static_cast<ui8*>(buffer); size_t availableOut = size; - size_t decompressedSize = 0; + size_t decompressedSize = 0; BrotliDecoderResult result; do { @@ -163,7 +163,7 @@ public: &outBuffer, nullptr); - decompressedSize = size - availableOut; + decompressedSize = size - availableOut; SubstreamFinished_ = (result == BROTLI_DECODER_RESULT_SUCCESS); if (result == BROTLI_DECODER_RESULT_ERROR) { @@ -175,7 +175,7 @@ public: "Buffer passed to read in Brotli decoder is too small"); break; } - } while (decompressedSize == 0 && result == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && !InputExhausted_); + } while (decompressedSize == 0 && result == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && !InputExhausted_); if (!SubstreamFinished_ && decompressedSize == 0) { ythrow yexception() << "Input stream is incomplete"; diff --git a/library/cpp/streams/brotli/brotli_ut.cpp b/library/cpp/streams/brotli/brotli_ut.cpp index fb4372c4a5..aeb2e284dc 100644 --- a/library/cpp/streams/brotli/brotli_ut.cpp +++ b/library/cpp/streams/brotli/brotli_ut.cpp @@ -41,23 +41,23 @@ Y_UNIT_TEST_SUITE(TBrotliTestSuite) { TestCase("hello world"); } - Y_UNIT_TEST(TestFlush) { - TStringStream ss; - TBrotliCompress compressStream(&ss); - TBrotliDecompress decompressStream(&ss); - - for (size_t i = 0; i < 3; ++i) { - TString s = GenerateRandomString(1 << 15); - compressStream.Write(s.data(), s.size()); - compressStream.Flush(); - - TString r(s.size(), '*'); - decompressStream.Load((char*)r.data(), r.size()); - - UNIT_ASSERT_VALUES_EQUAL(s, r); - } - } - + Y_UNIT_TEST(TestFlush) { + TStringStream ss; + TBrotliCompress compressStream(&ss); + TBrotliDecompress decompressStream(&ss); + + for (size_t i = 0; i < 3; ++i) { + TString s = GenerateRandomString(1 << 15); + compressStream.Write(s.data(), s.size()); + compressStream.Flush(); + + TString r(s.size(), '*'); + decompressStream.Load((char*)r.data(), r.size()); + + UNIT_ASSERT_VALUES_EQUAL(s, r); + } + } + Y_UNIT_TEST(TestSeveralStreams) { auto s1 = GenerateRandomString(1 << 15); auto s2 = GenerateRandomString(1 << 15); diff --git a/library/cpp/threading/equeue/equeue.cpp b/library/cpp/threading/equeue/equeue.cpp index aaec19daa6..54a848e912 100644 --- a/library/cpp/threading/equeue/equeue.cpp +++ b/library/cpp/threading/equeue/equeue.cpp @@ -1,80 +1,80 @@ -#include "equeue.h" - -TElasticQueue::TElasticQueue(THolder<IThreadPool> slaveQueue) - : SlaveQueue_(std::move(slaveQueue)) -{ -} - -size_t TElasticQueue::ObjectCount() const { - return (size_t)AtomicGet(ObjectCount_); -} - -bool TElasticQueue::TryIncCounter() { - if ((size_t)AtomicIncrement(GuardCount_) > MaxQueueSize_) { - AtomicDecrement(GuardCount_); - return false; - } - - return true; -} - - - -class TElasticQueue::TDecrementingWrapper: TNonCopyable, public IObjectInQueue { -public: - TDecrementingWrapper(IObjectInQueue* realObject, TElasticQueue* queue) - : RealObject_(realObject) - , Queue_(queue) - { - AtomicIncrement(Queue_->ObjectCount_); - } - +#include "equeue.h" + +TElasticQueue::TElasticQueue(THolder<IThreadPool> slaveQueue) + : SlaveQueue_(std::move(slaveQueue)) +{ +} + +size_t TElasticQueue::ObjectCount() const { + return (size_t)AtomicGet(ObjectCount_); +} + +bool TElasticQueue::TryIncCounter() { + if ((size_t)AtomicIncrement(GuardCount_) > MaxQueueSize_) { + AtomicDecrement(GuardCount_); + return false; + } + + return true; +} + + + +class TElasticQueue::TDecrementingWrapper: TNonCopyable, public IObjectInQueue { +public: + TDecrementingWrapper(IObjectInQueue* realObject, TElasticQueue* queue) + : RealObject_(realObject) + , Queue_(queue) + { + AtomicIncrement(Queue_->ObjectCount_); + } + ~TDecrementingWrapper() override { - AtomicDecrement(Queue_->ObjectCount_); - AtomicDecrement(Queue_->GuardCount_); - } -private: - void Process(void *tsr) override { - THolder<TDecrementingWrapper> self(this); - RealObject_->Process(tsr); - } -private: - IObjectInQueue* const RealObject_; - TElasticQueue* const Queue_; -}; - - - -bool TElasticQueue::Add(IObjectInQueue* obj) { - if (!TryIncCounter()) { - return false; - } - - THolder<TDecrementingWrapper> wrapper; - try { - wrapper.Reset(new TDecrementingWrapper(obj, this)); - } catch (...) { - AtomicDecrement(GuardCount_); - throw; - } - - if (SlaveQueue_->Add(wrapper.Get())) { + AtomicDecrement(Queue_->ObjectCount_); + AtomicDecrement(Queue_->GuardCount_); + } +private: + void Process(void *tsr) override { + THolder<TDecrementingWrapper> self(this); + RealObject_->Process(tsr); + } +private: + IObjectInQueue* const RealObject_; + TElasticQueue* const Queue_; +}; + + + +bool TElasticQueue::Add(IObjectInQueue* obj) { + if (!TryIncCounter()) { + return false; + } + + THolder<TDecrementingWrapper> wrapper; + try { + wrapper.Reset(new TDecrementingWrapper(obj, this)); + } catch (...) { + AtomicDecrement(GuardCount_); + throw; + } + + if (SlaveQueue_->Add(wrapper.Get())) { Y_UNUSED(wrapper.Release()); - return true; - } else { - return false; - } -} - -void TElasticQueue::Start(size_t threadCount, size_t maxQueueSize) { - MaxQueueSize_ = maxQueueSize; - SlaveQueue_->Start(threadCount, maxQueueSize); -} - + return true; + } else { + return false; + } +} + +void TElasticQueue::Start(size_t threadCount, size_t maxQueueSize) { + MaxQueueSize_ = maxQueueSize; + SlaveQueue_->Start(threadCount, maxQueueSize); +} + void TElasticQueue::Stop() noexcept { - return SlaveQueue_->Stop(); -} - + return SlaveQueue_->Stop(); +} + size_t TElasticQueue::Size() const noexcept { - return SlaveQueue_->Size(); -} + return SlaveQueue_->Size(); +} diff --git a/library/cpp/threading/equeue/equeue.h b/library/cpp/threading/equeue/equeue.h index 403e993713..40dd342585 100644 --- a/library/cpp/threading/equeue/equeue.h +++ b/library/cpp/threading/equeue/equeue.h @@ -1,28 +1,28 @@ -#pragma once - +#pragma once + #include <util/thread/pool.h> -#include <util/system/atomic.h> -#include <util/generic/ptr.h> - -//actual queue limit will be (maxQueueSize - numBusyThreads) or 0 +#include <util/system/atomic.h> +#include <util/generic/ptr.h> + +//actual queue limit will be (maxQueueSize - numBusyThreads) or 0 class TElasticQueue: public IThreadPool { -public: - explicit TElasticQueue(THolder<IThreadPool> slaveQueue); - - bool Add(IObjectInQueue* obj) override; - size_t Size() const noexcept override; - - void Start(size_t threadCount, size_t maxQueueSize) override; - void Stop() noexcept override; - - size_t ObjectCount() const; -private: - class TDecrementingWrapper; - - bool TryIncCounter(); -private: +public: + explicit TElasticQueue(THolder<IThreadPool> slaveQueue); + + bool Add(IObjectInQueue* obj) override; + size_t Size() const noexcept override; + + void Start(size_t threadCount, size_t maxQueueSize) override; + void Stop() noexcept override; + + size_t ObjectCount() const; +private: + class TDecrementingWrapper; + + bool TryIncCounter(); +private: THolder<IThreadPool> SlaveQueue_; - size_t MaxQueueSize_ = 0; - TAtomic ObjectCount_ = 0; - TAtomic GuardCount_ = 0; -}; + size_t MaxQueueSize_ = 0; + TAtomic ObjectCount_ = 0; + TAtomic GuardCount_ = 0; +}; diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp index defa1a0e82..9cf2aced44 100644 --- a/library/cpp/threading/equeue/equeue_ut.cpp +++ b/library/cpp/threading/equeue/equeue_ut.cpp @@ -1,125 +1,125 @@ -#include "equeue.h" - +#include "equeue.h" + #include <library/cpp/testing/unittest/registar.h> - -#include <util/system/event.h> -#include <util/datetime/base.h> -#include <util/generic/vector.h> - + +#include <util/system/event.h> +#include <util/datetime/base.h> +#include <util/generic/vector.h> + Y_UNIT_TEST_SUITE(TElasticQueueTest) { - const size_t MaxQueueSize = 20; - const size_t ThreadCount = 10; - const size_t N = 100000; - - static THolder<TElasticQueue> Queue; - - struct TQueueSetup { - TQueueSetup() { + const size_t MaxQueueSize = 20; + const size_t ThreadCount = 10; + const size_t N = 100000; + + static THolder<TElasticQueue> Queue; + + struct TQueueSetup { + TQueueSetup() { Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>())); - Queue->Start(ThreadCount, MaxQueueSize); - } - ~TQueueSetup() { - Queue->Stop(); - } - }; - - struct TCounters { - void Reset() { - Processed = Scheduled = Discarded = Total = 0; - } - - TAtomic Processed; - TAtomic Scheduled; - TAtomic Discarded; - TAtomic Total; - }; - static TCounters Counters; - -//fill test -- fill queue with "endless" jobs + Queue->Start(ThreadCount, MaxQueueSize); + } + ~TQueueSetup() { + Queue->Stop(); + } + }; + + struct TCounters { + void Reset() { + Processed = Scheduled = Discarded = Total = 0; + } + + TAtomic Processed; + TAtomic Scheduled; + TAtomic Discarded; + TAtomic Total; + }; + static TCounters Counters; + +//fill test -- fill queue with "endless" jobs TSystemEvent WaitEvent; Y_UNIT_TEST(FillTest) { - Counters.Reset(); - - struct TWaitJob: public IObjectInQueue { + Counters.Reset(); + + struct TWaitJob: public IObjectInQueue { void Process(void*) override { - WaitEvent.Wait(); - AtomicIncrement(Counters.Processed); - } - } job; - - struct TLocalSetup: TQueueSetup { - ~TLocalSetup() { - WaitEvent.Signal(); - } - }; - - size_t enqueued = 0; - { - TLocalSetup setup; - while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) { - ++enqueued; - } - - UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize); - UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount()); - } - - UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount()); - UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size()); - UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued); - } - - -//concurrent test -- send many jobs from different threads - struct TJob: public IObjectInQueue { + WaitEvent.Wait(); + AtomicIncrement(Counters.Processed); + } + } job; + + struct TLocalSetup: TQueueSetup { + ~TLocalSetup() { + WaitEvent.Signal(); + } + }; + + size_t enqueued = 0; + { + TLocalSetup setup; + while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) { + ++enqueued; + } + + UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize); + UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount()); + } + + UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount()); + UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size()); + UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued); + } + + +//concurrent test -- send many jobs from different threads + struct TJob: public IObjectInQueue { void Process(void*) override { - AtomicIncrement(Counters.Processed); - }; - }; - static TJob Job; - - static bool TryAdd() { - AtomicIncrement(Counters.Total); - if (Queue->Add(&Job)) { - AtomicIncrement(Counters.Scheduled); - return true; - } else { - AtomicIncrement(Counters.Discarded); - return false; - } - } - - static size_t TryCounter; - + AtomicIncrement(Counters.Processed); + }; + }; + static TJob Job; + + static bool TryAdd() { + AtomicIncrement(Counters.Total); + if (Queue->Add(&Job)) { + AtomicIncrement(Counters.Scheduled); + return true; + } else { + AtomicIncrement(Counters.Discarded); + return false; + } + } + + static size_t TryCounter; + Y_UNIT_TEST(ConcurrentTest) { - Counters.Reset(); - TryCounter = 0; - + Counters.Reset(); + TryCounter = 0; + struct TSender: public IThreadFactory::IThreadAble { void DoExecute() override { - while ((size_t)AtomicIncrement(TryCounter) <= N) { - if (!TryAdd()) { - Sleep(TDuration::MicroSeconds(50)); - } - } - } - } sender; - - { - TQueueSetup setup; - + while ((size_t)AtomicIncrement(TryCounter) <= N) { + if (!TryAdd()) { + Sleep(TDuration::MicroSeconds(50)); + } + } + } + } sender; + + { + TQueueSetup setup; + TVector< TAutoPtr<IThreadFactory::IThread> > senders; - for (size_t i = 0; i < ThreadCount; ++i) { + for (size_t i = 0; i < ThreadCount; ++i) { senders.push_back(::SystemThreadFactory()->Run(&sender)); - } - - for (size_t i = 0; i < senders.size(); ++i) { - senders[i]->Join(); - } - } - - UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Total, N); - UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled); - UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded); - } -} + } + + for (size_t i = 0; i < senders.size(); ++i) { + senders[i]->Join(); + } + } + + UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Total, N); + UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled); + UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded); + } +} diff --git a/library/cpp/threading/task_scheduler/task_scheduler.cpp b/library/cpp/threading/task_scheduler/task_scheduler.cpp index 407761eea7..174dde4bf7 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler.cpp +++ b/library/cpp/threading/task_scheduler/task_scheduler.cpp @@ -1,246 +1,246 @@ #include "task_scheduler.h" - -#include <util/system/thread.h> -#include <util/string/cast.h> + +#include <util/system/thread.h> +#include <util/string/cast.h> #include <util/stream/output.h> - -TTaskScheduler::ITask::~ITask() {} -TTaskScheduler::IRepeatedTask::~IRepeatedTask() {} - - - -class TTaskScheduler::TWorkerThread + +TTaskScheduler::ITask::~ITask() {} +TTaskScheduler::IRepeatedTask::~IRepeatedTask() {} + + + +class TTaskScheduler::TWorkerThread : public ISimpleThread -{ -public: - TWorkerThread(TTaskScheduler& state) - : Scheduler_(state) - { - } - +{ +public: + TWorkerThread(TTaskScheduler& state) + : Scheduler_(state) + { + } + TString DebugState = "?"; TString DebugId = ""; -private: +private: void* ThreadProc() noexcept override { - Scheduler_.WorkerFunc(this); - return nullptr; - } -private: - TTaskScheduler& Scheduler_; -}; - - - -TTaskScheduler::TTaskScheduler(size_t threadCount, size_t maxTaskCount) - : MaxTaskCount_(maxTaskCount) -{ - for (size_t i = 0; i < threadCount; ++i) { - Workers_.push_back(new TWorkerThread(*this)); - Workers_.back()->DebugId = ToString(i); - } -} - -TTaskScheduler::~TTaskScheduler() { - try { - Stop(); - } catch (...) { + Scheduler_.WorkerFunc(this); + return nullptr; + } +private: + TTaskScheduler& Scheduler_; +}; + + + +TTaskScheduler::TTaskScheduler(size_t threadCount, size_t maxTaskCount) + : MaxTaskCount_(maxTaskCount) +{ + for (size_t i = 0; i < threadCount; ++i) { + Workers_.push_back(new TWorkerThread(*this)); + Workers_.back()->DebugId = ToString(i); + } +} + +TTaskScheduler::~TTaskScheduler() { + try { + Stop(); + } catch (...) { Cdbg << "task scheduled destruction error: " << CurrentExceptionMessage(); - } -} - -void TTaskScheduler::Start() { - for (auto& w : Workers_) { - w->Start(); - } -} - -void TTaskScheduler::Stop() { - with_lock (Lock_) { - IsStopped_ = true; - CondVar_.BroadCast(); - } - - for (auto& w: Workers_) { - w->Join(); - } - - Workers_.clear(); - Queue_.clear(); -} - -size_t TTaskScheduler::GetTaskCount() const { - return static_cast<size_t>(AtomicGet(TaskCounter_)); -} - -namespace { - class TTaskWrapper - : public TTaskScheduler::ITask - , TNonCopyable - { - public: - TTaskWrapper(TTaskScheduler::ITaskRef task, TAtomic& counter) - : Task_(task) - , Counter_(counter) - { - AtomicIncrement(Counter_); - } - + } +} + +void TTaskScheduler::Start() { + for (auto& w : Workers_) { + w->Start(); + } +} + +void TTaskScheduler::Stop() { + with_lock (Lock_) { + IsStopped_ = true; + CondVar_.BroadCast(); + } + + for (auto& w: Workers_) { + w->Join(); + } + + Workers_.clear(); + Queue_.clear(); +} + +size_t TTaskScheduler::GetTaskCount() const { + return static_cast<size_t>(AtomicGet(TaskCounter_)); +} + +namespace { + class TTaskWrapper + : public TTaskScheduler::ITask + , TNonCopyable + { + public: + TTaskWrapper(TTaskScheduler::ITaskRef task, TAtomic& counter) + : Task_(task) + , Counter_(counter) + { + AtomicIncrement(Counter_); + } + ~TTaskWrapper() override { - AtomicDecrement(Counter_); - } - private: - TInstant Process() override { - return Task_->Process(); - } - private: - TTaskScheduler::ITaskRef Task_; - TAtomic& Counter_; - }; -} - -bool TTaskScheduler::Add(ITaskRef task, TInstant expire) { - with_lock (Lock_) { - if (!IsStopped_ && Workers_.size() > 0 && GetTaskCount() + 1 <= MaxTaskCount_) { - ITaskRef newTask = new TTaskWrapper(task, TaskCounter_); + AtomicDecrement(Counter_); + } + private: + TInstant Process() override { + return Task_->Process(); + } + private: + TTaskScheduler::ITaskRef Task_; + TAtomic& Counter_; + }; +} + +bool TTaskScheduler::Add(ITaskRef task, TInstant expire) { + with_lock (Lock_) { + if (!IsStopped_ && Workers_.size() > 0 && GetTaskCount() + 1 <= MaxTaskCount_) { + ITaskRef newTask = new TTaskWrapper(task, TaskCounter_); Queue_.insert(std::make_pair(expire, TTaskHolder(newTask))); - - if (!Queue_.begin()->second.WaitingWorker) { - CondVar_.Signal(); - } - return true; - } - } - - return false; -} - -namespace { - class TRepeatedTask - : public TTaskScheduler::ITask - { - public: - TRepeatedTask(TTaskScheduler::IRepeatedTaskRef task, TDuration period, TInstant deadline) - : Task_(task) - , Period_(period) - , Deadline_(deadline) - { - } - private: - TInstant Process() final { - Deadline_ += Period_; - if (Task_->Process()) { - return Deadline_; - } else { - return TInstant::Max(); - } - } - private: - TTaskScheduler::IRepeatedTaskRef Task_; - TDuration Period_; - TInstant Deadline_; - }; -} - -bool TTaskScheduler::Add(IRepeatedTaskRef task, TDuration period) { - const TInstant deadline = Now() + period; - ITaskRef t = new TRepeatedTask(task, period, deadline); - return Add(t, deadline); -} - - -const bool debugOutput = false; - + + if (!Queue_.begin()->second.WaitingWorker) { + CondVar_.Signal(); + } + return true; + } + } + + return false; +} + +namespace { + class TRepeatedTask + : public TTaskScheduler::ITask + { + public: + TRepeatedTask(TTaskScheduler::IRepeatedTaskRef task, TDuration period, TInstant deadline) + : Task_(task) + , Period_(period) + , Deadline_(deadline) + { + } + private: + TInstant Process() final { + Deadline_ += Period_; + if (Task_->Process()) { + return Deadline_; + } else { + return TInstant::Max(); + } + } + private: + TTaskScheduler::IRepeatedTaskRef Task_; + TDuration Period_; + TInstant Deadline_; + }; +} + +bool TTaskScheduler::Add(IRepeatedTaskRef task, TDuration period) { + const TInstant deadline = Now() + period; + ITaskRef t = new TRepeatedTask(task, period, deadline); + return Add(t, deadline); +} + + +const bool debugOutput = false; + void TTaskScheduler::ChangeDebugState(TWorkerThread* thread, const TString& state) { - if (!debugOutput) { + if (!debugOutput) { Y_UNUSED(thread); Y_UNUSED(state); - return; - } - - thread->DebugState = state; - - TStringStream ss; - ss << Now() << " " << thread->DebugId << ":\t"; - for (auto& w : Workers_) { - ss << w->DebugState << " "; - } - ss << " [" << Queue_.size() << "] [" << TaskCounter_ << "]" << Endl; + return; + } + + thread->DebugState = state; + + TStringStream ss; + ss << Now() << " " << thread->DebugId << ":\t"; + for (auto& w : Workers_) { + ss << w->DebugState << " "; + } + ss << " [" << Queue_.size() << "] [" << TaskCounter_ << "]" << Endl; Cerr << ss.Str(); -} - -bool TTaskScheduler::Wait(TWorkerThread* thread, TQueueIterator& toWait) { - ChangeDebugState(thread, "w"); - toWait->second.WaitingWorker = thread; - return !CondVar_.WaitD(Lock_, toWait->first); -} - -void TTaskScheduler::ChooseFromQueue(TQueueIterator& toWait) { - for (TQueueIterator it = Queue_.begin(); it != Queue_.end(); ++it) { - if (!it->second.WaitingWorker) { - if (toWait == Queue_.end()) { - toWait = it; - } else if (it->first < toWait->first) { - toWait->second.WaitingWorker = nullptr; - toWait = it; - } - break; - } - } -} - -void TTaskScheduler::WorkerFunc(TWorkerThread* thread) { +} + +bool TTaskScheduler::Wait(TWorkerThread* thread, TQueueIterator& toWait) { + ChangeDebugState(thread, "w"); + toWait->second.WaitingWorker = thread; + return !CondVar_.WaitD(Lock_, toWait->first); +} + +void TTaskScheduler::ChooseFromQueue(TQueueIterator& toWait) { + for (TQueueIterator it = Queue_.begin(); it != Queue_.end(); ++it) { + if (!it->second.WaitingWorker) { + if (toWait == Queue_.end()) { + toWait = it; + } else if (it->first < toWait->first) { + toWait->second.WaitingWorker = nullptr; + toWait = it; + } + break; + } + } +} + +void TTaskScheduler::WorkerFunc(TWorkerThread* thread) { TThread::SetCurrentThreadName("TaskSchedWorker"); - TQueueIterator toWait = Queue_.end(); - ITaskRef toDo; - - for (;;) { - TInstant repeat = TInstant::Max(); - - if (!!toDo) { - try { - repeat = toDo->Process(); - } catch (...) { + TQueueIterator toWait = Queue_.end(); + ITaskRef toDo; + + for (;;) { + TInstant repeat = TInstant::Max(); + + if (!!toDo) { + try { + repeat = toDo->Process(); + } catch (...) { Cdbg << "task scheduler error: " << CurrentExceptionMessage(); - } - } - - - with_lock (Lock_) { - ChangeDebugState(thread, "f"); - - if (IsStopped_) { - ChangeDebugState(thread, "s"); - return ; - } - - if (!!toDo) { - if (repeat < TInstant::Max()) { + } + } + + + with_lock (Lock_) { + ChangeDebugState(thread, "f"); + + if (IsStopped_) { + ChangeDebugState(thread, "s"); + return ; + } + + if (!!toDo) { + if (repeat < TInstant::Max()) { Queue_.insert(std::make_pair(repeat, TTaskHolder(toDo))); - } - } - - toDo = nullptr; - - ChooseFromQueue(toWait); - - if (toWait != Queue_.end()) { - if (toWait->first <= Now() || Wait(thread, toWait)) { - - toDo = toWait->second.Task; - Queue_.erase(toWait); - toWait = Queue_.end(); - - if (!Queue_.empty() && !Queue_.begin()->second.WaitingWorker && Workers_.size() > 1) { - CondVar_.Signal(); - } - - ChangeDebugState(thread, "p"); - } - } else { - ChangeDebugState(thread, "e"); - CondVar_.WaitI(Lock_); - } - } - } -} + } + } + + toDo = nullptr; + + ChooseFromQueue(toWait); + + if (toWait != Queue_.end()) { + if (toWait->first <= Now() || Wait(thread, toWait)) { + + toDo = toWait->second.Task; + Queue_.erase(toWait); + toWait = Queue_.end(); + + if (!Queue_.empty() && !Queue_.begin()->second.WaitingWorker && Workers_.size() > 1) { + CondVar_.Signal(); + } + + ChangeDebugState(thread, "p"); + } + } else { + ChangeDebugState(thread, "e"); + CondVar_.WaitI(Lock_); + } + } + } +} diff --git a/library/cpp/threading/task_scheduler/task_scheduler.h b/library/cpp/threading/task_scheduler/task_scheduler.h index 166946a2aa..df4da941a8 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler.h +++ b/library/cpp/threading/task_scheduler/task_scheduler.h @@ -1,86 +1,86 @@ -#pragma once - -#include <util/generic/vector.h> -#include <util/generic/ptr.h> -#include <util/generic/map.h> - -#include <util/datetime/base.h> - -#include <util/system/condvar.h> -#include <util/system/mutex.h> - -class TTaskScheduler { -public: - class ITask; - using ITaskRef = TIntrusivePtr<ITask>; - - class IRepeatedTask; - using IRepeatedTaskRef = TIntrusivePtr<IRepeatedTask>; -public: - explicit TTaskScheduler(size_t threadCount = 1, size_t maxTaskCount = Max<size_t>()); - ~TTaskScheduler(); - - void Start(); - void Stop(); - - bool Add(ITaskRef task, TInstant expire); - bool Add(IRepeatedTaskRef task, TDuration period); - - size_t GetTaskCount() const; -private: - class TWorkerThread; - - struct TTaskHolder { - explicit TTaskHolder(ITaskRef& task) - : Task(task) - { - } - public: - ITaskRef Task; - TWorkerThread* WaitingWorker = nullptr; - }; - +#pragma once + +#include <util/generic/vector.h> +#include <util/generic/ptr.h> +#include <util/generic/map.h> + +#include <util/datetime/base.h> + +#include <util/system/condvar.h> +#include <util/system/mutex.h> + +class TTaskScheduler { +public: + class ITask; + using ITaskRef = TIntrusivePtr<ITask>; + + class IRepeatedTask; + using IRepeatedTaskRef = TIntrusivePtr<IRepeatedTask>; +public: + explicit TTaskScheduler(size_t threadCount = 1, size_t maxTaskCount = Max<size_t>()); + ~TTaskScheduler(); + + void Start(); + void Stop(); + + bool Add(ITaskRef task, TInstant expire); + bool Add(IRepeatedTaskRef task, TDuration period); + + size_t GetTaskCount() const; +private: + class TWorkerThread; + + struct TTaskHolder { + explicit TTaskHolder(ITaskRef& task) + : Task(task) + { + } + public: + ITaskRef Task; + TWorkerThread* WaitingWorker = nullptr; + }; + using TQueueType = TMultiMap<TInstant, TTaskHolder>; using TQueueIterator = TQueueType::iterator; -private: +private: void ChangeDebugState(TWorkerThread* thread, const TString& state); - void ChooseFromQueue(TQueueIterator& toWait); - bool Wait(TWorkerThread* thread, TQueueIterator& toWait); - - void WorkerFunc(TWorkerThread* thread); -private: - bool IsStopped_ = false; - - TAtomic TaskCounter_ = 0; + void ChooseFromQueue(TQueueIterator& toWait); + bool Wait(TWorkerThread* thread, TQueueIterator& toWait); + + void WorkerFunc(TWorkerThread* thread); +private: + bool IsStopped_ = false; + + TAtomic TaskCounter_ = 0; TQueueType Queue_; - - TCondVar CondVar_; - TMutex Lock_; - + + TCondVar CondVar_; + TMutex Lock_; + TVector<TAutoPtr<TWorkerThread>> Workers_; - - const size_t MaxTaskCount_; -}; - -class TTaskScheduler::ITask - : public TAtomicRefCount<ITask> -{ -public: - virtual ~ITask(); - - virtual TInstant Process() {//returns time to repeat this task - return TInstant::Max(); - } -}; - -class TTaskScheduler::IRepeatedTask - : public TAtomicRefCount<IRepeatedTask> -{ -public: - virtual ~IRepeatedTask(); - - virtual bool Process() {//returns if to repeat task again - return false; - } -}; - + + const size_t MaxTaskCount_; +}; + +class TTaskScheduler::ITask + : public TAtomicRefCount<ITask> +{ +public: + virtual ~ITask(); + + virtual TInstant Process() {//returns time to repeat this task + return TInstant::Max(); + } +}; + +class TTaskScheduler::IRepeatedTask + : public TAtomicRefCount<IRepeatedTask> +{ +public: + virtual ~IRepeatedTask(); + + virtual bool Process() {//returns if to repeat task again + return false; + } +}; + diff --git a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp index cb2daa718b..3b5203194a 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp +++ b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp @@ -2,7 +2,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <util/stream/output.h> -#include <util/system/atomic.h> +#include <util/system/atomic.h> #include <util/generic/vector.h> #include "task_scheduler.h" @@ -12,13 +12,13 @@ class TTaskSchedulerTest: public TTestBase { UNIT_TEST(Test); UNIT_TEST_SUITE_END(); - class TCheckTask: public TTaskScheduler::IRepeatedTask { + class TCheckTask: public TTaskScheduler::IRepeatedTask { public: TCheckTask(const TDuration& delay) : Start_(Now()) , Delay_(delay) { - AtomicIncrement(ScheduledTaskCounter_); + AtomicIncrement(ScheduledTaskCounter_); } ~TCheckTask() override { @@ -28,28 +28,28 @@ class TTaskSchedulerTest: public TTestBase { const TDuration delay = Now() - Start_; if (delay < Delay_) { - AtomicIncrement(BadTimeoutCounter_); + AtomicIncrement(BadTimeoutCounter_); } - AtomicIncrement(ExecutedTaskCounter_); + AtomicIncrement(ExecutedTaskCounter_); return false; } static bool AllTaskExecuted() { - return AtomicGet(ScheduledTaskCounter_) == AtomicGet(ExecutedTaskCounter_); + return AtomicGet(ScheduledTaskCounter_) == AtomicGet(ExecutedTaskCounter_); } static size_t BadTimeoutCount() { - return AtomicGet(BadTimeoutCounter_); + return AtomicGet(BadTimeoutCounter_); } private: TInstant Start_; TDuration Delay_; - static TAtomic BadTimeoutCounter_; - static TAtomic ScheduledTaskCounter_; - static TAtomic ExecutedTaskCounter_; + static TAtomic BadTimeoutCounter_; + static TAtomic ScheduledTaskCounter_; + static TAtomic ExecutedTaskCounter_; }; public: @@ -72,15 +72,15 @@ class TTaskSchedulerTest: public TTestBase { void ScheduleCheckTask(size_t delay) { TDuration d = TDuration::MicroSeconds(delay); - Scheduler_.Add(new TCheckTask(d), d); + Scheduler_.Add(new TCheckTask(d), d); } private: - TTaskScheduler Scheduler_; + TTaskScheduler Scheduler_; }; -TAtomic TTaskSchedulerTest::TCheckTask::BadTimeoutCounter_ = 0; -TAtomic TTaskSchedulerTest::TCheckTask::ScheduledTaskCounter_ = 0; -TAtomic TTaskSchedulerTest::TCheckTask::ExecutedTaskCounter_ = 0; +TAtomic TTaskSchedulerTest::TCheckTask::BadTimeoutCounter_ = 0; +TAtomic TTaskSchedulerTest::TCheckTask::ScheduledTaskCounter_ = 0; +TAtomic TTaskSchedulerTest::TCheckTask::ExecutedTaskCounter_ = 0; UNIT_TEST_SUITE_REGISTRATION(TTaskSchedulerTest); diff --git a/tools/enum_parser/enum_parser/main.cpp b/tools/enum_parser/enum_parser/main.cpp index b86cd55428..0943c69c1d 100644 --- a/tools/enum_parser/enum_parser/main.cpp +++ b/tools/enum_parser/enum_parser/main.cpp @@ -32,7 +32,7 @@ void WriteHeader(const TString& headerName, IOutputStream& out, IOutputStream* h out << "#include <util/generic/typetraits.h>\n"; out << "#include <util/generic/singleton.h>\n"; out << "#include <util/generic/string.h>\n"; - out << "#include <util/generic/vector.h>\n"; + out << "#include <util/generic/vector.h>\n"; out << "#include <util/generic/map.h>\n"; out << "#include <util/generic/serialized_enum.h>\n"; out << "#include <util/string/cast.h>\n"; @@ -331,7 +331,7 @@ void GenerateEnum( out << " const " << nsName << "::TNameBufs& names = " << nsName << "::TNameBufs::Instance();\n"; out << " return names.AllEnumValues();\n"; out << " }\n\n"; - + // template<> GetEnumAllNames out << " template<>\n"; out << " const TString& GetEnumAllNamesImpl<" << name << ">() {\n"; diff --git a/util/network/address.cpp b/util/network/address.cpp index 267c2f9092..a81a9e6994 100644 --- a/util/network/address.cpp +++ b/util/network/address.cpp @@ -139,7 +139,7 @@ IRemoteAddrPtr NAddr::GetSockAddr(SOCKET s) { return addr; } - + IRemoteAddrPtr NAddr::GetPeerAddr(SOCKET s) { auto addr = MakeHolder<TOpaqueAddr>(); @@ -150,41 +150,41 @@ IRemoteAddrPtr NAddr::GetPeerAddr(SOCKET s) { return addr; } -static const in_addr& InAddr(const IRemoteAddr& addr) { - return ((const sockaddr_in*)addr.Addr())->sin_addr; -} - -static const in6_addr& In6Addr(const IRemoteAddr& addr) { - return ((const sockaddr_in6*)addr.Addr())->sin6_addr; -} - -bool NAddr::IsLoopback(const IRemoteAddr& addr) { - if (addr.Addr()->sa_family == AF_INET) { +static const in_addr& InAddr(const IRemoteAddr& addr) { + return ((const sockaddr_in*)addr.Addr())->sin_addr; +} + +static const in6_addr& In6Addr(const IRemoteAddr& addr) { + return ((const sockaddr_in6*)addr.Addr())->sin6_addr; +} + +bool NAddr::IsLoopback(const IRemoteAddr& addr) { + if (addr.Addr()->sa_family == AF_INET) { return ((ntohl(InAddr(addr).s_addr) >> 24) & 0xff) == 127; } if (addr.Addr()->sa_family == AF_INET6) { - return 0 == memcmp(&In6Addr(addr), &in6addr_loopback, sizeof(in6_addr)); - } + return 0 == memcmp(&In6Addr(addr), &in6addr_loopback, sizeof(in6_addr)); + } return false; -} - -bool NAddr::IsSame(const IRemoteAddr& lhs, const IRemoteAddr& rhs) { - if (lhs.Addr()->sa_family != rhs.Addr()->sa_family) { - return false; +} + +bool NAddr::IsSame(const IRemoteAddr& lhs, const IRemoteAddr& rhs) { + if (lhs.Addr()->sa_family != rhs.Addr()->sa_family) { + return false; } if (lhs.Addr()->sa_family == AF_INET) { - return InAddr(lhs).s_addr == InAddr(rhs).s_addr; + return InAddr(lhs).s_addr == InAddr(rhs).s_addr; } if (lhs.Addr()->sa_family == AF_INET6) { - return 0 == memcmp(&In6Addr(lhs), &In6Addr(rhs), sizeof(in6_addr)); - } - - ythrow yexception() << "unsupported addr family: " << lhs.Addr()->sa_family; -} + return 0 == memcmp(&In6Addr(lhs), &In6Addr(rhs), sizeof(in6_addr)); + } + + ythrow yexception() << "unsupported addr family: " << lhs.Addr()->sa_family; +} socklen_t NAddr::SockAddrLength(const sockaddr* addr) { switch (addr->sa_family) { diff --git a/util/network/address.h b/util/network/address.h index b88c3558fc..448fcac0c9 100644 --- a/util/network/address.h +++ b/util/network/address.h @@ -25,9 +25,9 @@ namespace NAddr { TString PrintHost(const IRemoteAddr& addr); TString PrintHostAndPort(const IRemoteAddr& addr); - bool IsLoopback(const IRemoteAddr& addr); - bool IsSame(const IRemoteAddr& lhs, const IRemoteAddr& rhs); - + bool IsLoopback(const IRemoteAddr& addr); + bool IsSame(const IRemoteAddr& lhs, const IRemoteAddr& rhs); + socklen_t SockAddrLength(const sockaddr* addr); //for accept, recvfrom - see LenPtr() diff --git a/util/network/endpoint.h b/util/network/endpoint.h index 959b14d394..a3e59b4925 100644 --- a/util/network/endpoint.h +++ b/util/network/endpoint.h @@ -54,7 +54,7 @@ struct THash<TEndpoint> { inline bool operator==(const TEndpoint& l, const TEndpoint& r) { try { - return NAddr::IsSame(*l.Addr(), *r.Addr()) && l.Port() == r.Port(); + return NAddr::IsSame(*l.Addr(), *r.Addr()) && l.Port() == r.Port(); } catch (...) { return false; } diff --git a/util/network/endpoint_ut.cpp b/util/network/endpoint_ut.cpp index ed26b111cf..d5e40dd6e1 100644 --- a/util/network/endpoint_ut.cpp +++ b/util/network/endpoint_ut.cpp @@ -85,27 +85,27 @@ Y_UNIT_TEST_SUITE(TEndpointTest) { UNIT_ASSERT_VALUES_EQUAL(5u, he.size()); } - + Y_UNIT_TEST(TestEqual) { const TString ip1 = "2a02:6b8:0:1410::5f6c:f3c2"; const TString ip2 = "2a02:6b8:0:1410::5f6c:f3c3"; TNetworkAddress na1(ip1, 24242); - TEndpoint ep1(new NAddr::TAddrInfo(&*na1.Begin())); - + TEndpoint ep1(new NAddr::TAddrInfo(&*na1.Begin())); + TNetworkAddress na2(ip1, 24242); - TEndpoint ep2(new NAddr::TAddrInfo(&*na2.Begin())); - + TEndpoint ep2(new NAddr::TAddrInfo(&*na2.Begin())); + TNetworkAddress na3(ip2, 24242); - TEndpoint ep3(new NAddr::TAddrInfo(&*na3.Begin())); - + TEndpoint ep3(new NAddr::TAddrInfo(&*na3.Begin())); + TNetworkAddress na4(ip2, 24243); - TEndpoint ep4(new NAddr::TAddrInfo(&*na4.Begin())); - - UNIT_ASSERT(ep1 == ep2); - UNIT_ASSERT(!(ep1 == ep3)); - UNIT_ASSERT(!(ep1 == ep4)); - } + TEndpoint ep4(new NAddr::TAddrInfo(&*na4.Begin())); + + UNIT_ASSERT(ep1 == ep2); + UNIT_ASSERT(!(ep1 == ep3)); + UNIT_ASSERT(!(ep1 == ep4)); + } Y_UNIT_TEST(TestIsUnixSocket) { TNetworkAddress na1(TUnixSocketPath("/tmp/unixsocket")); diff --git a/util/stream/buffered.cpp b/util/stream/buffered.cpp index 6c8519bdfe..a00e592e1c 100644 --- a/util/stream/buffered.cpp +++ b/util/stream/buffered.cpp @@ -71,13 +71,13 @@ public: while (true) { if (MemInput_.Exhausted()) { - const size_t bytesRead = Slave_->Read(Buf(), BufLen()); + const size_t bytesRead = Slave_->Read(Buf(), BufLen()); - if (!bytesRead) { + if (!bytesRead) { break; } - MemInput_.Reset(Buf(), bytesRead); + MemInput_.Reset(Buf(), bytesRead); } const size_t a_len(MemInput_.Avail()); diff --git a/util/stream/ios_ut.cpp b/util/stream/ios_ut.cpp index 9db05b9fba..139f4296e5 100644 --- a/util/stream/ios_ut.cpp +++ b/util/stream/ios_ut.cpp @@ -261,21 +261,21 @@ void TStreamsTest::TestBufferStream() { stream.Write(s.data(), s.size()); char buf[5]; - size_t bytesRead = stream.Read(buf, 4); - UNIT_ASSERT_EQUAL(4, bytesRead); + size_t bytesRead = stream.Read(buf, 4); + UNIT_ASSERT_EQUAL(4, bytesRead); UNIT_ASSERT_EQUAL(0, strncmp(s.data(), buf, 4)); stream.Write(s.data(), s.size()); - bytesRead = stream.Read(buf, 2); - UNIT_ASSERT_EQUAL(2, bytesRead); + bytesRead = stream.Read(buf, 2); + UNIT_ASSERT_EQUAL(2, bytesRead); UNIT_ASSERT_EQUAL(0, strncmp("te", buf, 2)); - bytesRead = stream.Read(buf, 2); - UNIT_ASSERT_EQUAL(2, bytesRead); + bytesRead = stream.Read(buf, 2); + UNIT_ASSERT_EQUAL(2, bytesRead); UNIT_ASSERT_EQUAL(0, strncmp("st", buf, 2)); - bytesRead = stream.Read(buf, 2); - UNIT_ASSERT_EQUAL(0, bytesRead); + bytesRead = stream.Read(buf, 2); + UNIT_ASSERT_EQUAL(0, bytesRead); } namespace { diff --git a/util/stream/tokenizer.h b/util/stream/tokenizer.h index 41f5f2a76d..b2398efdd1 100644 --- a/util/stream/tokenizer.h +++ b/util/stream/tokenizer.h @@ -173,11 +173,11 @@ public: private: inline size_t Fill() { const size_t avail = BufEnd() - End_; - const size_t bytesRead = Input_->Read(End_, avail); + const size_t bytesRead = Input_->Read(End_, avail); - End_ += bytesRead; + End_ += bytesRead; - return bytesRead; + return bytesRead; } inline char* BufBegin() noexcept { diff --git a/util/stream/zerocopy.cpp b/util/stream/zerocopy.cpp index be27642733..dc2982ad55 100644 --- a/util/stream/zerocopy.cpp +++ b/util/stream/zerocopy.cpp @@ -44,12 +44,12 @@ size_t IZeroCopyInputFastReadTo::DoReadTo(TString& st, char ch) { st.clear(); do { if (const char* pos = (const char*)memchr(ptr, ch, len)) { - size_t bytesRead = (pos - ptr) + 1; - if (bytesRead > 1) { + size_t bytesRead = (pos - ptr) + 1; + if (bytesRead > 1) { st.append(ptr, pos); } - Undo(len - bytesRead); - result += bytesRead; + Undo(len - bytesRead); + result += bytesRead; return result; } else { result += len; diff --git a/util/system/direct_io.cpp b/util/system/direct_io.cpp index ea4dc39382..f59c54b0cb 100644 --- a/util/system/direct_io.cpp +++ b/util/system/direct_io.cpp @@ -110,7 +110,7 @@ void TDirectIOBufferedFile::Finish() { File.Close(); } -void TDirectIOBufferedFile::Write(const void* buffer, size_t byteCount) { +void TDirectIOBufferedFile::Write(const void* buffer, size_t byteCount) { WriteToBuffer(buffer, byteCount, DataLen); WritePosition += byteCount; } @@ -146,51 +146,51 @@ void TDirectIOBufferedFile::WriteToFile(const void* buf, size_t len, ui64 positi } } -size_t TDirectIOBufferedFile::PreadSafe(void* buffer, size_t byteCount, ui64 offset) { +size_t TDirectIOBufferedFile::PreadSafe(void* buffer, size_t byteCount, ui64 offset) { if (FlushedToDisk < offset + byteCount) { File.FlushData(); FlushedToDisk = FlushedBytes; } -#ifdef _linux_ - ssize_t bytesRead = 0; - do { - bytesRead = pread(File.GetHandle(), buffer, byteCount, offset); - } while (bytesRead == -1 && errno == EINTR); +#ifdef _linux_ + ssize_t bytesRead = 0; + do { + bytesRead = pread(File.GetHandle(), buffer, byteCount, offset); + } while (bytesRead == -1 && errno == EINTR); - if (bytesRead < 0) { + if (bytesRead < 0) { ythrow yexception() << "error while pread file: " << LastSystemError() << "(" << LastSystemErrorText() << ")"; } - return bytesRead; -#else - return File.Pread(buffer, byteCount, offset); -#endif + return bytesRead; +#else + return File.Pread(buffer, byteCount, offset); +#endif } -size_t TDirectIOBufferedFile::ReadFromFile(void* buffer, size_t byteCount, ui64 offset) { - SetDirectIO(true); - - ui64 bytesRead = 0; - - while (byteCount) { - if (!Alignment || IsAligned(buffer) && IsAligned(byteCount) && IsAligned(offset)) { - if (const ui64 fromFile = PreadSafe(buffer, byteCount, offset)) { - buffer = (char*)buffer + fromFile; - byteCount -= fromFile; - offset += fromFile; - bytesRead += fromFile; - } else { - return bytesRead; - } - } else { - break; - } +size_t TDirectIOBufferedFile::ReadFromFile(void* buffer, size_t byteCount, ui64 offset) { + SetDirectIO(true); + + ui64 bytesRead = 0; + + while (byteCount) { + if (!Alignment || IsAligned(buffer) && IsAligned(byteCount) && IsAligned(offset)) { + if (const ui64 fromFile = PreadSafe(buffer, byteCount, offset)) { + buffer = (char*)buffer + fromFile; + byteCount -= fromFile; + offset += fromFile; + bytesRead += fromFile; + } else { + return bytesRead; + } + } else { + break; + } } - if (!byteCount) { - return bytesRead; - } + if (!byteCount) { + return bytesRead; + } ui64 bufSize = AlignUp(Min<size_t>(BufferStorage.Size(), byteCount + (Alignment << 1)), Alignment); TBuffer readBufferStorage(bufSize + Alignment); @@ -199,59 +199,59 @@ size_t TDirectIOBufferedFile::ReadFromFile(void* buffer, size_t byteCount, ui64 while (byteCount) { ui64 begin = AlignDown(offset, (ui64)Alignment); ui64 end = AlignUp(offset + byteCount, (ui64)Alignment); - ui64 toRead = Min(end - begin, bufSize); - ui64 fromFile = PreadSafe(readBuffer, toRead, begin); + ui64 toRead = Min(end - begin, bufSize); + ui64 fromFile = PreadSafe(readBuffer, toRead, begin); if (!fromFile) { break; } - ui64 delta = offset - begin; - ui64 count = Min<ui64>(fromFile - delta, byteCount); + ui64 delta = offset - begin; + ui64 count = Min<ui64>(fromFile - delta, byteCount); memcpy(buffer, readBuffer + delta, count); buffer = (char*)buffer + count; byteCount -= count; offset += count; - bytesRead += count; + bytesRead += count; } - return bytesRead; + return bytesRead; } -size_t TDirectIOBufferedFile::Read(void* buffer, size_t byteCount) { - size_t bytesRead = Pread(buffer, byteCount, ReadPosition); - ReadPosition += bytesRead; - return bytesRead; +size_t TDirectIOBufferedFile::Read(void* buffer, size_t byteCount) { + size_t bytesRead = Pread(buffer, byteCount, ReadPosition); + ReadPosition += bytesRead; + return bytesRead; } -size_t TDirectIOBufferedFile::Pread(void* buffer, size_t byteCount, ui64 offset) { +size_t TDirectIOBufferedFile::Pread(void* buffer, size_t byteCount, ui64 offset) { if (!byteCount) { return 0; } - size_t readFromFile = 0; + size_t readFromFile = 0; if (offset < FlushedBytes) { readFromFile = Min<ui64>(byteCount, FlushedBytes - offset); - size_t bytesRead = ReadFromFile(buffer, readFromFile, offset); - if (bytesRead != readFromFile || readFromFile == byteCount) { - return bytesRead; + size_t bytesRead = ReadFromFile(buffer, readFromFile, offset); + if (bytesRead != readFromFile || readFromFile == byteCount) { + return bytesRead; } } ui64 start = offset > FlushedBytes ? offset - FlushedBytes : 0; - ui64 count = Min<ui64>(DataLen - start, byteCount - readFromFile); + ui64 count = Min<ui64>(DataLen - start, byteCount - readFromFile); if (count) { memcpy((char*)buffer + readFromFile, (const char*)Buffer + start, count); } return count + readFromFile; } -void TDirectIOBufferedFile::Pwrite(const void* buffer, size_t byteCount, ui64 offset) { +void TDirectIOBufferedFile::Pwrite(const void* buffer, size_t byteCount, ui64 offset) { if (offset > WritePosition) { ythrow yexception() << "cannot frite to position" << offset; } - size_t writeToBufer = byteCount; - size_t writeToFile = 0; + size_t writeToBufer = byteCount; + size_t writeToFile = 0; if (FlushedBytes > offset) { writeToFile = Min<ui64>(byteCount, FlushedBytes - offset); diff --git a/util/system/direct_io.h b/util/system/direct_io.h index ec1ff84356..6a3325a960 100644 --- a/util/system/direct_io.h +++ b/util/system/direct_io.h @@ -16,10 +16,10 @@ public: void FlushData(); void Finish(); - size_t Read(void* buffer, size_t byteCount); - void Write(const void* buffer, size_t byteCount); - size_t Pread(void* buffer, size_t byteCount, ui64 offset); - void Pwrite(const void* buffer, size_t byteCount, ui64 offset); + size_t Read(void* buffer, size_t byteCount); + void Write(const void* buffer, size_t byteCount); + size_t Pread(void* buffer, size_t byteCount, ui64 offset); + void Pwrite(const void* buffer, size_t byteCount, ui64 offset); inline bool IsOpen() const { return true; @@ -54,8 +54,8 @@ private: return Alignment ? value == AlignDown(value, Alignment) : true; } - size_t PreadSafe(void* buffer, size_t byteCount, ui64 offset); - size_t ReadFromFile(void* buffer, size_t byteCount, ui64 offset); + size_t PreadSafe(void* buffer, size_t byteCount, ui64 offset); + size_t ReadFromFile(void* buffer, size_t byteCount, ui64 offset); void WriteToFile(const void* buf, size_t len, ui64 position); void WriteToBuffer(const void* buf, size_t len, ui64 position); void SetDirectIO(bool value); diff --git a/util/system/direct_io_ut.cpp b/util/system/direct_io_ut.cpp index 36c1a20d63..839c3de7ca 100644 --- a/util/system/direct_io_ut.cpp +++ b/util/system/direct_io_ut.cpp @@ -2,7 +2,7 @@ #include <util/generic/yexception.h> #include <util/system/fs.h> -#include <util/system/tempfile.h> +#include <util/system/tempfile.h> #include <util/random/random.h> #include "direct_io.h" @@ -13,7 +13,7 @@ Y_UNIT_TEST_SUITE(TDirectIoTestSuite) { Y_UNIT_TEST(TestDirectFile) { TDirectIOBufferedFile file(FileName_, RdWr | Direct | Seq | CreateAlways, 1 << 15); TVector<ui64> data((1 << 15) + 1); - TVector<ui64> readResult(data.size()); + TVector<ui64> readResult(data.size()); for (auto& i : data) { i = RandomNumber<ui64>(); } @@ -24,10 +24,10 @@ Y_UNIT_TEST_SUITE(TDirectIoTestSuite) { size_t readPos = RandomNumber(writePos); size_t readCount = RandomNumber(writePos - readPos); UNIT_ASSERT_VALUES_EQUAL( - file.Pread(&readResult[0], readCount * sizeof(ui64), readPos * sizeof(ui64)), + file.Pread(&readResult[0], readCount * sizeof(ui64), readPos * sizeof(ui64)), readCount * sizeof(ui64)); for (size_t i = 0; i < readCount; ++i) { - UNIT_ASSERT_VALUES_EQUAL(readResult[i], data[i + readPos]); + UNIT_ASSERT_VALUES_EQUAL(readResult[i], data[i + readPos]); } } file.Finish(); @@ -36,56 +36,56 @@ Y_UNIT_TEST_SUITE(TDirectIoTestSuite) { size_t readPos = RandomNumber(data.size()); size_t readCount = RandomNumber(data.size() - readPos); UNIT_ASSERT_VALUES_EQUAL( - fileNew.Pread(&readResult[0], readCount * sizeof(ui64), readPos * sizeof(ui64)), + fileNew.Pread(&readResult[0], readCount * sizeof(ui64), readPos * sizeof(ui64)), readCount * sizeof(ui64)); for (size_t j = 0; j < readCount; ++j) { - UNIT_ASSERT_VALUES_EQUAL(readResult[j], data[j + readPos]); + UNIT_ASSERT_VALUES_EQUAL(readResult[j], data[j + readPos]); } } size_t readCount = data.size(); UNIT_ASSERT_VALUES_EQUAL( - fileNew.Pread(&readResult[0], readCount * sizeof(ui64), 0), + fileNew.Pread(&readResult[0], readCount * sizeof(ui64), 0), readCount * sizeof(ui64)); for (size_t i = 0; i < readCount; ++i) { - UNIT_ASSERT_VALUES_EQUAL(readResult[i], data[i]); + UNIT_ASSERT_VALUES_EQUAL(readResult[i], data[i]); } NFs::Remove(FileName_); } - void TestHugeFile(size_t size) { - TTempFile tmpFile("test.file"); - - { - TDirectIOBufferedFile directIOFile(tmpFile.Name(), WrOnly | CreateAlways | Direct); - TVector<ui8> data(size, 'x'); - directIOFile.Write(&data[0], data.size()); - } - - { - TDirectIOBufferedFile directIOFile(tmpFile.Name(), RdOnly | Direct); - TVector<ui8> data(size + 1, 'y'); - - const size_t readResult = directIOFile.Read(&data[0], data.size()); - - UNIT_ASSERT_VALUES_EQUAL(readResult, size); - - UNIT_ASSERT_VALUES_EQUAL(data[0], 'x'); - UNIT_ASSERT_VALUES_EQUAL(data[size / 2], 'x'); - UNIT_ASSERT_VALUES_EQUAL(data[size - 1], 'x'); - UNIT_ASSERT_VALUES_EQUAL(data[size], 'y'); - } - } - - Y_UNIT_TEST(TestHugeFile1) { + void TestHugeFile(size_t size) { + TTempFile tmpFile("test.file"); + + { + TDirectIOBufferedFile directIOFile(tmpFile.Name(), WrOnly | CreateAlways | Direct); + TVector<ui8> data(size, 'x'); + directIOFile.Write(&data[0], data.size()); + } + + { + TDirectIOBufferedFile directIOFile(tmpFile.Name(), RdOnly | Direct); + TVector<ui8> data(size + 1, 'y'); + + const size_t readResult = directIOFile.Read(&data[0], data.size()); + + UNIT_ASSERT_VALUES_EQUAL(readResult, size); + + UNIT_ASSERT_VALUES_EQUAL(data[0], 'x'); + UNIT_ASSERT_VALUES_EQUAL(data[size / 2], 'x'); + UNIT_ASSERT_VALUES_EQUAL(data[size - 1], 'x'); + UNIT_ASSERT_VALUES_EQUAL(data[size], 'y'); + } + } + + Y_UNIT_TEST(TestHugeFile1) { if constexpr (sizeof(size_t) > 4) { - TestHugeFile(5 * 1024 * 1024 * 1024ULL); - } - } - Y_UNIT_TEST(TestHugeFile2) { + TestHugeFile(5 * 1024 * 1024 * 1024ULL); + } + } + Y_UNIT_TEST(TestHugeFile2) { if constexpr (sizeof(size_t) > 4) { - TestHugeFile(5 * 1024 * 1024 * 1024ULL + 1111); - } - } + TestHugeFile(5 * 1024 * 1024 * 1024ULL + 1111); + } + } } Y_UNIT_TEST_SUITE(TDirectIoErrorHandling) { diff --git a/util/system/file.cpp b/util/system/file.cpp index 6115799e8d..4a261d020c 100644 --- a/util/system/file.cpp +++ b/util/system/file.cpp @@ -1191,10 +1191,10 @@ size_t TFile::Pread(void* buf, size_t len, i64 offset) const { return Impl_->Pread(buf, len, offset); } -i32 TFile::RawPread(void* buf, ui32 len, i64 offset) const { - return Impl_->RawPread(buf, len, offset); -} - +i32 TFile::RawPread(void* buf, ui32 len, i64 offset) const { + return Impl_->RawPread(buf, len, offset); +} + void TFile::Pload(void* buf, size_t len, i64 offset) const { Impl_->Pload(buf, len, offset); } diff --git a/util/system/file.h b/util/system/file.h index 3b18ae07ce..9502e159b6 100644 --- a/util/system/file.h +++ b/util/system/file.h @@ -186,8 +186,8 @@ public: // Retries incomplete reads until EOF, throws on error size_t Pread(void* buf, size_t len, i64 offset) const; - // Single pread call - i32 RawPread(void* buf, ui32 len, i64 offset) const; + // Single pread call + i32 RawPread(void* buf, ui32 len, i64 offset) const; // Reads exactly len bytes, throws on premature EOF or error void Pload(void* buf, size_t len, i64 offset) const; diff --git a/util/system/file_ut.cpp b/util/system/file_ut.cpp index 2d5343c94d..941e6a50f3 100644 --- a/util/system/file_ut.cpp +++ b/util/system/file_ut.cpp @@ -21,7 +21,7 @@ class TFileTest: public TTestBase { UNIT_TEST(TestFlushSpecialFile); UNIT_TEST(TestRawRead); UNIT_TEST(TestRead); - UNIT_TEST(TestRawPread); + UNIT_TEST(TestRawPread); UNIT_TEST(TestPread); UNIT_TEST(TestCache); UNIT_TEST_SUITE_END(); @@ -35,7 +35,7 @@ public: void TestFlushSpecialFile(); void TestRawRead(); void TestRead(); - void TestRawPread(); + void TestRawPread(); void TestPread(); void TestCache(); @@ -281,30 +281,30 @@ void TFileTest::TestRead() { } } -void TFileTest::TestRawPread() { - TTempFile tmp("tmp"); - - { - TFile file(tmp.Name(), OpenAlways | WrOnly); - file.Write("1234567", 7); - file.Flush(); - file.Close(); - } - - { - TFile file(tmp.Name(), OpenExisting | RdOnly); - char buf[7]; - i32 reallyRead = file.RawPread(buf, 3, 1); - Y_ENSURE(0 <= reallyRead && reallyRead <= 3); - Y_ENSURE(TStringBuf(buf, reallyRead) == TStringBuf("234").Head(reallyRead)); - - memset(buf, 0, sizeof(buf)); - reallyRead = file.RawPread(buf, 2, 5); - Y_ENSURE(0 <= reallyRead && reallyRead <= 2); - Y_ENSURE(TStringBuf(buf, reallyRead) == TStringBuf("67").Head(reallyRead)); - } -} - +void TFileTest::TestRawPread() { + TTempFile tmp("tmp"); + + { + TFile file(tmp.Name(), OpenAlways | WrOnly); + file.Write("1234567", 7); + file.Flush(); + file.Close(); + } + + { + TFile file(tmp.Name(), OpenExisting | RdOnly); + char buf[7]; + i32 reallyRead = file.RawPread(buf, 3, 1); + Y_ENSURE(0 <= reallyRead && reallyRead <= 3); + Y_ENSURE(TStringBuf(buf, reallyRead) == TStringBuf("234").Head(reallyRead)); + + memset(buf, 0, sizeof(buf)); + reallyRead = file.RawPread(buf, 2, 5); + Y_ENSURE(0 <= reallyRead && reallyRead <= 2); + Y_ENSURE(TStringBuf(buf, reallyRead) == TStringBuf("67").Head(reallyRead)); + } +} + void TFileTest::TestPread() { TTempFile tmp("tmp"); |