aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorkulikov <kulikov@yandex-team.ru>2022-02-10 16:49:34 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:34 +0300
commit65e5266709e7ff94b14ae128309e229de714b0df (patch)
treed4901f06e56d95f5e5d36bd1806bcc144d03bf41 /library
parent0041d99876ae3dccc3f0fa8787131d85ddfd486b (diff)
downloadydb-65e5266709e7ff94b14ae128309e229de714b0df.tar.gz
Restoring authorship annotation for <kulikov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library')
-rw-r--r--library/cpp/coroutine/engine/cont_poller.h26
-rw-r--r--library/cpp/coroutine/engine/coroutine_ut.cpp292
-rw-r--r--library/cpp/coroutine/engine/impl.cpp70
-rw-r--r--library/cpp/coroutine/engine/impl.h50
-rw-r--r--library/cpp/coroutine/engine/poller.cpp30
-rw-r--r--library/cpp/coroutine/engine/poller.h4
-rw-r--r--library/cpp/coroutine/engine/sockpool.h8
-rw-r--r--library/cpp/coroutine/engine/trampoline.cpp6
-rw-r--r--library/cpp/coroutine/engine/trampoline.h8
-rw-r--r--library/cpp/http/io/compression.h44
-rw-r--r--library/cpp/http/io/compression_ut.cpp30
-rw-r--r--library/cpp/http/io/stream.cpp14
-rw-r--r--library/cpp/http/io/stream_ut.cpp98
-rw-r--r--library/cpp/http/server/conn.cpp16
-rw-r--r--library/cpp/http/server/http.cpp36
-rw-r--r--library/cpp/http/server/http_ut.cpp90
-rw-r--r--library/cpp/http/server/options.h18
-rw-r--r--library/cpp/http/server/response.h8
-rw-r--r--library/cpp/http/server/response_ut.cpp38
-rw-r--r--library/cpp/streams/brotli/brotli.cpp6
-rw-r--r--library/cpp/streams/brotli/brotli_ut.cpp34
-rw-r--r--library/cpp/threading/equeue/equeue.cpp152
-rw-r--r--library/cpp/threading/equeue/equeue.h50
-rw-r--r--library/cpp/threading/equeue/equeue_ut.cpp226
-rw-r--r--library/cpp/threading/task_scheduler/task_scheduler.cpp458
-rw-r--r--library/cpp/threading/task_scheduler/task_scheduler.h162
-rw-r--r--library/cpp/threading/task_scheduler/task_scheduler_ut.cpp30
27 files changed, 1002 insertions, 1002 deletions
diff --git a/library/cpp/coroutine/engine/cont_poller.h b/library/cpp/coroutine/engine/cont_poller.h
index b638b2df1a..d158bbe9f5 100644
--- a/library/cpp/coroutine/engine/cont_poller.h
+++ b/library/cpp/coroutine/engine/cont_poller.h
@@ -143,13 +143,13 @@ namespace NCoro {
auto* lst = Lists_.Get(event->Fd());
const ui16 oldFlags = Flags(*lst);
lst->PushFront(event);
- ui16 newFlags = Flags(*lst);
+ ui16 newFlags = Flags(*lst);
if (newFlags != oldFlags) {
- if (oldFlags) {
- newFlags |= CONT_POLL_MODIFY;
- }
-
+ if (oldFlags) {
+ newFlags |= CONT_POLL_MODIFY;
+ }
+
P_->Set(lst, event->Fd(), newFlags);
}
}
@@ -158,13 +158,13 @@ namespace NCoro {
auto* lst = Lists_.Get(event->Fd());
const ui16 oldFlags = Flags(*lst);
event->Unlink();
- ui16 newFlags = Flags(*lst);
+ ui16 newFlags = Flags(*lst);
if (newFlags != oldFlags) {
- if (newFlags) {
- newFlags |= CONT_POLL_MODIFY;
- }
-
+ if (newFlags) {
+ newFlags |= CONT_POLL_MODIFY;
+ }
+
P_->Set(lst, event->Fd(), newFlags);
}
}
@@ -174,9 +174,9 @@ namespace NCoro {
P_->Wait(events, deadLine);
}
- EContPoller PollEngine() const {
- return P_->PollEngine();
- }
+ EContPoller PollEngine() const {
+ return P_->PollEngine();
+ }
private:
static ui16 Flags(TIntrusiveList<IPollEvent>& lst) noexcept {
ui16 ret = 0;
diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp
index 8b372496a2..a7012eb8e2 100644
--- a/library/cpp/coroutine/engine/coroutine_ut.cpp
+++ b/library/cpp/coroutine/engine/coroutine_ut.cpp
@@ -8,16 +8,16 @@
#include <util/system/pipe.h>
#include <util/system/env.h>
#include <util/system/info.h>
-#include <util/system/thread.h>
+#include <util/system/thread.h>
#include <util/generic/xrange.h>
-#include <util/generic/serialized_enum.h>
+#include <util/generic/serialized_enum.h>
// TODO (velavokr): BALANCER-1345 add more tests on pollers
class TCoroTest: public TTestBase {
UNIT_TEST_SUITE(TCoroTest);
UNIT_TEST(TestSimpleX1);
- UNIT_TEST(TestSimpleX1MultiThread);
+ UNIT_TEST(TestSimpleX1MultiThread);
UNIT_TEST(TestSimpleX2);
UNIT_TEST(TestSimpleX3);
UNIT_TEST(TestMemFun);
@@ -40,10 +40,10 @@ class TCoroTest: public TTestBase {
UNIT_TEST(TestLegacyCancelYieldRaceBug)
UNIT_TEST(TestJoinRescheduleBug);
UNIT_TEST(TestEventQueue)
- UNIT_TEST(TestNestedExecutor)
- UNIT_TEST(TestComputeCoroutineYield)
- UNIT_TEST(TestPollEngines);
- UNIT_TEST(TestUserEvent);
+ UNIT_TEST(TestNestedExecutor)
+ UNIT_TEST(TestComputeCoroutineYield)
+ UNIT_TEST(TestPollEngines);
+ UNIT_TEST(TestUserEvent);
UNIT_TEST(TestPause);
UNIT_TEST(TestOverrideTime);
UNIT_TEST_SUITE_END();
@@ -51,7 +51,7 @@ class TCoroTest: public TTestBase {
public:
void TestException();
void TestSimpleX1();
- void TestSimpleX1MultiThread();
+ void TestSimpleX1MultiThread();
void TestSimpleX2();
void TestSimpleX3();
void TestMemFun();
@@ -72,10 +72,10 @@ public:
void TestLegacyCancelYieldRaceBug();
void TestJoinRescheduleBug();
void TestEventQueue();
- void TestNestedExecutor();
- void TestComputeCoroutineYield();
- void TestPollEngines();
- void TestUserEvent();
+ void TestNestedExecutor();
+ void TestComputeCoroutineYield();
+ void TestPollEngines();
+ void TestUserEvent();
void TestPause();
void TestOverrideTime();
};
@@ -130,54 +130,54 @@ static int i0;
static void CoRun(TCont* c, void* /*run*/) {
while (i0 < 100000) {
++i0;
- UNIT_ASSERT(RunningCont() == c);
+ UNIT_ASSERT(RunningCont() == c);
c->Yield();
- UNIT_ASSERT(RunningCont() == c);
+ UNIT_ASSERT(RunningCont() == c);
}
}
static void CoMain(TCont* c, void* /*arg*/) {
for (volatile size_t i2 = 0; i2 < 10; ++i2) {
- UNIT_ASSERT(RunningCont() == c);
+ UNIT_ASSERT(RunningCont() == c);
c->Executor()->Create(CoRun, nullptr, "run");
- UNIT_ASSERT(RunningCont() == c);
+ UNIT_ASSERT(RunningCont() == c);
}
}
void TCoroTest::TestSimpleX1() {
i0 = 0;
TContExecutor e(32000);
-
- UNIT_ASSERT(RunningCont() == nullptr);
-
+
+ UNIT_ASSERT(RunningCont() == nullptr);
+
e.Execute(CoMain);
UNIT_ASSERT_VALUES_EQUAL(i0, 100000);
-
- UNIT_ASSERT(RunningCont() == nullptr);
-}
-
-void TCoroTest::TestSimpleX1MultiThread() {
- TVector<THolder<TThread>> threads;
- const size_t nThreads = 0;
- TAtomic c = 0;
- for (size_t i = 0; i < nThreads; ++i) {
- threads.push_back(MakeHolder<TThread>([&]() {
- TestSimpleX1();
- AtomicIncrement(c);
- }));
- }
-
- for (auto& t : threads) {
- t->Start();
- }
-
- for (auto& t: threads) {
- t->Join();
- }
-
- UNIT_ASSERT_EQUAL(c, nThreads);
-}
-
+
+ UNIT_ASSERT(RunningCont() == nullptr);
+}
+
+void TCoroTest::TestSimpleX1MultiThread() {
+ TVector<THolder<TThread>> threads;
+ const size_t nThreads = 0;
+ TAtomic c = 0;
+ for (size_t i = 0; i < nThreads; ++i) {
+ threads.push_back(MakeHolder<TThread>([&]() {
+ TestSimpleX1();
+ AtomicIncrement(c);
+ }));
+ }
+
+ for (auto& t : threads) {
+ t->Start();
+ }
+
+ for (auto& t: threads) {
+ t->Join();
+ }
+
+ UNIT_ASSERT_EQUAL(c, nThreads);
+}
+
struct TTestObject {
int i = 0;
int j = 0;
@@ -863,85 +863,85 @@ void TCoroTest::TestEventQueue() {
}, &queue);
}
-void TCoroTest::TestNestedExecutor() {
-#ifndef _win_
- //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr
- TContExecutor exec(32000);
- UNIT_ASSERT(!RunningCont());
-
- exec.Execute([](TCont* cont, void*) {
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
-
- TContExecutor exec2(32000);
- exec2.Execute([](TCont* cont2, void*) {
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
- TContExecutor exec3(32000);
- exec3.Execute([](TCont* cont3, void*) {
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3);
- });
-
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
- });
-
- UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
- });
-
- UNIT_ASSERT(!RunningCont());
-#endif
-}
-
-void TCoroTest::TestComputeCoroutineYield() {
-//if we have busy (e.g., on cpu) coroutine, when it yields, io must flow
- TContExecutor exec(32000);
- exec.SetFailOnError(true);
-
- TPipe in, out;
- TPipe::Pipe(in, out);
- SetNonBlock(in.GetHandle());
- size_t lastRead = 42;
-
- auto compute = [&](TCont* cont) {
- for (size_t i = 0; i < 10; ++i) {
- write(out.GetHandle(), &i, sizeof i);
- Sleep(TDuration::MilliSeconds(10));
- cont->Yield();
- UNIT_ASSERT(lastRead == i);
- }
- };
-
- auto io = [&](TCont* cont) {
- for (size_t i = 0; i < 10; ++i) {
- NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead);
- }
- };
-
- exec.Create(compute, "compute");
- exec.Create(io, "io");
-
- exec.Execute();
-}
-
-void TCoroTest::TestPollEngines() {
- bool defaultChecked = false;
- for (auto engine : GetEnumAllValues<EContPoller>()) {
- auto poller = IPollerFace::Construct(engine);
- if (!poller) {
- continue;
- }
-
- TContExecutor exec(32000, IPollerFace::Construct(engine));
-
- if (engine == EContPoller::Default) {
- defaultChecked = true;
- UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Combined);
- } else {
- UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine);
- }
- }
-
- UNIT_ASSERT(defaultChecked);
-}
-
+void TCoroTest::TestNestedExecutor() {
+#ifndef _win_
+ //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr
+ TContExecutor exec(32000);
+ UNIT_ASSERT(!RunningCont());
+
+ exec.Execute([](TCont* cont, void*) {
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
+
+ TContExecutor exec2(32000);
+ exec2.Execute([](TCont* cont2, void*) {
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
+ TContExecutor exec3(32000);
+ exec3.Execute([](TCont* cont3, void*) {
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3);
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
+ });
+
+ UNIT_ASSERT(!RunningCont());
+#endif
+}
+
+void TCoroTest::TestComputeCoroutineYield() {
+//if we have busy (e.g., on cpu) coroutine, when it yields, io must flow
+ TContExecutor exec(32000);
+ exec.SetFailOnError(true);
+
+ TPipe in, out;
+ TPipe::Pipe(in, out);
+ SetNonBlock(in.GetHandle());
+ size_t lastRead = 42;
+
+ auto compute = [&](TCont* cont) {
+ for (size_t i = 0; i < 10; ++i) {
+ write(out.GetHandle(), &i, sizeof i);
+ Sleep(TDuration::MilliSeconds(10));
+ cont->Yield();
+ UNIT_ASSERT(lastRead == i);
+ }
+ };
+
+ auto io = [&](TCont* cont) {
+ for (size_t i = 0; i < 10; ++i) {
+ NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead);
+ }
+ };
+
+ exec.Create(compute, "compute");
+ exec.Create(io, "io");
+
+ exec.Execute();
+}
+
+void TCoroTest::TestPollEngines() {
+ bool defaultChecked = false;
+ for (auto engine : GetEnumAllValues<EContPoller>()) {
+ auto poller = IPollerFace::Construct(engine);
+ if (!poller) {
+ continue;
+ }
+
+ TContExecutor exec(32000, IPollerFace::Construct(engine));
+
+ if (engine == EContPoller::Default) {
+ defaultChecked = true;
+ UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Combined);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine);
+ }
+ }
+
+ UNIT_ASSERT(defaultChecked);
+}
+
void TCoroTest::TestPause() {
TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing()};
@@ -959,28 +959,28 @@ void TCoroTest::TestPause() {
UNIT_ASSERT_EQUAL(i, 2);
}
-void TCoroTest::TestUserEvent() {
- TContExecutor exec(32000);
-
- struct TUserEvent : public IUserEvent {
- bool Called = false;
- void Execute() override {
- Called = true;
- }
- } event;
-
- auto f = [&](TCont* cont) {
- UNIT_ASSERT(!event.Called);
- exec.ScheduleUserEvent(&event);
- UNIT_ASSERT(!event.Called);
- cont->Yield();
- UNIT_ASSERT(event.Called);
- };
-
- exec.Execute(f);
-
- UNIT_ASSERT(event.Called);
-}
+void TCoroTest::TestUserEvent() {
+ TContExecutor exec(32000);
+
+ struct TUserEvent : public IUserEvent {
+ bool Called = false;
+ void Execute() override {
+ Called = true;
+ }
+ } event;
+
+ auto f = [&](TCont* cont) {
+ UNIT_ASSERT(!event.Called);
+ exec.ScheduleUserEvent(&event);
+ UNIT_ASSERT(!event.Called);
+ cont->Yield();
+ UNIT_ASSERT(event.Called);
+ };
+
+ exec.Execute(f);
+
+ UNIT_ASSERT(event.Called);
+}
void TCoroTest::TestOverrideTime() {
class TTime: public NCoro::ITime {
diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp
index 7ae6f74051..9231d2b1ba 100644
--- a/library/cpp/coroutine/engine/impl.cpp
+++ b/library/cpp/coroutine/engine/impl.cpp
@@ -3,8 +3,8 @@
#include "stack/stack_allocator.h"
#include "stack/stack_guards.h"
-#include <util/generic/scope.h>
-#include <util/thread/singleton.h>
+#include <util/generic/scope.h>
+#include <util/thread/singleton.h>
#include <util/stream/format.h>
#include <util/stream/output.h>
#include <util/system/yassert.h>
@@ -157,47 +157,47 @@ void TContExecutor::WaitForIO() {
// Waking a coroutine puts it into ReadyNext_ list
const auto next = WaitQueue_.WakeTimedout(now);
- if (!UserEvents_.Empty()) {
- TIntrusiveList<IUserEvent> userEvents;
- userEvents.Swap(UserEvents_);
- do {
- userEvents.PopFront()->Execute();
- } while (!userEvents.Empty());
- }
-
+ if (!UserEvents_.Empty()) {
+ TIntrusiveList<IUserEvent> userEvents;
+ userEvents.Swap(UserEvents_);
+ do {
+ userEvents.PopFront()->Execute();
+ } while (!userEvents.Empty());
+ }
+
// Polling will return as soon as there is an event to process or a timeout.
// If there are woken coroutines we do not want to sleep in the poller
// yet still we want to check for new io
// to prevent ourselves from locking out of io by constantly waking coroutines.
- if (ReadyNext_.Empty()) {
+ if (ReadyNext_.Empty()) {
if (EnterPollerCallback_) {
EnterPollerCallback_->OnEnterPoller();
}
- Poll(next);
+ Poll(next);
if (EnterPollerCallback_) {
EnterPollerCallback_->OnExitPoller();
}
- } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) {
+ } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) {
if (EnterPollerCallback_) {
EnterPollerCallback_->OnEnterPoller();
}
- Poll(now);
+ Poll(now);
if (EnterPollerCallback_) {
EnterPollerCallback_->OnExitPoller();
}
- }
+ }
Ready_.Append(ReadyNext_);
}
}
-void TContExecutor::Poll(TInstant deadline) {
- Poller_.Wait(PollerEvents_, deadline);
- LastPoll_ = Now();
-
- // Waking a coroutine puts it into ReadyNext_ list
- for (auto event : PollerEvents_) {
+void TContExecutor::Poll(TInstant deadline) {
+ Poller_.Wait(PollerEvents_, deadline);
+ LastPoll_ = Now();
+
+ // Waking a coroutine puts it into ReadyNext_ list
+ for (auto event : PollerEvents_) {
auto* lst = (NCoro::TPollEventList*)event.Data;
const int status = event.Status;
@@ -276,36 +276,36 @@ void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept {
Ready_.PushBack(cont);
}
-namespace {
+namespace {
inline TContExecutor*& ThisThreadExecutor() {
struct TThisThreadExecutorHolder {
- TContExecutor* Executor = nullptr;
+ TContExecutor* Executor = nullptr;
};
return FastTlsSingletonWithPriority<TThisThreadExecutorHolder, 0>()->Executor;
}
-}
-
+}
+
void TContExecutor::DeleteScheduled() noexcept {
ToDelete_.ForEach([this](TCont* c) {
Release(c);
});
}
-TCont* RunningCont() {
+TCont* RunningCont() {
TContExecutor* thisThreadExecutor = ThisThreadExecutor();
- return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr;
-}
-
+ return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr;
+}
+
void TContExecutor::RunScheduler() noexcept {
try {
- TContExecutor* const prev = ThisThreadExecutor();
+ TContExecutor* const prev = ThisThreadExecutor();
ThisThreadExecutor() = this;
TCont* caller = Current_;
TExceptionSafeContext* context = caller ? caller->Trampoline_.Context() : &SchedContext_;
- Y_DEFER {
- ThisThreadExecutor() = prev;
- };
-
+ Y_DEFER {
+ ThisThreadExecutor() = prev;
+ };
+
while (true) {
if (ScheduleCallback_ && Current_) {
ScheduleCallback_->OnUnschedule(*this);
@@ -345,7 +345,7 @@ void TContExecutor::RunScheduler() noexcept {
}
}
} catch (...) {
- TBackTrace::FromCurrentException().PrintTo(Cerr);
+ TBackTrace::FromCurrentException().PrintTo(Cerr);
Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str());
}
}
diff --git a/library/cpp/coroutine/engine/impl.h b/library/cpp/coroutine/engine/impl.h
index 283a96ecf1..a055012f3f 100644
--- a/library/cpp/coroutine/engine/impl.h
+++ b/library/cpp/coroutine/engine/impl.h
@@ -121,9 +121,9 @@ private:
bool Scheduled_ = false;
};
-TCont* RunningCont();
-
+TCont* RunningCont();
+
template <class Functor>
static void ContHelperFunc(TCont* cont, void* arg) {
(*((Functor*)(arg)))(cont);
@@ -134,15 +134,15 @@ static void ContHelperMemberFunc(TCont* c, void* arg) {
((reinterpret_cast<T*>(arg))->*M)(c);
}
-class IUserEvent
- : public TIntrusiveListItem<IUserEvent>
-{
-public:
- virtual ~IUserEvent() = default;
-
- virtual void Execute() = 0;
-};
-
+class IUserEvent
+ : public TIntrusiveListItem<IUserEvent>
+{
+public:
+ virtual ~IUserEvent() = default;
+
+ virtual void Execute() = 0;
+};
+
/// Central coroutine class.
/// Note, coroutines are single-threaded, and all methods must be called from the single thread
class TContExecutor {
@@ -249,22 +249,22 @@ public:
return FailOnError_;
}
- void RegisterInWaitQueue(NCoro::TContPollEvent* event) {
- WaitQueue_.Register(event);
- }
-
+ void RegisterInWaitQueue(NCoro::TContPollEvent* event) {
+ WaitQueue_.Register(event);
+ }
+
void ScheduleIoWait(TFdEvent* event) {
- RegisterInWaitQueue(event);
+ RegisterInWaitQueue(event);
Poller_.Schedule(event);
}
void ScheduleIoWait(TTimerEvent* event) noexcept {
- RegisterInWaitQueue(event);
+ RegisterInWaitQueue(event);
}
- void ScheduleUserEvent(IUserEvent* event) {
- UserEvents_.PushBack(event);
- }
+ void ScheduleUserEvent(IUserEvent* event) {
+ UserEvents_.PushBack(event);
+ }
void Pause();
TInstant Now();
@@ -285,7 +285,7 @@ private:
void WaitForIO();
- void Poll(TInstant deadline);
+ void Poll(TInstant deadline);
private:
NCoro::IScheduleCallback* const ScheduleCallback_ = nullptr;
@@ -300,11 +300,11 @@ private:
TContList ReadyNext_;
NCoro::TEventWaitQueue WaitQueue_;
NCoro::TContPoller Poller_;
- NCoro::TContPoller::TEvents PollerEvents_;
- TInstant LastPoll_;
-
- TIntrusiveList<IUserEvent> UserEvents_;
+ NCoro::TContPoller::TEvents PollerEvents_;
+ TInstant LastPoll_;
+ TIntrusiveList<IUserEvent> UserEvents_;
+
size_t Allocated_ = 0;
TCont* Current_ = nullptr;
bool FailOnError_ = false;
diff --git a/library/cpp/coroutine/engine/poller.cpp b/library/cpp/coroutine/engine/poller.cpp
index 61164fa56b..722ef1e3a6 100644
--- a/library/cpp/coroutine/engine/poller.cpp
+++ b/library/cpp/coroutine/engine/poller.cpp
@@ -46,11 +46,11 @@ namespace {
template <class T>
class TVirtualize: public IPollerFace {
public:
- TVirtualize(EContPoller pollerEngine)
- : PollerEngine_(pollerEngine)
- {
- }
-
+ TVirtualize(EContPoller pollerEngine)
+ : PollerEngine_(pollerEngine)
+ {
+ }
+
void Set(const TChange& c) override {
P_.Set(c);
}
@@ -59,12 +59,12 @@ namespace {
P_.Wait(events, deadLine);
}
- EContPoller PollEngine() const override {
- return PollerEngine_;
- }
+ EContPoller PollEngine() const override {
+ return PollerEngine_;
+ }
private:
T P_;
- const EContPoller PollerEngine_;
+ const EContPoller PollerEngine_;
};
@@ -366,21 +366,21 @@ THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) {
THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) {
switch (poller) {
case EContPoller::Default:
- case EContPoller::Combined:
- return MakeHolder<TVirtualize<TCombinedPoller>>(EContPoller::Combined);
+ case EContPoller::Combined:
+ return MakeHolder<TVirtualize<TCombinedPoller>>(EContPoller::Combined);
case EContPoller::Select:
- return MakeHolder<TVirtualize<TPoller<TGenericPoller<TSelectPoller<TWithoutLocking>>>>>(poller);
+ return MakeHolder<TVirtualize<TPoller<TGenericPoller<TSelectPoller<TWithoutLocking>>>>>(poller);
case EContPoller::Poll:
- return MakeHolder<TVirtualize<TPollPoller>>(poller);
+ return MakeHolder<TVirtualize<TPollPoller>>(poller);
case EContPoller::Epoll:
#if defined(HAVE_EPOLL_POLLER)
- return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller);
+ return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller);
#else
return nullptr;
#endif
case EContPoller::Kqueue:
#if defined(HAVE_KQUEUE_POLLER)
- return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller);
+ return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller);
#else
return nullptr;
#endif
diff --git a/library/cpp/coroutine/engine/poller.h b/library/cpp/coroutine/engine/poller.h
index 8ea012c0fc..ff60266d73 100644
--- a/library/cpp/coroutine/engine/poller.h
+++ b/library/cpp/coroutine/engine/poller.h
@@ -8,7 +8,7 @@
enum class EContPoller {
Default /* "default" */,
- Combined /* "combined" */,
+ Combined /* "combined" */,
Select /* "select" */,
Poll /* "poll" */,
Epoll /* "epoll" */,
@@ -42,7 +42,7 @@ public:
virtual void Set(const TChange& change) = 0;
virtual void Wait(TEvents& events, TInstant deadLine) = 0;
- virtual EContPoller PollEngine() const = 0;
+ virtual EContPoller PollEngine() const = 0;
static THolder<IPollerFace> Default();
static THolder<IPollerFace> Construct(TStringBuf name);
diff --git a/library/cpp/coroutine/engine/sockpool.h b/library/cpp/coroutine/engine/sockpool.h
index 1ebb7e7b38..2afd5c709a 100644
--- a/library/cpp/coroutine/engine/sockpool.h
+++ b/library/cpp/coroutine/engine/sockpool.h
@@ -93,7 +93,7 @@ public:
: Impl_(nullptr)
{
}
-
+
TPooledSocket(TImpl* impl)
: Impl_(impl)
{
@@ -178,7 +178,7 @@ public:
} else {
ret = AllocateMore(conn);
}
-
+
ret.Impl_->Touch();
return ret;
@@ -189,10 +189,10 @@ public:
alive->Touch();
socket = TPooledSocket(alive);
return true;
- }
+ }
return false;
}
-
+
private:
TPooledSocket::TImpl* GetImpl() {
TGuard<TMutex> guard(Mutex_);
diff --git a/library/cpp/coroutine/engine/trampoline.cpp b/library/cpp/coroutine/engine/trampoline.cpp
index 10ea69ddc3..6857670e1e 100644
--- a/library/cpp/coroutine/engine/trampoline.cpp
+++ b/library/cpp/coroutine/engine/trampoline.cpp
@@ -38,9 +38,9 @@ TTrampoline::TTrampoline(NStack::IAllocator& allocator, ui32 stackSize, TFunc f,
return Stack_.Get();
}
- const char* TTrampoline::ContName() const noexcept {
- return Cont_->Name();
- }
+ const char* TTrampoline::ContName() const noexcept {
+ return Cont_->Name();
+ }
void TTrampoline::DoRunNaked() {
DoRun();
diff --git a/library/cpp/coroutine/engine/trampoline.h b/library/cpp/coroutine/engine/trampoline.h
index 37b61cf015..30cc079ab0 100644
--- a/library/cpp/coroutine/engine/trampoline.h
+++ b/library/cpp/coroutine/engine/trampoline.h
@@ -39,8 +39,8 @@ namespace NCoro {
}
void SwitchTo(TExceptionSafeContext* ctx) noexcept {
- Y_VERIFY(Stack_.LowerCanaryOk(), "Stack overflow (%s)", ContName());
- Y_VERIFY(Stack_.UpperCanaryOk(), "Stack override (%s)", ContName());
+ Y_VERIFY(Stack_.LowerCanaryOk(), "Stack overflow (%s)", ContName());
+ Y_VERIFY(Stack_.UpperCanaryOk(), "Stack override (%s)", ContName());
Ctx_.SwitchTo(ctx);
}
@@ -49,8 +49,8 @@ namespace NCoro {
void DoRunNaked() override;
private:
- const char* ContName() const noexcept;
- private:
+ const char* ContName() const noexcept;
+ private:
NStack::TStackHolder Stack_;
const TContClosure Clo_;
TExceptionSafeContext Ctx_;
diff --git a/library/cpp/http/io/compression.h b/library/cpp/http/io/compression.h
index f16c4a18eb..30eccdaca5 100644
--- a/library/cpp/http/io/compression.h
+++ b/library/cpp/http/io/compression.h
@@ -48,25 +48,25 @@ private:
THashMap<TStringBuf, TCodec> Codecs_;
TVector<TStringBuf> BestCodecs_;
};
-
-namespace NHttp {
- template <typename F>
- TString ChooseBestCompressionScheme(F accepted, TArrayRef<const TStringBuf> available) {
- if (available.empty()) {
- return "identity";
- }
-
- if (accepted("*")) {
- return TString(available[0]);
- }
-
- for (const auto& coding : available) {
- TString s(coding);
- if (accepted(s)) {
- return s;
- }
- }
-
- return "identity";
- }
-}
+
+namespace NHttp {
+ template <typename F>
+ TString ChooseBestCompressionScheme(F accepted, TArrayRef<const TStringBuf> available) {
+ if (available.empty()) {
+ return "identity";
+ }
+
+ if (accepted("*")) {
+ return TString(available[0]);
+ }
+
+ for (const auto& coding : available) {
+ TString s(coding);
+ if (accepted(s)) {
+ return s;
+ }
+ }
+
+ return "identity";
+ }
+}
diff --git a/library/cpp/http/io/compression_ut.cpp b/library/cpp/http/io/compression_ut.cpp
index 2f3d131f8c..d8a2d11a08 100644
--- a/library/cpp/http/io/compression_ut.cpp
+++ b/library/cpp/http/io/compression_ut.cpp
@@ -5,7 +5,7 @@
#include <library/cpp/testing/unittest/tests_data.h>
#include <util/stream/zlib.h>
-#include <util/generic/hash_set.h>
+#include <util/generic/hash_set.h>
Y_UNIT_TEST_SUITE(THttpCompressionTest) {
static const TString DATA = "I'm a teapot";
@@ -43,18 +43,18 @@ Y_UNIT_TEST_SUITE(THttpCompressionTest) {
auto decodedStream = (*decoder)(&buffer);
UNIT_ASSERT_EQUAL(decodedStream->ReadAll(), DATA);
}
-
- Y_UNIT_TEST(TestChooseBestCompressionScheme) {
- THashSet<TString> accepted;
-
- auto checkAccepted = [&accepted](const TString& v) {
- return accepted.contains(v);
- };
-
- UNIT_ASSERT_VALUES_EQUAL("identity", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"}));
- accepted.insert("deflate");
- UNIT_ASSERT_VALUES_EQUAL("deflate", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"}));
- accepted.insert("*");
- UNIT_ASSERT_VALUES_EQUAL("gzip", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"}));
- }
+
+ Y_UNIT_TEST(TestChooseBestCompressionScheme) {
+ THashSet<TString> accepted;
+
+ auto checkAccepted = [&accepted](const TString& v) {
+ return accepted.contains(v);
+ };
+
+ UNIT_ASSERT_VALUES_EQUAL("identity", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"}));
+ accepted.insert("deflate");
+ UNIT_ASSERT_VALUES_EQUAL("deflate", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"}));
+ accepted.insert("*");
+ UNIT_ASSERT_VALUES_EQUAL("gzip", NHttp::ChooseBestCompressionScheme(checkAccepted, {"gzip", "deflate"}));
+ }
} // THttpCompressionTest suite
diff --git a/library/cpp/http/io/stream.cpp b/library/cpp/http/io/stream.cpp
index 6689be684f..728d1a89c1 100644
--- a/library/cpp/http/io/stream.cpp
+++ b/library/cpp/http/io/stream.cpp
@@ -286,7 +286,7 @@ private:
TParsedHeaders p;
size_t pos = FirstLine_.rfind(' ');
- // In HTTP/1.1 Keep-Alive is turned on by default
+ // In HTTP/1.1 Keep-Alive is turned on by default
if (pos != TString::npos && strcmp(FirstLine_.c_str() + pos + 1, "HTTP/1.1") == 0) {
p.KeepAlive = true; //request
} else if (strnicmp(FirstLine_.data(), "HTTP/1.1", 8) == 0) {
@@ -428,12 +428,12 @@ bool THttpInput::AcceptEncoding(const TString& coding) const {
}
TString THttpInput::BestCompressionScheme(TArrayRef<const TStringBuf> codings) const {
- return NHttp::ChooseBestCompressionScheme(
- [this](const TString& coding) {
- return AcceptEncoding(coding);
- },
- codings
- );
+ return NHttp::ChooseBestCompressionScheme(
+ [this](const TString& coding) {
+ return AcceptEncoding(coding);
+ },
+ codings
+ );
}
TString THttpInput::BestCompressionScheme() const {
diff --git a/library/cpp/http/io/stream_ut.cpp b/library/cpp/http/io/stream_ut.cpp
index 1ea35df675..1d78c82e0e 100644
--- a/library/cpp/http/io/stream_ut.cpp
+++ b/library/cpp/http/io/stream_ut.cpp
@@ -179,63 +179,63 @@ Y_UNIT_TEST_SUITE(THttpStreamTest) {
}
Y_UNIT_TEST(TestKeepAlive) {
- {
+ {
TString s = "GET / HTTP/1.0\r\n\r\n";
- TStringInput si(s);
- THttpInput in(&si);
- UNIT_ASSERT(!in.IsKeepAlive());
- }
-
- {
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(!in.IsKeepAlive());
+ }
+
+ {
TString s = "GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n";
- TStringInput si(s);
- THttpInput in(&si);
- UNIT_ASSERT(in.IsKeepAlive());
- }
-
- {
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(in.IsKeepAlive());
+ }
+
+ {
TString s = "GET / HTTP/1.1\r\n\r\n";
- TStringInput si(s);
- THttpInput in(&si);
- UNIT_ASSERT(in.IsKeepAlive());
- }
-
- {
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(in.IsKeepAlive());
+ }
+
+ {
TString s = "GET / HTTP/1.1\r\nConnection: close\r\n\r\n";
- TStringInput si(s);
- THttpInput in(&si);
- UNIT_ASSERT(!in.IsKeepAlive());
- }
-
- {
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(!in.IsKeepAlive());
+ }
+
+ {
TString s = "HTTP/1.0 200 Ok\r\n\r\n";
- TStringInput si(s);
- THttpInput in(&si);
- UNIT_ASSERT(!in.IsKeepAlive());
- }
-
- {
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(!in.IsKeepAlive());
+ }
+
+ {
TString s = "HTTP/1.0 200 Ok\r\nConnection: keep-alive\r\n\r\n";
- TStringInput si(s);
- THttpInput in(&si);
- UNIT_ASSERT(in.IsKeepAlive());
- }
-
- {
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(in.IsKeepAlive());
+ }
+
+ {
TString s = "HTTP/1.1 200 Ok\r\n\r\n";
- TStringInput si(s);
- THttpInput in(&si);
- UNIT_ASSERT(in.IsKeepAlive());
- }
-
- {
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(in.IsKeepAlive());
+ }
+
+ {
TString s = "HTTP/1.1 200 Ok\r\nConnection: close\r\n\r\n";
- TStringInput si(s);
- THttpInput in(&si);
- UNIT_ASSERT(!in.IsKeepAlive());
- }
- }
-
+ TStringInput si(s);
+ THttpInput in(&si);
+ UNIT_ASSERT(!in.IsKeepAlive());
+ }
+ }
+
Y_UNIT_TEST(TestMinRequest) {
TString res = "qqqqqq";
TPortManager pm;
diff --git a/library/cpp/http/server/conn.cpp b/library/cpp/http/server/conn.cpp
index 38a76c4c30..37f82f9f7b 100644
--- a/library/cpp/http/server/conn.cpp
+++ b/library/cpp/http/server/conn.cpp
@@ -5,11 +5,11 @@
class THttpServerConn::TImpl {
public:
- inline TImpl(const TSocket& s, size_t outputBufferSize)
+ inline TImpl(const TSocket& s, size_t outputBufferSize)
: S_(s)
, SI_(S_)
, SO_(S_)
- , BO_(&SO_, outputBufferSize)
+ , BO_(&SO_, outputBufferSize)
, HI_(&SI_)
, HO_(&BO_, &HI_)
{
@@ -44,15 +44,15 @@ private:
};
THttpServerConn::THttpServerConn(const TSocket& s)
- : THttpServerConn(s, s.MaximumTransferUnit())
-{
-}
-
-THttpServerConn::THttpServerConn(const TSocket& s, size_t outputBufferSize)
- : Impl_(new TImpl(s, outputBufferSize))
+ : THttpServerConn(s, s.MaximumTransferUnit())
{
}
+THttpServerConn::THttpServerConn(const TSocket& s, size_t outputBufferSize)
+ : Impl_(new TImpl(s, outputBufferSize))
+{
+}
+
THttpServerConn::~THttpServerConn() {
}
diff --git a/library/cpp/http/server/http.cpp b/library/cpp/http/server/http.cpp
index 128583bdd7..3d21108b02 100644
--- a/library/cpp/http/server/http.cpp
+++ b/library/cpp/http/server/http.cpp
@@ -1,5 +1,5 @@
#include "http.h"
-#include "http_ex.h"
+#include "http_ex.h"
#include <library/cpp/threading/equeue/equeue.h>
@@ -243,17 +243,17 @@ public:
}
void AddRequest(TAutoPtr<TClientRequest> req, bool fail) {
- struct TFailRequest: public THttpClientRequestEx {
+ struct TFailRequest: public THttpClientRequestEx {
inline TFailRequest(TAutoPtr<TClientRequest> parent) {
Conn_.Reset(parent->Conn_.Release());
HttpConn_.Reset(parent->HttpConn_.Release());
}
bool Reply(void*) override {
- if (!ProcessHeaders()) {
+ if (!ProcessHeaders()) {
return true;
- }
-
+ }
+
ProcessFailRequest(0);
return true;
}
@@ -558,11 +558,11 @@ TClientConnection::TClientConnection(const TSocket& s, THttpServer::TImpl* serv,
{
SetNoDelay(Socket_, true);
- const TDuration& clientTimeout = HttpServ_->Options().ClientTimeout;
- if (clientTimeout != TDuration::Zero()) {
+ const TDuration& clientTimeout = HttpServ_->Options().ClientTimeout;
+ if (clientTimeout != TDuration::Zero()) {
SetSocketTimeout(Socket_, (long)clientTimeout.Seconds(), clientTimeout.MilliSecondsOfSecond());
- }
-
+ }
+
HttpServ_->IncreaseConnections();
}
@@ -679,16 +679,16 @@ void TClientRequest::ResetConnection() {
void TClientRequest::Process(void* ThreadSpecificResource) {
THolder<TClientRequest> this_(this);
- auto* serverImpl = Conn_->HttpServ_;
-
+ auto* serverImpl = Conn_->HttpServ_;
+
try {
if (!HttpConn_) {
- const size_t outputBufferSize = HttpServ()->Options().OutputBufferSize;
- if (outputBufferSize) {
- HttpConn_.Reset(new THttpServerConn(Socket(), outputBufferSize));
- } else {
- HttpConn_.Reset(new THttpServerConn(Socket()));
- }
+ const size_t outputBufferSize = HttpServ()->Options().OutputBufferSize;
+ if (outputBufferSize) {
+ HttpConn_.Reset(new THttpServerConn(Socket(), outputBufferSize));
+ } else {
+ HttpConn_.Reset(new THttpServerConn(Socket()));
+ }
auto maxRequestsPerConnection = HttpServ()->Options().MaxRequestsPerConnection;
HttpConn_->Output()->EnableKeepAlive(HttpServ()->Options().KeepAliveEnabled && (!maxRequestsPerConnection || Conn_->ReceivedRequests < maxRequestsPerConnection));
@@ -715,7 +715,7 @@ void TClientRequest::Process(void* ThreadSpecificResource) {
return;
}
} catch (...) {
- serverImpl->Cb_->OnException();
+ serverImpl->Cb_->OnException();
throw;
}
diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp
index cc62bb988e..42580906f6 100644
--- a/library/cpp/http/server/http_ut.cpp
+++ b/library/cpp/http/server/http_ut.cpp
@@ -456,31 +456,31 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
server.Stop();
}
- class TReleaseConnectionServer: public THttpServer::ICallBack {
- class TRequest: public THttpClientRequestEx {
- public:
- bool Reply(void* /*tsr*/) override {
- Output() << "HTTP/1.1 200 Ok\r\n\r\n";
- Output() << "reply";
- Output().Finish();
-
- ReleaseConnection();
-
- throw yexception() << "some error";
-
- return true;
- }
- };
-
- public:
- TClientRequest* CreateClient() override {
- return new TRequest();
- }
-
- void OnException() override {
- ExceptionMessage = CurrentExceptionMessage();
- }
-
+ class TReleaseConnectionServer: public THttpServer::ICallBack {
+ class TRequest: public THttpClientRequestEx {
+ public:
+ bool Reply(void* /*tsr*/) override {
+ Output() << "HTTP/1.1 200 Ok\r\n\r\n";
+ Output() << "reply";
+ Output().Finish();
+
+ ReleaseConnection();
+
+ throw yexception() << "some error";
+
+ return true;
+ }
+ };
+
+ public:
+ TClientRequest* CreateClient() override {
+ return new TRequest();
+ }
+
+ void OnException() override {
+ ExceptionMessage = CurrentExceptionMessage();
+ }
+
TString ExceptionMessage;
};
@@ -495,7 +495,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
}
};
- public:
+ public:
TClientRequest* CreateClient() override {
return new TRequest();
}
@@ -504,9 +504,9 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
ExceptionMessage = CurrentExceptionMessage();
}
- TString ExceptionMessage;
- };
-
+ TString ExceptionMessage;
+ };
+
class TListenerSockAddrReplyServer: public THttpServer::ICallBack {
class TRequest: public TClientRequest {
public:
@@ -542,22 +542,22 @@ Y_UNIT_TEST_SUITE(THttpServerTest) {
};
Y_UNIT_TEST(TTestReleaseConnection) {
- TPortManager pm;
- const ui16 port = pm.GetPort();
-
- TReleaseConnectionServer serverImpl;
- THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true));
- UNIT_ASSERT(server.Start());
-
- TTestRequest r(port, "request");
- r.KeepAliveConnection = true;
-
- UNIT_ASSERT_C(r.Execute() == "reply", "diff echo response for request:\n" + r.GetDescription());
-
- server.Stop();
-
- UNIT_ASSERT_STRINGS_EQUAL(serverImpl.ExceptionMessage, "(yexception) some error");
- };
+ TPortManager pm;
+ const ui16 port = pm.GetPort();
+
+ TReleaseConnectionServer serverImpl;
+ THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true));
+ UNIT_ASSERT(server.Start());
+
+ TTestRequest r(port, "request");
+ r.KeepAliveConnection = true;
+
+ UNIT_ASSERT_C(r.Execute() == "reply", "diff echo response for request:\n" + r.GetDescription());
+
+ server.Stop();
+
+ UNIT_ASSERT_STRINGS_EQUAL(serverImpl.ExceptionMessage, "(yexception) some error");
+ };
THttpInput SendRequest(TSocket& socket, ui16 port) {
TSocketInput si(socket);
diff --git a/library/cpp/http/server/options.h b/library/cpp/http/server/options.h
index 38eda0e5e7..602649e430 100644
--- a/library/cpp/http/server/options.h
+++ b/library/cpp/http/server/options.h
@@ -6,7 +6,7 @@
#include <util/generic/size_literals.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
-#include <util/datetime/base.h>
+#include <util/datetime/base.h>
class THttpServerOptions {
public:
@@ -93,10 +93,10 @@ public:
return *this;
}
-
+
inline THttpServerOptions& SetClientTimeout(const TDuration& timeout) noexcept {
ClientTimeout = timeout;
-
+
return *this;
}
@@ -107,11 +107,11 @@ public:
}
inline THttpServerOptions& SetOutputBufferSize(size_t val) noexcept {
- OutputBufferSize = val;
-
- return *this;
- }
-
+ OutputBufferSize = val;
+
+ return *this;
+ }
+
inline THttpServerOptions& SetMaxInputContentLength(ui64 val) noexcept {
MaxInputContentLength = val;
@@ -162,7 +162,7 @@ public:
ui32 MaxConnections = 100;
int ListenBacklog = SOMAXCONN;
TDuration ClientTimeout;
- size_t OutputBufferSize = 0;
+ size_t OutputBufferSize = 0;
ui64 MaxInputContentLength = sizeof(size_t) <= 4 ? 2_GB : 64_GB;
size_t MaxRequestsPerConnection = 0; // If keep-alive is enabled, request limit before connection is closed
bool UseElasticQueues = false;
diff --git a/library/cpp/http/server/response.h b/library/cpp/http/server/response.h
index a75cb85605..eed4afc7b6 100644
--- a/library/cpp/http/server/response.h
+++ b/library/cpp/http/server/response.h
@@ -34,10 +34,10 @@ public:
THttpResponse& AddMultipleHeaders(const THttpHeaders& headers);
- const THttpHeaders& GetHeaders() const {
- return Headers;
- }
-
+ const THttpHeaders& GetHeaders() const {
+ return Headers;
+ }
+
THttpResponse& SetContentType(const TStringBuf& contentType);
/**
diff --git a/library/cpp/http/server/response_ut.cpp b/library/cpp/http/server/response_ut.cpp
index 73e2112ad3..8a142fb1ba 100644
--- a/library/cpp/http/server/response_ut.cpp
+++ b/library/cpp/http/server/response_ut.cpp
@@ -46,25 +46,25 @@ Y_UNIT_TEST_SUITE(TestHttpResponse) {
EXPECTED);
}
- Y_UNIT_TEST(TestGetHeaders) {
- THttpResponse resp(HTTP_FORBIDDEN);
-
- THttpHeaders headers;
- headers.AddHeader(THttpInputHeader("X-Header-1", "ValueOne"));
- headers.AddHeader(THttpInputHeader("X-Header-2", "ValueTwo"));
- headers.AddHeader(THttpInputHeader("X-Header-3", "ValueThree"));
- resp.AddMultipleHeaders(headers);
- resp.AddHeader("X-Header-4", "ValueFour");
-
- const THttpHeaders& gotHeaders = resp.GetHeaders();
- UNIT_ASSERT_VALUES_EQUAL(gotHeaders.Count(), 4);
- UNIT_ASSERT(gotHeaders.HasHeader("X-Header-1"));
- UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-1")->Value(), "ValueOne");
- UNIT_ASSERT(gotHeaders.HasHeader("X-Header-4"));
- UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-4")->Value(), "ValueFour");
- }
-
-
+ Y_UNIT_TEST(TestGetHeaders) {
+ THttpResponse resp(HTTP_FORBIDDEN);
+
+ THttpHeaders headers;
+ headers.AddHeader(THttpInputHeader("X-Header-1", "ValueOne"));
+ headers.AddHeader(THttpInputHeader("X-Header-2", "ValueTwo"));
+ headers.AddHeader(THttpInputHeader("X-Header-3", "ValueThree"));
+ resp.AddMultipleHeaders(headers);
+ resp.AddHeader("X-Header-4", "ValueFour");
+
+ const THttpHeaders& gotHeaders = resp.GetHeaders();
+ UNIT_ASSERT_VALUES_EQUAL(gotHeaders.Count(), 4);
+ UNIT_ASSERT(gotHeaders.HasHeader("X-Header-1"));
+ UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-1")->Value(), "ValueOne");
+ UNIT_ASSERT(gotHeaders.HasHeader("X-Header-4"));
+ UNIT_ASSERT_STRINGS_EQUAL(gotHeaders.FindHeader("X-Header-4")->Value(), "ValueFour");
+ }
+
+
Y_UNIT_TEST(TestSetContent) {
const char* EXPECTED = "HTTP/1.1 200 Ok\r\n"
"Content-Length: 10\r\n"
diff --git a/library/cpp/streams/brotli/brotli.cpp b/library/cpp/streams/brotli/brotli.cpp
index 38052cb688..a5536eff99 100644
--- a/library/cpp/streams/brotli/brotli.cpp
+++ b/library/cpp/streams/brotli/brotli.cpp
@@ -139,7 +139,7 @@ public:
ui8* outBuffer = static_cast<ui8*>(buffer);
size_t availableOut = size;
- size_t decompressedSize = 0;
+ size_t decompressedSize = 0;
BrotliDecoderResult result;
do {
@@ -163,7 +163,7 @@ public:
&outBuffer,
nullptr);
- decompressedSize = size - availableOut;
+ decompressedSize = size - availableOut;
SubstreamFinished_ = (result == BROTLI_DECODER_RESULT_SUCCESS);
if (result == BROTLI_DECODER_RESULT_ERROR) {
@@ -175,7 +175,7 @@ public:
"Buffer passed to read in Brotli decoder is too small");
break;
}
- } while (decompressedSize == 0 && result == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && !InputExhausted_);
+ } while (decompressedSize == 0 && result == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && !InputExhausted_);
if (!SubstreamFinished_ && decompressedSize == 0) {
ythrow yexception() << "Input stream is incomplete";
diff --git a/library/cpp/streams/brotli/brotli_ut.cpp b/library/cpp/streams/brotli/brotli_ut.cpp
index aeb2e284dc..fb4372c4a5 100644
--- a/library/cpp/streams/brotli/brotli_ut.cpp
+++ b/library/cpp/streams/brotli/brotli_ut.cpp
@@ -41,23 +41,23 @@ Y_UNIT_TEST_SUITE(TBrotliTestSuite) {
TestCase("hello world");
}
- Y_UNIT_TEST(TestFlush) {
- TStringStream ss;
- TBrotliCompress compressStream(&ss);
- TBrotliDecompress decompressStream(&ss);
-
- for (size_t i = 0; i < 3; ++i) {
- TString s = GenerateRandomString(1 << 15);
- compressStream.Write(s.data(), s.size());
- compressStream.Flush();
-
- TString r(s.size(), '*');
- decompressStream.Load((char*)r.data(), r.size());
-
- UNIT_ASSERT_VALUES_EQUAL(s, r);
- }
- }
-
+ Y_UNIT_TEST(TestFlush) {
+ TStringStream ss;
+ TBrotliCompress compressStream(&ss);
+ TBrotliDecompress decompressStream(&ss);
+
+ for (size_t i = 0; i < 3; ++i) {
+ TString s = GenerateRandomString(1 << 15);
+ compressStream.Write(s.data(), s.size());
+ compressStream.Flush();
+
+ TString r(s.size(), '*');
+ decompressStream.Load((char*)r.data(), r.size());
+
+ UNIT_ASSERT_VALUES_EQUAL(s, r);
+ }
+ }
+
Y_UNIT_TEST(TestSeveralStreams) {
auto s1 = GenerateRandomString(1 << 15);
auto s2 = GenerateRandomString(1 << 15);
diff --git a/library/cpp/threading/equeue/equeue.cpp b/library/cpp/threading/equeue/equeue.cpp
index 54a848e912..aaec19daa6 100644
--- a/library/cpp/threading/equeue/equeue.cpp
+++ b/library/cpp/threading/equeue/equeue.cpp
@@ -1,80 +1,80 @@
-#include "equeue.h"
-
-TElasticQueue::TElasticQueue(THolder<IThreadPool> slaveQueue)
- : SlaveQueue_(std::move(slaveQueue))
-{
-}
-
-size_t TElasticQueue::ObjectCount() const {
- return (size_t)AtomicGet(ObjectCount_);
-}
-
-bool TElasticQueue::TryIncCounter() {
- if ((size_t)AtomicIncrement(GuardCount_) > MaxQueueSize_) {
- AtomicDecrement(GuardCount_);
- return false;
- }
-
- return true;
-}
-
-
-
-class TElasticQueue::TDecrementingWrapper: TNonCopyable, public IObjectInQueue {
-public:
- TDecrementingWrapper(IObjectInQueue* realObject, TElasticQueue* queue)
- : RealObject_(realObject)
- , Queue_(queue)
- {
- AtomicIncrement(Queue_->ObjectCount_);
- }
-
+#include "equeue.h"
+
+TElasticQueue::TElasticQueue(THolder<IThreadPool> slaveQueue)
+ : SlaveQueue_(std::move(slaveQueue))
+{
+}
+
+size_t TElasticQueue::ObjectCount() const {
+ return (size_t)AtomicGet(ObjectCount_);
+}
+
+bool TElasticQueue::TryIncCounter() {
+ if ((size_t)AtomicIncrement(GuardCount_) > MaxQueueSize_) {
+ AtomicDecrement(GuardCount_);
+ return false;
+ }
+
+ return true;
+}
+
+
+
+class TElasticQueue::TDecrementingWrapper: TNonCopyable, public IObjectInQueue {
+public:
+ TDecrementingWrapper(IObjectInQueue* realObject, TElasticQueue* queue)
+ : RealObject_(realObject)
+ , Queue_(queue)
+ {
+ AtomicIncrement(Queue_->ObjectCount_);
+ }
+
~TDecrementingWrapper() override {
- AtomicDecrement(Queue_->ObjectCount_);
- AtomicDecrement(Queue_->GuardCount_);
- }
-private:
- void Process(void *tsr) override {
- THolder<TDecrementingWrapper> self(this);
- RealObject_->Process(tsr);
- }
-private:
- IObjectInQueue* const RealObject_;
- TElasticQueue* const Queue_;
-};
-
-
-
-bool TElasticQueue::Add(IObjectInQueue* obj) {
- if (!TryIncCounter()) {
- return false;
- }
-
- THolder<TDecrementingWrapper> wrapper;
- try {
- wrapper.Reset(new TDecrementingWrapper(obj, this));
- } catch (...) {
- AtomicDecrement(GuardCount_);
- throw;
- }
-
- if (SlaveQueue_->Add(wrapper.Get())) {
+ AtomicDecrement(Queue_->ObjectCount_);
+ AtomicDecrement(Queue_->GuardCount_);
+ }
+private:
+ void Process(void *tsr) override {
+ THolder<TDecrementingWrapper> self(this);
+ RealObject_->Process(tsr);
+ }
+private:
+ IObjectInQueue* const RealObject_;
+ TElasticQueue* const Queue_;
+};
+
+
+
+bool TElasticQueue::Add(IObjectInQueue* obj) {
+ if (!TryIncCounter()) {
+ return false;
+ }
+
+ THolder<TDecrementingWrapper> wrapper;
+ try {
+ wrapper.Reset(new TDecrementingWrapper(obj, this));
+ } catch (...) {
+ AtomicDecrement(GuardCount_);
+ throw;
+ }
+
+ if (SlaveQueue_->Add(wrapper.Get())) {
Y_UNUSED(wrapper.Release());
- return true;
- } else {
- return false;
- }
-}
-
-void TElasticQueue::Start(size_t threadCount, size_t maxQueueSize) {
- MaxQueueSize_ = maxQueueSize;
- SlaveQueue_->Start(threadCount, maxQueueSize);
-}
-
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void TElasticQueue::Start(size_t threadCount, size_t maxQueueSize) {
+ MaxQueueSize_ = maxQueueSize;
+ SlaveQueue_->Start(threadCount, maxQueueSize);
+}
+
void TElasticQueue::Stop() noexcept {
- return SlaveQueue_->Stop();
-}
-
+ return SlaveQueue_->Stop();
+}
+
size_t TElasticQueue::Size() const noexcept {
- return SlaveQueue_->Size();
-}
+ return SlaveQueue_->Size();
+}
diff --git a/library/cpp/threading/equeue/equeue.h b/library/cpp/threading/equeue/equeue.h
index 40dd342585..403e993713 100644
--- a/library/cpp/threading/equeue/equeue.h
+++ b/library/cpp/threading/equeue/equeue.h
@@ -1,28 +1,28 @@
-#pragma once
-
+#pragma once
+
#include <util/thread/pool.h>
-#include <util/system/atomic.h>
-#include <util/generic/ptr.h>
-
-//actual queue limit will be (maxQueueSize - numBusyThreads) or 0
+#include <util/system/atomic.h>
+#include <util/generic/ptr.h>
+
+//actual queue limit will be (maxQueueSize - numBusyThreads) or 0
class TElasticQueue: public IThreadPool {
-public:
- explicit TElasticQueue(THolder<IThreadPool> slaveQueue);
-
- bool Add(IObjectInQueue* obj) override;
- size_t Size() const noexcept override;
-
- void Start(size_t threadCount, size_t maxQueueSize) override;
- void Stop() noexcept override;
-
- size_t ObjectCount() const;
-private:
- class TDecrementingWrapper;
-
- bool TryIncCounter();
-private:
+public:
+ explicit TElasticQueue(THolder<IThreadPool> slaveQueue);
+
+ bool Add(IObjectInQueue* obj) override;
+ size_t Size() const noexcept override;
+
+ void Start(size_t threadCount, size_t maxQueueSize) override;
+ void Stop() noexcept override;
+
+ size_t ObjectCount() const;
+private:
+ class TDecrementingWrapper;
+
+ bool TryIncCounter();
+private:
THolder<IThreadPool> SlaveQueue_;
- size_t MaxQueueSize_ = 0;
- TAtomic ObjectCount_ = 0;
- TAtomic GuardCount_ = 0;
-};
+ size_t MaxQueueSize_ = 0;
+ TAtomic ObjectCount_ = 0;
+ TAtomic GuardCount_ = 0;
+};
diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp
index 9cf2aced44..defa1a0e82 100644
--- a/library/cpp/threading/equeue/equeue_ut.cpp
+++ b/library/cpp/threading/equeue/equeue_ut.cpp
@@ -1,125 +1,125 @@
-#include "equeue.h"
-
+#include "equeue.h"
+
#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/system/event.h>
-#include <util/datetime/base.h>
-#include <util/generic/vector.h>
-
+
+#include <util/system/event.h>
+#include <util/datetime/base.h>
+#include <util/generic/vector.h>
+
Y_UNIT_TEST_SUITE(TElasticQueueTest) {
- const size_t MaxQueueSize = 20;
- const size_t ThreadCount = 10;
- const size_t N = 100000;
-
- static THolder<TElasticQueue> Queue;
-
- struct TQueueSetup {
- TQueueSetup() {
+ const size_t MaxQueueSize = 20;
+ const size_t ThreadCount = 10;
+ const size_t N = 100000;
+
+ static THolder<TElasticQueue> Queue;
+
+ struct TQueueSetup {
+ TQueueSetup() {
Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>()));
- Queue->Start(ThreadCount, MaxQueueSize);
- }
- ~TQueueSetup() {
- Queue->Stop();
- }
- };
-
- struct TCounters {
- void Reset() {
- Processed = Scheduled = Discarded = Total = 0;
- }
-
- TAtomic Processed;
- TAtomic Scheduled;
- TAtomic Discarded;
- TAtomic Total;
- };
- static TCounters Counters;
-
-//fill test -- fill queue with "endless" jobs
+ Queue->Start(ThreadCount, MaxQueueSize);
+ }
+ ~TQueueSetup() {
+ Queue->Stop();
+ }
+ };
+
+ struct TCounters {
+ void Reset() {
+ Processed = Scheduled = Discarded = Total = 0;
+ }
+
+ TAtomic Processed;
+ TAtomic Scheduled;
+ TAtomic Discarded;
+ TAtomic Total;
+ };
+ static TCounters Counters;
+
+//fill test -- fill queue with "endless" jobs
TSystemEvent WaitEvent;
Y_UNIT_TEST(FillTest) {
- Counters.Reset();
-
- struct TWaitJob: public IObjectInQueue {
+ Counters.Reset();
+
+ struct TWaitJob: public IObjectInQueue {
void Process(void*) override {
- WaitEvent.Wait();
- AtomicIncrement(Counters.Processed);
- }
- } job;
-
- struct TLocalSetup: TQueueSetup {
- ~TLocalSetup() {
- WaitEvent.Signal();
- }
- };
-
- size_t enqueued = 0;
- {
- TLocalSetup setup;
- while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
- ++enqueued;
- }
-
- UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize);
- UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount());
- }
-
- UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount());
- UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size());
- UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued);
- }
-
-
-//concurrent test -- send many jobs from different threads
- struct TJob: public IObjectInQueue {
+ WaitEvent.Wait();
+ AtomicIncrement(Counters.Processed);
+ }
+ } job;
+
+ struct TLocalSetup: TQueueSetup {
+ ~TLocalSetup() {
+ WaitEvent.Signal();
+ }
+ };
+
+ size_t enqueued = 0;
+ {
+ TLocalSetup setup;
+ while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
+ ++enqueued;
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize);
+ UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size());
+ UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued);
+ }
+
+
+//concurrent test -- send many jobs from different threads
+ struct TJob: public IObjectInQueue {
void Process(void*) override {
- AtomicIncrement(Counters.Processed);
- };
- };
- static TJob Job;
-
- static bool TryAdd() {
- AtomicIncrement(Counters.Total);
- if (Queue->Add(&Job)) {
- AtomicIncrement(Counters.Scheduled);
- return true;
- } else {
- AtomicIncrement(Counters.Discarded);
- return false;
- }
- }
-
- static size_t TryCounter;
-
+ AtomicIncrement(Counters.Processed);
+ };
+ };
+ static TJob Job;
+
+ static bool TryAdd() {
+ AtomicIncrement(Counters.Total);
+ if (Queue->Add(&Job)) {
+ AtomicIncrement(Counters.Scheduled);
+ return true;
+ } else {
+ AtomicIncrement(Counters.Discarded);
+ return false;
+ }
+ }
+
+ static size_t TryCounter;
+
Y_UNIT_TEST(ConcurrentTest) {
- Counters.Reset();
- TryCounter = 0;
-
+ Counters.Reset();
+ TryCounter = 0;
+
struct TSender: public IThreadFactory::IThreadAble {
void DoExecute() override {
- while ((size_t)AtomicIncrement(TryCounter) <= N) {
- if (!TryAdd()) {
- Sleep(TDuration::MicroSeconds(50));
- }
- }
- }
- } sender;
-
- {
- TQueueSetup setup;
-
+ while ((size_t)AtomicIncrement(TryCounter) <= N) {
+ if (!TryAdd()) {
+ Sleep(TDuration::MicroSeconds(50));
+ }
+ }
+ }
+ } sender;
+
+ {
+ TQueueSetup setup;
+
TVector< TAutoPtr<IThreadFactory::IThread> > senders;
- for (size_t i = 0; i < ThreadCount; ++i) {
+ for (size_t i = 0; i < ThreadCount; ++i) {
senders.push_back(::SystemThreadFactory()->Run(&sender));
- }
-
- for (size_t i = 0; i < senders.size(); ++i) {
- senders[i]->Join();
- }
- }
-
- UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Total, N);
- UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled);
- UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded);
- }
-}
+ }
+
+ for (size_t i = 0; i < senders.size(); ++i) {
+ senders[i]->Join();
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Total, N);
+ UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled);
+ UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded);
+ }
+}
diff --git a/library/cpp/threading/task_scheduler/task_scheduler.cpp b/library/cpp/threading/task_scheduler/task_scheduler.cpp
index 174dde4bf7..407761eea7 100644
--- a/library/cpp/threading/task_scheduler/task_scheduler.cpp
+++ b/library/cpp/threading/task_scheduler/task_scheduler.cpp
@@ -1,246 +1,246 @@
#include "task_scheduler.h"
-
-#include <util/system/thread.h>
-#include <util/string/cast.h>
+
+#include <util/system/thread.h>
+#include <util/string/cast.h>
#include <util/stream/output.h>
-
-TTaskScheduler::ITask::~ITask() {}
-TTaskScheduler::IRepeatedTask::~IRepeatedTask() {}
-
-
-
-class TTaskScheduler::TWorkerThread
+
+TTaskScheduler::ITask::~ITask() {}
+TTaskScheduler::IRepeatedTask::~IRepeatedTask() {}
+
+
+
+class TTaskScheduler::TWorkerThread
: public ISimpleThread
-{
-public:
- TWorkerThread(TTaskScheduler& state)
- : Scheduler_(state)
- {
- }
-
+{
+public:
+ TWorkerThread(TTaskScheduler& state)
+ : Scheduler_(state)
+ {
+ }
+
TString DebugState = "?";
TString DebugId = "";
-private:
+private:
void* ThreadProc() noexcept override {
- Scheduler_.WorkerFunc(this);
- return nullptr;
- }
-private:
- TTaskScheduler& Scheduler_;
-};
-
-
-
-TTaskScheduler::TTaskScheduler(size_t threadCount, size_t maxTaskCount)
- : MaxTaskCount_(maxTaskCount)
-{
- for (size_t i = 0; i < threadCount; ++i) {
- Workers_.push_back(new TWorkerThread(*this));
- Workers_.back()->DebugId = ToString(i);
- }
-}
-
-TTaskScheduler::~TTaskScheduler() {
- try {
- Stop();
- } catch (...) {
+ Scheduler_.WorkerFunc(this);
+ return nullptr;
+ }
+private:
+ TTaskScheduler& Scheduler_;
+};
+
+
+
+TTaskScheduler::TTaskScheduler(size_t threadCount, size_t maxTaskCount)
+ : MaxTaskCount_(maxTaskCount)
+{
+ for (size_t i = 0; i < threadCount; ++i) {
+ Workers_.push_back(new TWorkerThread(*this));
+ Workers_.back()->DebugId = ToString(i);
+ }
+}
+
+TTaskScheduler::~TTaskScheduler() {
+ try {
+ Stop();
+ } catch (...) {
Cdbg << "task scheduled destruction error: " << CurrentExceptionMessage();
- }
-}
-
-void TTaskScheduler::Start() {
- for (auto& w : Workers_) {
- w->Start();
- }
-}
-
-void TTaskScheduler::Stop() {
- with_lock (Lock_) {
- IsStopped_ = true;
- CondVar_.BroadCast();
- }
-
- for (auto& w: Workers_) {
- w->Join();
- }
-
- Workers_.clear();
- Queue_.clear();
-}
-
-size_t TTaskScheduler::GetTaskCount() const {
- return static_cast<size_t>(AtomicGet(TaskCounter_));
-}
-
-namespace {
- class TTaskWrapper
- : public TTaskScheduler::ITask
- , TNonCopyable
- {
- public:
- TTaskWrapper(TTaskScheduler::ITaskRef task, TAtomic& counter)
- : Task_(task)
- , Counter_(counter)
- {
- AtomicIncrement(Counter_);
- }
-
+ }
+}
+
+void TTaskScheduler::Start() {
+ for (auto& w : Workers_) {
+ w->Start();
+ }
+}
+
+void TTaskScheduler::Stop() {
+ with_lock (Lock_) {
+ IsStopped_ = true;
+ CondVar_.BroadCast();
+ }
+
+ for (auto& w: Workers_) {
+ w->Join();
+ }
+
+ Workers_.clear();
+ Queue_.clear();
+}
+
+size_t TTaskScheduler::GetTaskCount() const {
+ return static_cast<size_t>(AtomicGet(TaskCounter_));
+}
+
+namespace {
+ class TTaskWrapper
+ : public TTaskScheduler::ITask
+ , TNonCopyable
+ {
+ public:
+ TTaskWrapper(TTaskScheduler::ITaskRef task, TAtomic& counter)
+ : Task_(task)
+ , Counter_(counter)
+ {
+ AtomicIncrement(Counter_);
+ }
+
~TTaskWrapper() override {
- AtomicDecrement(Counter_);
- }
- private:
- TInstant Process() override {
- return Task_->Process();
- }
- private:
- TTaskScheduler::ITaskRef Task_;
- TAtomic& Counter_;
- };
-}
-
-bool TTaskScheduler::Add(ITaskRef task, TInstant expire) {
- with_lock (Lock_) {
- if (!IsStopped_ && Workers_.size() > 0 && GetTaskCount() + 1 <= MaxTaskCount_) {
- ITaskRef newTask = new TTaskWrapper(task, TaskCounter_);
+ AtomicDecrement(Counter_);
+ }
+ private:
+ TInstant Process() override {
+ return Task_->Process();
+ }
+ private:
+ TTaskScheduler::ITaskRef Task_;
+ TAtomic& Counter_;
+ };
+}
+
+bool TTaskScheduler::Add(ITaskRef task, TInstant expire) {
+ with_lock (Lock_) {
+ if (!IsStopped_ && Workers_.size() > 0 && GetTaskCount() + 1 <= MaxTaskCount_) {
+ ITaskRef newTask = new TTaskWrapper(task, TaskCounter_);
Queue_.insert(std::make_pair(expire, TTaskHolder(newTask)));
-
- if (!Queue_.begin()->second.WaitingWorker) {
- CondVar_.Signal();
- }
- return true;
- }
- }
-
- return false;
-}
-
-namespace {
- class TRepeatedTask
- : public TTaskScheduler::ITask
- {
- public:
- TRepeatedTask(TTaskScheduler::IRepeatedTaskRef task, TDuration period, TInstant deadline)
- : Task_(task)
- , Period_(period)
- , Deadline_(deadline)
- {
- }
- private:
- TInstant Process() final {
- Deadline_ += Period_;
- if (Task_->Process()) {
- return Deadline_;
- } else {
- return TInstant::Max();
- }
- }
- private:
- TTaskScheduler::IRepeatedTaskRef Task_;
- TDuration Period_;
- TInstant Deadline_;
- };
-}
-
-bool TTaskScheduler::Add(IRepeatedTaskRef task, TDuration period) {
- const TInstant deadline = Now() + period;
- ITaskRef t = new TRepeatedTask(task, period, deadline);
- return Add(t, deadline);
-}
-
-
-const bool debugOutput = false;
-
+
+ if (!Queue_.begin()->second.WaitingWorker) {
+ CondVar_.Signal();
+ }
+ return true;
+ }
+ }
+
+ return false;
+}
+
+namespace {
+ class TRepeatedTask
+ : public TTaskScheduler::ITask
+ {
+ public:
+ TRepeatedTask(TTaskScheduler::IRepeatedTaskRef task, TDuration period, TInstant deadline)
+ : Task_(task)
+ , Period_(period)
+ , Deadline_(deadline)
+ {
+ }
+ private:
+ TInstant Process() final {
+ Deadline_ += Period_;
+ if (Task_->Process()) {
+ return Deadline_;
+ } else {
+ return TInstant::Max();
+ }
+ }
+ private:
+ TTaskScheduler::IRepeatedTaskRef Task_;
+ TDuration Period_;
+ TInstant Deadline_;
+ };
+}
+
+bool TTaskScheduler::Add(IRepeatedTaskRef task, TDuration period) {
+ const TInstant deadline = Now() + period;
+ ITaskRef t = new TRepeatedTask(task, period, deadline);
+ return Add(t, deadline);
+}
+
+
+const bool debugOutput = false;
+
void TTaskScheduler::ChangeDebugState(TWorkerThread* thread, const TString& state) {
- if (!debugOutput) {
+ if (!debugOutput) {
Y_UNUSED(thread);
Y_UNUSED(state);
- return;
- }
-
- thread->DebugState = state;
-
- TStringStream ss;
- ss << Now() << " " << thread->DebugId << ":\t";
- for (auto& w : Workers_) {
- ss << w->DebugState << " ";
- }
- ss << " [" << Queue_.size() << "] [" << TaskCounter_ << "]" << Endl;
+ return;
+ }
+
+ thread->DebugState = state;
+
+ TStringStream ss;
+ ss << Now() << " " << thread->DebugId << ":\t";
+ for (auto& w : Workers_) {
+ ss << w->DebugState << " ";
+ }
+ ss << " [" << Queue_.size() << "] [" << TaskCounter_ << "]" << Endl;
Cerr << ss.Str();
-}
-
-bool TTaskScheduler::Wait(TWorkerThread* thread, TQueueIterator& toWait) {
- ChangeDebugState(thread, "w");
- toWait->second.WaitingWorker = thread;
- return !CondVar_.WaitD(Lock_, toWait->first);
-}
-
-void TTaskScheduler::ChooseFromQueue(TQueueIterator& toWait) {
- for (TQueueIterator it = Queue_.begin(); it != Queue_.end(); ++it) {
- if (!it->second.WaitingWorker) {
- if (toWait == Queue_.end()) {
- toWait = it;
- } else if (it->first < toWait->first) {
- toWait->second.WaitingWorker = nullptr;
- toWait = it;
- }
- break;
- }
- }
-}
-
-void TTaskScheduler::WorkerFunc(TWorkerThread* thread) {
+}
+
+bool TTaskScheduler::Wait(TWorkerThread* thread, TQueueIterator& toWait) {
+ ChangeDebugState(thread, "w");
+ toWait->second.WaitingWorker = thread;
+ return !CondVar_.WaitD(Lock_, toWait->first);
+}
+
+void TTaskScheduler::ChooseFromQueue(TQueueIterator& toWait) {
+ for (TQueueIterator it = Queue_.begin(); it != Queue_.end(); ++it) {
+ if (!it->second.WaitingWorker) {
+ if (toWait == Queue_.end()) {
+ toWait = it;
+ } else if (it->first < toWait->first) {
+ toWait->second.WaitingWorker = nullptr;
+ toWait = it;
+ }
+ break;
+ }
+ }
+}
+
+void TTaskScheduler::WorkerFunc(TWorkerThread* thread) {
TThread::SetCurrentThreadName("TaskSchedWorker");
- TQueueIterator toWait = Queue_.end();
- ITaskRef toDo;
-
- for (;;) {
- TInstant repeat = TInstant::Max();
-
- if (!!toDo) {
- try {
- repeat = toDo->Process();
- } catch (...) {
+ TQueueIterator toWait = Queue_.end();
+ ITaskRef toDo;
+
+ for (;;) {
+ TInstant repeat = TInstant::Max();
+
+ if (!!toDo) {
+ try {
+ repeat = toDo->Process();
+ } catch (...) {
Cdbg << "task scheduler error: " << CurrentExceptionMessage();
- }
- }
-
-
- with_lock (Lock_) {
- ChangeDebugState(thread, "f");
-
- if (IsStopped_) {
- ChangeDebugState(thread, "s");
- return ;
- }
-
- if (!!toDo) {
- if (repeat < TInstant::Max()) {
+ }
+ }
+
+
+ with_lock (Lock_) {
+ ChangeDebugState(thread, "f");
+
+ if (IsStopped_) {
+ ChangeDebugState(thread, "s");
+ return ;
+ }
+
+ if (!!toDo) {
+ if (repeat < TInstant::Max()) {
Queue_.insert(std::make_pair(repeat, TTaskHolder(toDo)));
- }
- }
-
- toDo = nullptr;
-
- ChooseFromQueue(toWait);
-
- if (toWait != Queue_.end()) {
- if (toWait->first <= Now() || Wait(thread, toWait)) {
-
- toDo = toWait->second.Task;
- Queue_.erase(toWait);
- toWait = Queue_.end();
-
- if (!Queue_.empty() && !Queue_.begin()->second.WaitingWorker && Workers_.size() > 1) {
- CondVar_.Signal();
- }
-
- ChangeDebugState(thread, "p");
- }
- } else {
- ChangeDebugState(thread, "e");
- CondVar_.WaitI(Lock_);
- }
- }
- }
-}
+ }
+ }
+
+ toDo = nullptr;
+
+ ChooseFromQueue(toWait);
+
+ if (toWait != Queue_.end()) {
+ if (toWait->first <= Now() || Wait(thread, toWait)) {
+
+ toDo = toWait->second.Task;
+ Queue_.erase(toWait);
+ toWait = Queue_.end();
+
+ if (!Queue_.empty() && !Queue_.begin()->second.WaitingWorker && Workers_.size() > 1) {
+ CondVar_.Signal();
+ }
+
+ ChangeDebugState(thread, "p");
+ }
+ } else {
+ ChangeDebugState(thread, "e");
+ CondVar_.WaitI(Lock_);
+ }
+ }
+ }
+}
diff --git a/library/cpp/threading/task_scheduler/task_scheduler.h b/library/cpp/threading/task_scheduler/task_scheduler.h
index df4da941a8..166946a2aa 100644
--- a/library/cpp/threading/task_scheduler/task_scheduler.h
+++ b/library/cpp/threading/task_scheduler/task_scheduler.h
@@ -1,86 +1,86 @@
-#pragma once
-
-#include <util/generic/vector.h>
-#include <util/generic/ptr.h>
-#include <util/generic/map.h>
-
-#include <util/datetime/base.h>
-
-#include <util/system/condvar.h>
-#include <util/system/mutex.h>
-
-class TTaskScheduler {
-public:
- class ITask;
- using ITaskRef = TIntrusivePtr<ITask>;
-
- class IRepeatedTask;
- using IRepeatedTaskRef = TIntrusivePtr<IRepeatedTask>;
-public:
- explicit TTaskScheduler(size_t threadCount = 1, size_t maxTaskCount = Max<size_t>());
- ~TTaskScheduler();
-
- void Start();
- void Stop();
-
- bool Add(ITaskRef task, TInstant expire);
- bool Add(IRepeatedTaskRef task, TDuration period);
-
- size_t GetTaskCount() const;
-private:
- class TWorkerThread;
-
- struct TTaskHolder {
- explicit TTaskHolder(ITaskRef& task)
- : Task(task)
- {
- }
- public:
- ITaskRef Task;
- TWorkerThread* WaitingWorker = nullptr;
- };
-
+#pragma once
+
+#include <util/generic/vector.h>
+#include <util/generic/ptr.h>
+#include <util/generic/map.h>
+
+#include <util/datetime/base.h>
+
+#include <util/system/condvar.h>
+#include <util/system/mutex.h>
+
+class TTaskScheduler {
+public:
+ class ITask;
+ using ITaskRef = TIntrusivePtr<ITask>;
+
+ class IRepeatedTask;
+ using IRepeatedTaskRef = TIntrusivePtr<IRepeatedTask>;
+public:
+ explicit TTaskScheduler(size_t threadCount = 1, size_t maxTaskCount = Max<size_t>());
+ ~TTaskScheduler();
+
+ void Start();
+ void Stop();
+
+ bool Add(ITaskRef task, TInstant expire);
+ bool Add(IRepeatedTaskRef task, TDuration period);
+
+ size_t GetTaskCount() const;
+private:
+ class TWorkerThread;
+
+ struct TTaskHolder {
+ explicit TTaskHolder(ITaskRef& task)
+ : Task(task)
+ {
+ }
+ public:
+ ITaskRef Task;
+ TWorkerThread* WaitingWorker = nullptr;
+ };
+
using TQueueType = TMultiMap<TInstant, TTaskHolder>;
using TQueueIterator = TQueueType::iterator;
-private:
+private:
void ChangeDebugState(TWorkerThread* thread, const TString& state);
- void ChooseFromQueue(TQueueIterator& toWait);
- bool Wait(TWorkerThread* thread, TQueueIterator& toWait);
-
- void WorkerFunc(TWorkerThread* thread);
-private:
- bool IsStopped_ = false;
-
- TAtomic TaskCounter_ = 0;
+ void ChooseFromQueue(TQueueIterator& toWait);
+ bool Wait(TWorkerThread* thread, TQueueIterator& toWait);
+
+ void WorkerFunc(TWorkerThread* thread);
+private:
+ bool IsStopped_ = false;
+
+ TAtomic TaskCounter_ = 0;
TQueueType Queue_;
-
- TCondVar CondVar_;
- TMutex Lock_;
-
+
+ TCondVar CondVar_;
+ TMutex Lock_;
+
TVector<TAutoPtr<TWorkerThread>> Workers_;
-
- const size_t MaxTaskCount_;
-};
-
-class TTaskScheduler::ITask
- : public TAtomicRefCount<ITask>
-{
-public:
- virtual ~ITask();
-
- virtual TInstant Process() {//returns time to repeat this task
- return TInstant::Max();
- }
-};
-
-class TTaskScheduler::IRepeatedTask
- : public TAtomicRefCount<IRepeatedTask>
-{
-public:
- virtual ~IRepeatedTask();
-
- virtual bool Process() {//returns if to repeat task again
- return false;
- }
-};
-
+
+ const size_t MaxTaskCount_;
+};
+
+class TTaskScheduler::ITask
+ : public TAtomicRefCount<ITask>
+{
+public:
+ virtual ~ITask();
+
+ virtual TInstant Process() {//returns time to repeat this task
+ return TInstant::Max();
+ }
+};
+
+class TTaskScheduler::IRepeatedTask
+ : public TAtomicRefCount<IRepeatedTask>
+{
+public:
+ virtual ~IRepeatedTask();
+
+ virtual bool Process() {//returns if to repeat task again
+ return false;
+ }
+};
+
diff --git a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp
index 3b5203194a..cb2daa718b 100644
--- a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp
+++ b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp
@@ -2,7 +2,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <util/stream/output.h>
-#include <util/system/atomic.h>
+#include <util/system/atomic.h>
#include <util/generic/vector.h>
#include "task_scheduler.h"
@@ -12,13 +12,13 @@ class TTaskSchedulerTest: public TTestBase {
UNIT_TEST(Test);
UNIT_TEST_SUITE_END();
- class TCheckTask: public TTaskScheduler::IRepeatedTask {
+ class TCheckTask: public TTaskScheduler::IRepeatedTask {
public:
TCheckTask(const TDuration& delay)
: Start_(Now())
, Delay_(delay)
{
- AtomicIncrement(ScheduledTaskCounter_);
+ AtomicIncrement(ScheduledTaskCounter_);
}
~TCheckTask() override {
@@ -28,28 +28,28 @@ class TTaskSchedulerTest: public TTestBase {
const TDuration delay = Now() - Start_;
if (delay < Delay_) {
- AtomicIncrement(BadTimeoutCounter_);
+ AtomicIncrement(BadTimeoutCounter_);
}
- AtomicIncrement(ExecutedTaskCounter_);
+ AtomicIncrement(ExecutedTaskCounter_);
return false;
}
static bool AllTaskExecuted() {
- return AtomicGet(ScheduledTaskCounter_) == AtomicGet(ExecutedTaskCounter_);
+ return AtomicGet(ScheduledTaskCounter_) == AtomicGet(ExecutedTaskCounter_);
}
static size_t BadTimeoutCount() {
- return AtomicGet(BadTimeoutCounter_);
+ return AtomicGet(BadTimeoutCounter_);
}
private:
TInstant Start_;
TDuration Delay_;
- static TAtomic BadTimeoutCounter_;
- static TAtomic ScheduledTaskCounter_;
- static TAtomic ExecutedTaskCounter_;
+ static TAtomic BadTimeoutCounter_;
+ static TAtomic ScheduledTaskCounter_;
+ static TAtomic ExecutedTaskCounter_;
};
public:
@@ -72,15 +72,15 @@ class TTaskSchedulerTest: public TTestBase {
void ScheduleCheckTask(size_t delay) {
TDuration d = TDuration::MicroSeconds(delay);
- Scheduler_.Add(new TCheckTask(d), d);
+ Scheduler_.Add(new TCheckTask(d), d);
}
private:
- TTaskScheduler Scheduler_;
+ TTaskScheduler Scheduler_;
};
-TAtomic TTaskSchedulerTest::TCheckTask::BadTimeoutCounter_ = 0;
-TAtomic TTaskSchedulerTest::TCheckTask::ScheduledTaskCounter_ = 0;
-TAtomic TTaskSchedulerTest::TCheckTask::ExecutedTaskCounter_ = 0;
+TAtomic TTaskSchedulerTest::TCheckTask::BadTimeoutCounter_ = 0;
+TAtomic TTaskSchedulerTest::TCheckTask::ScheduledTaskCounter_ = 0;
+TAtomic TTaskSchedulerTest::TCheckTask::ExecutedTaskCounter_ = 0;
UNIT_TEST_SUITE_REGISTRATION(TTaskSchedulerTest);