diff options
author | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-02-10 16:46:54 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:54 +0300 |
commit | de20f5598f0832a6e646f61b4feca942c00da928 (patch) | |
tree | 6de57350f1f78bcbe9c57e73a010cd24a6afc90e /library/cpp/actors | |
parent | 9eeddfb447d62493b7f67a7a1e253ea7f28e95ae (diff) | |
download | ydb-de20f5598f0832a6e646f61b4feca942c00da928.tar.gz |
Restoring authorship annotation for Vladislav Kuznetsov <va.kuznecov@physics.msu.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors')
-rw-r--r-- | library/cpp/actors/core/actorsystem.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_thread.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/probes.h | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/scheduler_actor.cpp | 248 | ||||
-rw-r--r-- | library/cpp/actors/core/scheduler_actor.h | 38 | ||||
-rw-r--r-- | library/cpp/actors/core/scheduler_actor_ut.cpp | 166 | ||||
-rw-r--r-- | library/cpp/actors/core/scheduler_basic.cpp | 44 | ||||
-rw-r--r-- | library/cpp/actors/core/scheduler_basic.h | 46 | ||||
-rw-r--r-- | library/cpp/actors/core/ut/ya.make | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/ya.make | 4 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/events_local.h | 116 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/poller_actor.h | 8 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ya.make | 2 | ||||
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/util/rope.h | 52 |
15 files changed, 371 insertions, 371 deletions
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index c58698a206..df006ba7b8 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -7,7 +7,7 @@ #include "interconnect.h" #include "servicemap.h" #include "scheduler_queue.h" -#include "scheduler_actor.h" +#include "scheduler_actor.h" #include "log.h" #include "probes.h" #include "ask.h" diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 446b651efd..d4aac6e60e 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -20,7 +20,7 @@ #endif #include <util/system/type_name.h> -#include <util/system/datetime.h> +#include <util/system/datetime.h> LWTRACE_USING(ACTORLIB_PROVIDER) diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h index 4912d6dd26..0a7804c210 100644 --- a/library/cpp/actors/core/probes.h +++ b/library/cpp/actors/core/probes.h @@ -71,9 +71,9 @@ PROBE(SlowICDropConfirmed, GROUPS("ActorLibSlowIC"), \ TYPES(ui32, double), \ NAMES("peerId", "icDropConfirmedMs")) \ - PROBE(ActorsystemScheduler, GROUPS("Durations"), \ - TYPES(ui64, ui64, ui32, ui32, ui64, ui64), \ - NAMES("timeUs", "timerfd_expirations", "eventsGottenFromQueues", "eventsSent", \ + PROBE(ActorsystemScheduler, GROUPS("Durations"), \ + TYPES(ui64, ui64, ui32, ui32, ui64, ui64), \ + NAMES("timeUs", "timerfd_expirations", "eventsGottenFromQueues", "eventsSent", \ "eventsInSendQueue", "eventSchedulingErrorUs")) \ PROBE(ForwardEvent, GROUPS("Orbit", "InterconnectSessionTCP"), \ TYPES(ui32, ui32, ui32, LWTYPE_ACTORID, LWTYPE_ACTORID, ui64, ui32), \ diff --git a/library/cpp/actors/core/scheduler_actor.cpp b/library/cpp/actors/core/scheduler_actor.cpp index febc5e40dd..cccb302c1f 100644 --- a/library/cpp/actors/core/scheduler_actor.cpp +++ b/library/cpp/actors/core/scheduler_actor.cpp @@ -1,135 +1,135 @@ -#include "actor_bootstrapped.h" -#include "hfunc.h" -#include "probes.h" -#include "scheduler_actor.h" -#include "scheduler_queue.h" - +#include "actor_bootstrapped.h" +#include "hfunc.h" +#include "probes.h" +#include "scheduler_actor.h" +#include "scheduler_queue.h" + #include <library/cpp/actors/interconnect/poller_actor.h> -#include <util/system/hp_timer.h> - -#ifdef __linux__ +#include <util/system/hp_timer.h> + +#ifdef __linux__ #include <sys/timerfd.h> #include <errno.h> - -LWTRACE_USING(ACTORLIB_PROVIDER); - -namespace NActors { + +LWTRACE_USING(ACTORLIB_PROVIDER); + +namespace NActors { class TTimerDescriptor: public TSharedDescriptor { - const int Descriptor; + const int Descriptor; - public: - TTimerDescriptor() + public: + TTimerDescriptor() : Descriptor(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) - { - Y_VERIFY(Descriptor != -1, "timerfd_create() failed with %s", strerror(errno)); - } - + { + Y_VERIFY(Descriptor != -1, "timerfd_create() failed with %s", strerror(errno)); + } + ~TTimerDescriptor() override { - close(Descriptor); - } - + close(Descriptor); + } + int GetDescriptor() override { - return Descriptor; - } - }; - + return Descriptor; + } + }; + class TSchedulerActor: public TActor<TSchedulerActor> { - const TSchedulerConfig Cfg; - TIntrusivePtr<TSharedDescriptor> TimerDescriptor; - - TVector<NSchedulerQueue::TReader*> Readers; - + const TSchedulerConfig Cfg; + TIntrusivePtr<TSharedDescriptor> TimerDescriptor; + + TVector<NSchedulerQueue::TReader*> Readers; + TActorId PollerActor; TPollerToken::TPtr PollerToken; - - ui64 RealTime; + + ui64 RealTime; ui64 MonotonicTime; - - ui64 ActiveTick; - typedef TMap<ui64, TAutoPtr<NSchedulerQueue::TQueueType>> TMomentMap; // intrasecond queues + + ui64 ActiveTick; + typedef TMap<ui64, TAutoPtr<NSchedulerQueue::TQueueType>> TMomentMap; // intrasecond queues typedef THashMap<ui64, TAutoPtr<TMomentMap>> TScheduleMap; // over-second schedule - - TScheduleMap ScheduleMap; - - THolder<NThreading::TLegacyFuture<void, false>> MainCycle; - - static const ui64 IntrasecondThreshold = 1048576; // ~second - TAutoPtr<TMomentMap> ActiveSec; + + TScheduleMap ScheduleMap; + + THolder<NThreading::TLegacyFuture<void, false>> MainCycle; + + static const ui64 IntrasecondThreshold = 1048576; // ~second + TAutoPtr<TMomentMap> ActiveSec; volatile ui64* CurrentTimestamp = nullptr; volatile ui64* CurrentMonotonic = nullptr; - TDeque<TAutoPtr<IEventHandle>> EventsToBeSent; - - public: + TDeque<TAutoPtr<IEventHandle>> EventsToBeSent; + + public: static constexpr IActor::EActivityType ActorActivityType() { return IActor::ACTOR_SYSTEM_SCHEDULER_ACTOR; } TSchedulerActor(const TSchedulerConfig& cfg) - : TActor(&TSchedulerActor::StateFunc) - , Cfg(cfg) - , TimerDescriptor(new TTimerDescriptor()) + : TActor(&TSchedulerActor::StateFunc) + , Cfg(cfg) + , TimerDescriptor(new TTimerDescriptor()) , PollerActor(MakePollerActorId()) - { - Y_ASSERT(Cfg.ResolutionMicroseconds != 0); - Y_ASSERT(Cfg.ProgressThreshold != 0); - Become(&TSchedulerActor::StateFunc); - } - + { + Y_ASSERT(Cfg.ResolutionMicroseconds != 0); + Y_ASSERT(Cfg.ProgressThreshold != 0); + Become(&TSchedulerActor::StateFunc); + } + void Handle(TEvSchedulerInitialize::TPtr& ev, const TActorContext& ctx) { const TEvSchedulerInitialize& evInitialize = *ev->Get(); - Y_ASSERT(evInitialize.ScheduleReaders.size() != 0); - Readers.resize(evInitialize.ScheduleReaders.size()); - Copy(evInitialize.ScheduleReaders.begin(), evInitialize.ScheduleReaders.end(), Readers.begin()); - - Y_ASSERT(evInitialize.CurrentTimestamp != nullptr); - CurrentTimestamp = evInitialize.CurrentTimestamp; - + Y_ASSERT(evInitialize.ScheduleReaders.size() != 0); + Readers.resize(evInitialize.ScheduleReaders.size()); + Copy(evInitialize.ScheduleReaders.begin(), evInitialize.ScheduleReaders.end(), Readers.begin()); + + Y_ASSERT(evInitialize.CurrentTimestamp != nullptr); + CurrentTimestamp = evInitialize.CurrentTimestamp; + Y_ASSERT(evInitialize.CurrentMonotonic != nullptr); CurrentMonotonic = evInitialize.CurrentMonotonic; - struct itimerspec new_time; - memset(&new_time, 0, sizeof(new_time)); - new_time.it_value.tv_nsec = Cfg.ResolutionMicroseconds * 1000; - new_time.it_interval.tv_nsec = Cfg.ResolutionMicroseconds * 1000; - int ret = timerfd_settime(TimerDescriptor->GetDescriptor(), 0, &new_time, NULL); - Y_VERIFY(ret != -1, "timerfd_settime() failed with %s", strerror(errno)); + struct itimerspec new_time; + memset(&new_time, 0, sizeof(new_time)); + new_time.it_value.tv_nsec = Cfg.ResolutionMicroseconds * 1000; + new_time.it_interval.tv_nsec = Cfg.ResolutionMicroseconds * 1000; + int ret = timerfd_settime(TimerDescriptor->GetDescriptor(), 0, &new_time, NULL); + Y_VERIFY(ret != -1, "timerfd_settime() failed with %s", strerror(errno)); const bool success = ctx.Send(PollerActor, new TEvPollerRegister(TimerDescriptor, SelfId(), {})); Y_VERIFY(success); - + RealTime = RelaxedLoad(CurrentTimestamp); MonotonicTime = RelaxedLoad(CurrentMonotonic); - + ActiveTick = AlignUp<ui64>(MonotonicTime, IntrasecondThreshold); - } - + } + void Handle(TEvPollerRegisterResult::TPtr ev, const TActorContext& ctx) { PollerToken = ev->Get()->PollerToken; HandleSchedule(ctx); } - void UpdateTime() { + void UpdateTime() { RealTime = TInstant::Now().MicroSeconds(); MonotonicTime = Max(MonotonicTime, GetMonotonicMicroSeconds()); AtomicStore(CurrentTimestamp, RealTime); AtomicStore(CurrentMonotonic, MonotonicTime); - } - + } + void TryUpdateTime(NHPTimer::STime* lastTimeUpdate) { - NHPTimer::STime hpnow; + NHPTimer::STime hpnow; GetTimeFast(&hpnow); - const ui64 elapsedCycles = hpnow > *lastTimeUpdate ? hpnow - *lastTimeUpdate : 0; - if (elapsedCycles > Cfg.ResolutionMicroseconds * (NHPTimer::GetCyclesPerSecond() / IntrasecondThreshold)) { - UpdateTime(); + const ui64 elapsedCycles = hpnow > *lastTimeUpdate ? hpnow - *lastTimeUpdate : 0; + if (elapsedCycles > Cfg.ResolutionMicroseconds * (NHPTimer::GetCyclesPerSecond() / IntrasecondThreshold)) { + UpdateTime(); GetTimeFast(lastTimeUpdate); - } - } - - void HandleSchedule(const TActorContext& ctx) { + } + } + + void HandleSchedule(const TActorContext& ctx) { for (;;) { NHPTimer::STime schedulingStart; GetTimeFast(&schedulingStart); NHPTimer::STime lastTimeUpdate = schedulingStart; - + ui64 expired; ssize_t bytesRead; bytesRead = read(TimerDescriptor->GetDescriptor(), &expired, sizeof(expired)); @@ -143,7 +143,7 @@ namespace NActors { } Y_VERIFY(bytesRead == sizeof(expired), "Error while reading from timerfd, strerror# %s", strerror(errno)); UpdateTime(); - + ui32 eventsGottenFromQueues = 0; // collect everything from queues for (ui32 i = 0; i != Readers.size(); ++i) { @@ -151,9 +151,9 @@ namespace NActors { const ui64 instant = AlignUp<ui64>(x->InstantMicroseconds, Cfg.ResolutionMicroseconds); IEventHandle* const ev = x->Ev; ISchedulerCookie* const cookie = x->Cookie; - + // check is cookie still valid? looks like it will hurt performance w/o sagnificant memory save - + if (instant <= ActiveTick) { if (!ActiveSec) ActiveSec.Reset(new TMomentMap()); @@ -173,9 +173,9 @@ namespace NActors { } ++eventsGottenFromQueues; TryUpdateTime(&lastTimeUpdate); - } - } - + } + } + ui64 eventSchedulingErrorUs = 0; // send everything triggered on schedule for (;;) { @@ -197,17 +197,17 @@ namespace NActors { delete ev; } } else { - EventsToBeSent.push_back(ev); - } + EventsToBeSent.push_back(ev); + } TryUpdateTime(&lastTimeUpdate); - } - } + } + } ActiveSec->erase(it); } else { break; - } - } - + } + } + if (ActiveTick <= MonotonicTime) { Y_VERIFY_DEBUG(!ActiveSec || ActiveSec->empty()); ActiveSec.Destroy(); @@ -218,12 +218,12 @@ namespace NActors { ScheduleMap.erase(it); } continue; - } + } // ok, if we are here - then nothing is ready, so send step complete break; - } - + } + // Send all from buffer queue const ui64 eventsToBeSentSize = EventsToBeSent.size(); ui32 sentCount = 0; @@ -237,7 +237,7 @@ namespace NActors { ctx.Send(EventsToBeSent.front()); EventsToBeSent.pop_front(); } - + NHPTimer::STime hpnow; GetTimeFast(&hpnow); const ui64 processingTime = hpnow > schedulingStart ? hpnow - schedulingStart : 0; @@ -245,35 +245,35 @@ namespace NActors { LWPROBE(ActorsystemScheduler, elapsedTimeMicroseconds, expired, eventsGottenFromQueues, sentCount, eventsToBeSentSize, eventSchedulingErrorUs); TryUpdateTime(&lastTimeUpdate); - } - } - + } + } + STRICT_STFUNC(StateFunc, HFunc(TEvSchedulerInitialize, Handle) CFunc(TEvPollerReady::EventType, HandleSchedule) CFunc(TEvents::TSystem::PoisonPill, Die) HFunc(TEvPollerRegisterResult, Handle) ) - }; - + }; + IActor* CreateSchedulerActor(const TSchedulerConfig& cfg) { - if (cfg.UseSchedulerActor) { - return new TSchedulerActor(cfg); - } else { - return nullptr; - } - } - + if (cfg.UseSchedulerActor) { + return new TSchedulerActor(cfg); + } else { + return nullptr; + } + } + } - -#else // linux - -namespace NActors { + +#else // linux + +namespace NActors { IActor* CreateSchedulerActor(const TSchedulerConfig& cfg) { - Y_UNUSED(cfg); - return nullptr; - } - + Y_UNUSED(cfg); + return nullptr; + } + } - -#endif // linux + +#endif // linux diff --git a/library/cpp/actors/core/scheduler_actor.h b/library/cpp/actors/core/scheduler_actor.h index c2c561b43d..5aa9f0216d 100644 --- a/library/cpp/actors/core/scheduler_actor.h +++ b/library/cpp/actors/core/scheduler_actor.h @@ -1,29 +1,29 @@ -#pragma once - -#include "actor.h" -#include "event_local.h" -#include "events.h" -#include "scheduler_basic.h" - -namespace NActors { - struct TEvSchedulerInitialize : TEventLocal<TEvSchedulerInitialize, TEvents::TSystem::Bootstrap> { - TVector<NSchedulerQueue::TReader*> ScheduleReaders; +#pragma once + +#include "actor.h" +#include "event_local.h" +#include "events.h" +#include "scheduler_basic.h" + +namespace NActors { + struct TEvSchedulerInitialize : TEventLocal<TEvSchedulerInitialize, TEvents::TSystem::Bootstrap> { + TVector<NSchedulerQueue::TReader*> ScheduleReaders; volatile ui64* CurrentTimestamp; volatile ui64* CurrentMonotonic; - + TEvSchedulerInitialize(const TVector<NSchedulerQueue::TReader*>& scheduleReaders, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) - : ScheduleReaders(scheduleReaders) - , CurrentTimestamp(currentTimestamp) + : ScheduleReaders(scheduleReaders) + , CurrentTimestamp(currentTimestamp) , CurrentMonotonic(currentMonotonic) { } - }; - + }; + IActor* CreateSchedulerActor(const TSchedulerConfig& cfg); - + inline TActorId MakeSchedulerActorId() { - char x[12] = {'s', 'c', 'h', 'e', 'd', 'u', 'l', 'e', 'r', 's', 'e', 'r'}; + char x[12] = {'s', 'c', 'h', 'e', 'd', 'u', 'l', 'e', 'r', 's', 'e', 'r'}; return TActorId(0, TStringBuf(x, 12)); - } - + } + } diff --git a/library/cpp/actors/core/scheduler_actor_ut.cpp b/library/cpp/actors/core/scheduler_actor_ut.cpp index 09b7369d36..8f5cf6b23f 100644 --- a/library/cpp/actors/core/scheduler_actor_ut.cpp +++ b/library/cpp/actors/core/scheduler_actor_ut.cpp @@ -1,100 +1,100 @@ -#include "actor_coroutine.h" -#include "actorsystem.h" -#include "executor_pool_basic.h" -#include "scheduler_actor.h" -#include "scheduler_basic.h" -#include "events.h" -#include "event_local.h" -#include "hfunc.h" +#include "actor_coroutine.h" +#include "actorsystem.h" +#include "executor_pool_basic.h" +#include "scheduler_actor.h" +#include "scheduler_basic.h" +#include "events.h" +#include "event_local.h" +#include "hfunc.h" #include <library/cpp/actors/interconnect/poller_actor.h> #include <library/cpp/testing/unittest/registar.h> - -#include <util/system/sanitizers.h> - -using namespace NActors; - + +#include <util/system/sanitizers.h> + +using namespace NActors; + Y_UNIT_TEST_SUITE(SchedulerActor) { - class TTestActor: public TActorBootstrapped<TTestActor> { - TManualEvent& DoneEvent; - TAtomic& EventsProcessed; - TInstant LastWakeup; - const TAtomicBase EventsTotalCount; - const TDuration ScheduleDelta; - - public: - TTestActor(TManualEvent& doneEvent, TAtomic& eventsProcessed, TAtomicBase eventsTotalCount, ui32 scheduleDeltaMs) - : DoneEvent(doneEvent) - , EventsProcessed(eventsProcessed) - , EventsTotalCount(eventsTotalCount) - , ScheduleDelta(TDuration::MilliSeconds(scheduleDeltaMs)) - { - } - + class TTestActor: public TActorBootstrapped<TTestActor> { + TManualEvent& DoneEvent; + TAtomic& EventsProcessed; + TInstant LastWakeup; + const TAtomicBase EventsTotalCount; + const TDuration ScheduleDelta; + + public: + TTestActor(TManualEvent& doneEvent, TAtomic& eventsProcessed, TAtomicBase eventsTotalCount, ui32 scheduleDeltaMs) + : DoneEvent(doneEvent) + , EventsProcessed(eventsProcessed) + , EventsTotalCount(eventsTotalCount) + , ScheduleDelta(TDuration::MilliSeconds(scheduleDeltaMs)) + { + } + void Bootstrap(const TActorContext& ctx) { - LastWakeup = ctx.Now(); - Become(&TThis::StateFunc); - ctx.Schedule(ScheduleDelta, new TEvents::TEvWakeup()); - } - + LastWakeup = ctx.Now(); + Become(&TThis::StateFunc); + ctx.Schedule(ScheduleDelta, new TEvents::TEvWakeup()); + } + void Handle(TEvents::TEvWakeup::TPtr& /*ev*/, const TActorContext& ctx) { - const TInstant now = ctx.Now(); - UNIT_ASSERT(now - LastWakeup >= ScheduleDelta); - LastWakeup = now; - - if (AtomicIncrement(EventsProcessed) == EventsTotalCount) { - DoneEvent.Signal(); - } else { - ctx.Schedule(ScheduleDelta, new TEvents::TEvWakeup()); - } - } - + const TInstant now = ctx.Now(); + UNIT_ASSERT(now - LastWakeup >= ScheduleDelta); + LastWakeup = now; + + if (AtomicIncrement(EventsProcessed) == EventsTotalCount) { + DoneEvent.Signal(); + } else { + ctx.Schedule(ScheduleDelta, new TEvents::TEvWakeup()); + } + } + STRICT_STFUNC(StateFunc, {HFunc(TEvents::TEvWakeup, Handle)}) - }; - - void Test(TAtomicBase eventsTotalCount, ui32 scheduleDeltaMs) { + }; + + void Test(TAtomicBase eventsTotalCount, ui32 scheduleDeltaMs) { THolder<TActorSystemSetup> setup = MakeHolder<TActorSystemSetup>(); - setup->NodeId = 0; - setup->ExecutorsCount = 1; + setup->NodeId = 0; + setup->ExecutorsCount = 1; setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]); - for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { - setup->Executors[i] = new TBasicExecutorPool(i, 5, 10, "basic"); - } - // create poller actor (whether platform supports it) + for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { + setup->Executors[i] = new TBasicExecutorPool(i, 5, 10, "basic"); + } + // create poller actor (whether platform supports it) TActorId pollerActorId; if (IActor* poller = CreatePollerActor()) { pollerActorId = MakePollerActorId(); - setup->LocalServices.emplace_back(pollerActorId, TActorSetupCmd(poller, TMailboxType::ReadAsFilled, 0)); - } + setup->LocalServices.emplace_back(pollerActorId, TActorSetupCmd(poller, TMailboxType::ReadAsFilled, 0)); + } TActorId schedulerActorId; if (IActor* schedulerActor = CreateSchedulerActor(TSchedulerConfig())) { schedulerActorId = MakeSchedulerActorId(); - setup->LocalServices.emplace_back(schedulerActorId, TActorSetupCmd(schedulerActor, TMailboxType::ReadAsFilled, 0)); - } - setup->Scheduler = CreateSchedulerThread(TSchedulerConfig()); - - TActorSystem actorSystem(setup); - - actorSystem.Start(); - - TManualEvent doneEvent; - TAtomic eventsProcessed = 0; - actorSystem.Register(new TTestActor(doneEvent, eventsProcessed, eventsTotalCount, scheduleDeltaMs)); - doneEvent.WaitI(); - - UNIT_ASSERT(AtomicGet(eventsProcessed) == eventsTotalCount); - - actorSystem.Stop(); - } - + setup->LocalServices.emplace_back(schedulerActorId, TActorSetupCmd(schedulerActor, TMailboxType::ReadAsFilled, 0)); + } + setup->Scheduler = CreateSchedulerThread(TSchedulerConfig()); + + TActorSystem actorSystem(setup); + + actorSystem.Start(); + + TManualEvent doneEvent; + TAtomic eventsProcessed = 0; + actorSystem.Register(new TTestActor(doneEvent, eventsProcessed, eventsTotalCount, scheduleDeltaMs)); + doneEvent.WaitI(); + + UNIT_ASSERT(AtomicGet(eventsProcessed) == eventsTotalCount); + + actorSystem.Stop(); + } + Y_UNIT_TEST(LongEvents) { - Test(10, 500); - } - + Test(10, 500); + } + Y_UNIT_TEST(MediumEvents) { - Test(100, 50); - } - + Test(100, 50); + } + Y_UNIT_TEST(QuickEvents) { - Test(1000, 5); - } -} + Test(1000, 5); + } +} diff --git a/library/cpp/actors/core/scheduler_basic.cpp b/library/cpp/actors/core/scheduler_basic.cpp index fba200e16b..ab5919c15f 100644 --- a/library/cpp/actors/core/scheduler_basic.cpp +++ b/library/cpp/actors/core/scheduler_basic.cpp @@ -42,7 +42,7 @@ namespace NActors { , StopFlag(false) , ScheduleMap(3600) { - Y_VERIFY(!Config.UseSchedulerActor, "Cannot create scheduler thread because Config.UseSchedulerActor# true"); + Y_VERIFY(!Config.UseSchedulerActor, "Cannot create scheduler thread because Config.UseSchedulerActor# true"); } TBasicSchedulerThread::~TBasicSchedulerThread() { @@ -247,28 +247,28 @@ namespace NActors { MainCycle->Get(); MainCycle.Destroy(); } - + } - -#ifdef __linux__ - -namespace NActors { - ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) { - if (config.UseSchedulerActor) { - return new TMockSchedulerThread(); - } else { - return new TBasicSchedulerThread(config); - } - } - + +#ifdef __linux__ + +namespace NActors { + ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) { + if (config.UseSchedulerActor) { + return new TMockSchedulerThread(); + } else { + return new TBasicSchedulerThread(config); + } + } + } - -#else // __linux__ - -namespace NActors { + +#else // __linux__ + +namespace NActors { ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) { - return new TBasicSchedulerThread(config); - } + return new TBasicSchedulerThread(config); + } } - -#endif // __linux__ + +#endif // __linux__ diff --git a/library/cpp/actors/core/scheduler_basic.h b/library/cpp/actors/core/scheduler_basic.h index 2ccde39235..043cc5257d 100644 --- a/library/cpp/actors/core/scheduler_basic.h +++ b/library/cpp/actors/core/scheduler_basic.h @@ -49,33 +49,33 @@ namespace NActors { void PrepareStop() override; void Stop() override; }; - + class TMockSchedulerThread: public ISchedulerThread { - public: - virtual ~TMockSchedulerThread() override { - } - + public: + virtual ~TMockSchedulerThread() override { + } + void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) override { - Y_UNUSED(actorSystem); + Y_UNUSED(actorSystem); *currentTimestamp = TInstant::Now().MicroSeconds(); *currentMonotonic = GetMonotonicMicroSeconds(); - } - + } + void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) override { - Y_UNUSED(readers); - Y_UNUSED(scheduleReadersCount); - } - - void Start() override { - } - - void PrepareStop() override { - } - - void Stop() override { - } - }; - + Y_UNUSED(readers); + Y_UNUSED(scheduleReadersCount); + } + + void Start() override { + } + + void PrepareStop() override { + } + + void Stop() override { + } + }; + ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& cfg); - + } diff --git a/library/cpp/actors/core/ut/ya.make b/library/cpp/actors/core/ut/ya.make index 3ee28d5850..50d885b1b3 100644 --- a/library/cpp/actors/core/ut/ya.make +++ b/library/cpp/actors/core/ut/ya.make @@ -23,11 +23,11 @@ ELSE() ENDIF() -PEERDIR( +PEERDIR( library/cpp/actors/interconnect library/cpp/actors/testlib -) - +) + SRCS( actor_coroutine_ut.cpp actor_ut.cpp diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make index 880a9d00db..a583363523 100644 --- a/library/cpp/actors/core/ya.make +++ b/library/cpp/actors/core/ya.make @@ -89,8 +89,8 @@ SRCS( probes.h process_stats.cpp process_stats.h - scheduler_actor.cpp - scheduler_actor.h + scheduler_actor.cpp + scheduler_actor.h scheduler_basic.cpp scheduler_basic.h scheduler_cookie.cpp diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 8a46ffd535..fa1054be14 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -1,27 +1,27 @@ -#pragma once - +#pragma once + #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/event_local.h> #include <library/cpp/actors/protos/interconnect.pb.h> -#include <util/generic/deque.h> -#include <util/network/address.h> - -#include "interconnect_stream.h" -#include "packet.h" +#include <util/generic/deque.h> +#include <util/network/address.h> + +#include "interconnect_stream.h" +#include "packet.h" #include "types.h" - -namespace NActors { + +namespace NActors { struct TProgramInfo { ui64 PID = 0; ui64 StartTime = 0; ui64 Serial = 0; }; - + enum class ENetwork : ui32 { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // local messages //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - + Start = EventSpaceBegin(TEvents::ES_INTERCONNECT_TCP), SocketReadyRead = Start, @@ -71,11 +71,11 @@ namespace NActors { struct TEvSocketReadyRead: public TEventLocal<TEvSocketReadyRead, ui32(ENetwork::SocketReadyRead)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvSocketReadyRead") }; - + struct TEvSocketReadyWrite: public TEventLocal<TEvSocketReadyWrite, ui32(ENetwork::SocketReadyWrite)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyWrite, "Network: TEvSocketReadyWrite") }; - + struct TEvSocketError: public TEventLocal<TEvSocketError, ui32(ENetwork::SocketError)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketError, ::strerror(Error)) TString GetReason() const { @@ -83,18 +83,18 @@ namespace NActors { } const int Error; TIntrusivePtr<NInterconnect::TStreamSocket> Socket; - + TEvSocketError(int error, TIntrusivePtr<NInterconnect::TStreamSocket> sock) : Error(error) , Socket(std::move(sock)) { } }; - + struct TEvSocketConnect: public TEventLocal<TEvSocketConnect, ui32(ENetwork::Connect)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketConnect, "Network: TEvSocketConnect") }; - + struct TEvSocketDisconnect: public TEventLocal<TEvSocketDisconnect, ui32(ENetwork::Disconnect)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketDisconnect, "Network: TEvSocketDisconnect") TDisconnectReason Reason; @@ -104,7 +104,7 @@ namespace NActors { { } }; - + struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk") TEvHandshakeAsk(const TActorId& self, @@ -119,21 +119,21 @@ namespace NActors { const TActorId Peer; const ui64 Counter; }; - + struct TEvHandshakeAck: public TEventLocal<TEvHandshakeAck, ui32(ENetwork::HandshakeAck)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAck, "Network: TEvHandshakeAck") - + TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params) : Self(self) , NextPacket(nextPacket) , Params(std::move(params)) {} - + const TActorId Self; const ui64 NextPacket; const TSessionParams Params; }; - + struct TEvHandshakeNak : TEventLocal<TEvHandshakeNak, ui32(ENetwork::HandshakeNak)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvHandshakeNak") }; @@ -143,32 +143,32 @@ namespace NActors { ui32(ENetwork::HandshakeRequest)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeRequest, "Network: TEvHandshakeRequest") - + NActorsInterconnect::THandshakeRequest Record; }; - + struct TEvHandshakeReplyOK : public TEventLocal<TEvHandshakeReplyOK, ui32(ENetwork::HandshakeReplyOK)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyOK, "Network: TEvHandshakeReplyOK") - + NActorsInterconnect::THandshakeReply Record; }; - + struct TEvHandshakeReplyError : public TEventLocal<TEvHandshakeReplyError, ui32(ENetwork::HandshakeReplyError)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyError, "Network: TEvHandshakeReplyError") - + TEvHandshakeReplyError(TString error) { Record.SetErrorExplaination(error); } - + NActorsInterconnect::THandshakeReply Record; }; - + struct TEvIncomingConnection: public TEventLocal<TEvIncomingConnection, ui32(ENetwork::IncomingConnection)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvIncomingConnection, "Network: TEvIncomingConnection") TIntrusivePtr<NInterconnect::TStreamSocket> Socket; @@ -179,10 +179,10 @@ namespace NActors { , Address(std::move(address)) {} }; - + struct TEvHandshakeDone: public TEventLocal<TEvHandshakeDone, ui32(ENetwork::HandshakeDone)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeDone, "Network: TEvHandshakeDone") - + TEvHandshakeDone( TIntrusivePtr<NInterconnect::TStreamSocket> socket, const TActorId& peer, @@ -198,7 +198,7 @@ namespace NActors { , Params(std::move(params)) { } - + TIntrusivePtr<NInterconnect::TStreamSocket> Socket; const TActorId Peer; const TActorId Self; @@ -206,10 +206,10 @@ namespace NActors { TAutoPtr<TProgramInfo> ProgramInfo; const TSessionParams Params; }; - + struct TEvHandshakeFail: public TEventLocal<TEvHandshakeFail, ui32(ENetwork::HandshakeFail)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeFail, "Network: TEvHandshakeFail") - + enum EnumHandshakeFail { HANDSHAKE_FAIL_TRANSIENT, HANDSHAKE_FAIL_PERMANENT, @@ -224,58 +224,58 @@ namespace NActors { const EnumHandshakeFail Temporary; const TString Explanation; - }; - + }; + struct TEvKick: public TEventLocal<TEvKick, ui32(ENetwork::Kick)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvKick, "Network: TEvKick") }; - + struct TEvFlush: public TEventLocal<TEvFlush, ui32(ENetwork::Flush)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvFlush, "Network: TEvFlush") }; - + struct TEvLocalNodeInfo : public TEventLocal<TEvLocalNodeInfo, ui32(ENetwork::NodeInfo)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvLocalNodeInfo, "Network: TEvLocalNodeInfo") - + ui32 NodeId; NAddr::IRemoteAddrPtr Address; }; - + struct TEvBunchOfEventsToDestroy : TEventLocal<TEvBunchOfEventsToDestroy, ui32(ENetwork::BunchOfEventsToDestroy)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvBunchOfEventsToDestroy, "Network: TEvBunchOfEventsToDestroy") - + TEvBunchOfEventsToDestroy(TDeque<TAutoPtr<IEventBase>> events) : Events(std::move(events)) { } - + TDeque<TAutoPtr<IEventBase>> Events; }; - + struct TEvResolveAddress : public TEventLocal<TEvResolveAddress, ui32(ENetwork::ResolveAddress)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveAddress, "Network: TEvResolveAddress") - + TString Address; ui16 Port; }; - + struct TEvAddressInfo : public TEventLocal<TEvAddressInfo, ui32(ENetwork::AddressInfo)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvAddressInfo, "Network: TEvAddressInfo") - + NAddr::IRemoteAddrPtr Address; }; - + struct TEvResolveError : public TEventLocal<TEvResolveError, ui32(ENetwork::ResolveError)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveError, "Network: TEvResolveError") - + TString Explain; }; - + struct TEvHTTPStreamStatus : public TEventLocal<TEvHTTPStreamStatus, ui32(ENetwork::HTTPStreamStatus)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPStreamStatus, @@ -285,38 +285,38 @@ namespace NActors { COMPLETE, ERROR, }; - + EStatus Status; TString Error; TString HttpHeaders; - }; - + }; + struct TEvHTTPSendContent : public TEventLocal<TEvHTTPSendContent, ui32(ENetwork::HTTPSendContent)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPSendContent, "Network: TEvHTTPSendContent") - + const char* Data; size_t Len; bool Last; }; - + struct TEvConnectWakeup : public TEventLocal<TEvConnectWakeup, ui32(ENetwork::ConnectProtocolWakeup)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvConnectWakeup, "Protocols: TEvConnectWakeup") }; - + struct TEvHTTPProtocolRetry : public TEventLocal<TEvHTTPProtocolRetry, ui32(ENetwork::HTTPProtocolRetry)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPProtocolRetry, "Protocols: TEvHTTPProtocolRetry") }; - + struct TEvLoadMessage : TEventPB<TEvLoadMessage, NActorsInterconnect::TEvLoadMessage, static_cast<ui32>(ENetwork::EvLoadMessage)> { TEvLoadMessage() = default; - + template <typename TContainer> TEvLoadMessage(const TContainer& route, const TString& id, const TString* payload) { for (const TActorId& actorId : route) { @@ -329,7 +329,7 @@ namespace NActors { if (payload) { Record.SetPayload(*payload); } - } + } template <typename TContainer> TEvLoadMessage(const TContainer& route, const TString& id, TRope&& payload) { @@ -343,7 +343,7 @@ namespace NActors { AddPayload(std::move(payload)); } }; - + struct TEvUpdateFromInputSession : TEventLocal<TEvUpdateFromInputSession, static_cast<ui32>(ENetwork::EvUpdateFromInputSession)> { ui64 ConfirmedByInput; // latest Confirm value from processed input packet ui64 NumDataBytes; diff --git a/library/cpp/actors/interconnect/poller_actor.h b/library/cpp/actors/interconnect/poller_actor.h index f927b82089..dd787518e5 100644 --- a/library/cpp/actors/interconnect/poller_actor.h +++ b/library/cpp/actors/interconnect/poller_actor.h @@ -1,6 +1,6 @@ #pragma once -#include "events_local.h" +#include "events_local.h" #include "poller.h" #include <library/cpp/actors/core/actor.h> @@ -56,8 +56,8 @@ namespace NActors { IActor* CreatePollerActor(); inline TActorId MakePollerActorId() { - char x[12] = {'I', 'C', 'P', 'o', 'l', 'l', 'e', 'r', '\xDE', '\xAD', '\xBE', '\xEF'}; + char x[12] = {'I', 'C', 'P', 'o', 'l', 'l', 'e', 'r', '\xDE', '\xAD', '\xBE', '\xEF'}; return TActorId(0, TStringBuf(std::begin(x), std::end(x))); - } - + } + } diff --git a/library/cpp/actors/interconnect/ya.make b/library/cpp/actors/interconnect/ya.make index 60d29b0fc0..d6d9c3d8da 100644 --- a/library/cpp/actors/interconnect/ya.make +++ b/library/cpp/actors/interconnect/ya.make @@ -16,7 +16,7 @@ SRCS( channel_scheduler.h event_filter.h event_holder_pool.h - events_local.h + events_local.h interconnect_address.cpp interconnect_address.h interconnect_channel.cpp diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 6fa25b9965..2be70b75a5 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -774,7 +774,7 @@ namespace NActors { } void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) { - Y_VERIFY(!IsInitialized); + Y_VERIFY(!IsInitialized); TGuard<TMutex> guard(Mutex); LogBackend = logBackend; } diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h index f5595efbaa..82b407a787 100644 --- a/library/cpp/actors/util/rope.h +++ b/library/cpp/actors/util/rope.h @@ -4,8 +4,8 @@ #include <util/generic/string.h> #include <util/generic/hash_set.h> #include <util/stream/str.h> -#include <util/system/sanitizers.h> -#include <util/system/valgrind.h> +#include <util/system/sanitizers.h> +#include <util/system/valgrind.h> // exactly one of them must be included #include "rope_cont_list.h" @@ -1135,27 +1135,27 @@ inline TRope TRope::CopySpaceOptimized(TRope&& origin, size_t worstRatioPer1k, T return res; } - -#if defined(WITH_VALGRIND) || defined(_msan_enabled_) - -inline void CheckRopeIsDefined(TRope::TConstIterator begin, ui64 size) { - while (size) { - ui64 contiguousSize = Min(size, begin.ContiguousSize()); -# if defined(WITH_VALGRIND) - VALGRIND_CHECK_MEM_IS_DEFINED(begin.ContiguousData(), contiguousSize); -# endif -# if defined(_msan_enabled_) - NSan::CheckMemIsInitialized(begin.ContiguousData(), contiguousSize); -# endif - size -= contiguousSize; - begin += contiguousSize; - } -} - -# define CHECK_ROPE_IS_DEFINED(begin, size) CheckRopeIsDefined(begin, size) - -#else - -# define CHECK_ROPE_IS_DEFINED(begin, size) do {} while (false) - -#endif + +#if defined(WITH_VALGRIND) || defined(_msan_enabled_) + +inline void CheckRopeIsDefined(TRope::TConstIterator begin, ui64 size) { + while (size) { + ui64 contiguousSize = Min(size, begin.ContiguousSize()); +# if defined(WITH_VALGRIND) + VALGRIND_CHECK_MEM_IS_DEFINED(begin.ContiguousData(), contiguousSize); +# endif +# if defined(_msan_enabled_) + NSan::CheckMemIsInitialized(begin.ContiguousData(), contiguousSize); +# endif + size -= contiguousSize; + begin += contiguousSize; + } +} + +# define CHECK_ROPE_IS_DEFINED(begin, size) CheckRopeIsDefined(begin, size) + +#else + +# define CHECK_ROPE_IS_DEFINED(begin, size) do {} while (false) + +#endif |