diff options
author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-02-10 16:47:43 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:43 +0300 |
commit | 330c83f8c116bd45316397b179275e9d87007e7d (patch) | |
tree | c0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/actors/core | |
parent | 22d92781ba2a10b7fb5b977b7d1a5c40ff53885f (diff) | |
download | ydb-330c83f8c116bd45316397b179275e9d87007e7d.tar.gz |
Restoring authorship annotation for Alexey Borzenkov <snaury@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/core')
30 files changed, 526 insertions, 526 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp index 638bfb72fa..6f9ba6a42b 100644 --- a/library/cpp/actors/core/actor.cpp +++ b/library/cpp/actors/core/actor.cpp @@ -37,10 +37,10 @@ namespace NActors { TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie); } - void TActivationContext::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { - TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie); - } - + void TActivationContext::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { + TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie); + } + void TActivationContext::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { TlsActivationContext->ExecutorThread.Schedule(delta, ev, cookie); } @@ -87,14 +87,14 @@ namespace NActors { return TlsActivationContext->ExecutorThread.ActorSystem; } - i64 TActivationContext::GetCurrentEventTicks() { + i64 TActivationContext::GetCurrentEventTicks() { return GetCycleCountFast() - TlsActivationContext->EventStart; - } - - double TActivationContext::GetCurrentEventTicksAsSeconds() { - return NHPTimer::GetSeconds(GetCurrentEventTicks()); - } - + } + + double TActivationContext::GetCurrentEventTicksAsSeconds() { + return NHPTimer::GetSeconds(GetCurrentEventTicks()); + } + TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const { return ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfID); } @@ -107,10 +107,10 @@ namespace NActors { ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie); } - void TActorContext::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const { - ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie); - } - + void TActorContext::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const { + ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie); + } + void TActorContext::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const { ExecutorThread.Schedule(delta, new IEventHandle(SelfID, TActorId(), ev), cookie); } @@ -119,10 +119,10 @@ namespace NActors { TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie); } - void IActor::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { - TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie); - } - + void IActor::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { + TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie); + } + void IActor::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandle(SelfActorId, TActorId(), ev), cookie); } @@ -131,10 +131,10 @@ namespace NActors { return TlsActivationContext->ExecutorThread.ActorSystem->Timestamp(); } - TMonotonic TActivationContext::Monotonic() { - return TlsActivationContext->ExecutorThread.ActorSystem->Monotonic(); - } - + TMonotonic TActivationContext::Monotonic() { + return TlsActivationContext->ExecutorThread.ActorSystem->Monotonic(); + } + TInstant TActorContext::Now() const { return ExecutorThread.ActorSystem->Timestamp(); } diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 9c167ab595..ed29bd14b9 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -1,7 +1,7 @@ #pragma once #include "event.h" -#include "monotonic.h" +#include "monotonic.h" #include <util/system/tls.h> #include <library/cpp/actors/util/local_process_key.h> @@ -24,13 +24,13 @@ namespace NActors { public: TMailboxHeader& Mailbox; TExecutorThread& ExecutorThread; - const NHPTimer::STime EventStart; + const NHPTimer::STime EventStart; protected: - explicit TActivationContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart) + explicit TActivationContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart) : Mailbox(mailbox) , ExecutorThread(executorThread) - , EventStart(eventStart) + , EventStart(eventStart) { } @@ -40,22 +40,22 @@ namespace NActors { /** * Schedule one-shot event that will be send at given time point in the future. * - * @param deadline the wallclock time point in future when event must be send + * @param deadline the wallclock time point in future when event must be send * @param ev the event to send * @param cookie cookie that will be piggybacked with event */ static void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); /** - * Schedule one-shot event that will be send at given time point in the future. - * - * @param deadline the monotonic time point in future when event must be send - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - */ - static void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); - - /** + * Schedule one-shot event that will be send at given time point in the future. + * + * @param deadline the monotonic time point in future when event must be send + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + */ + static void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); + + /** * Schedule one-shot event that will be send after given delay. * * @param delta the time from now to delay event sending @@ -65,7 +65,7 @@ namespace NActors { static void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); static TInstant Now(); - static TMonotonic Monotonic(); + static TMonotonic Monotonic(); NLog::TSettings* LoggerSettings() const; // register new actor in ActorSystem on new fresh mailbox. @@ -83,16 +83,16 @@ namespace NActors { static TActorId InterconnectProxy(ui32 nodeid); static TActorSystem* ActorSystem(); - - static i64 GetCurrentEventTicks(); - static double GetCurrentEventTicksAsSeconds(); + + static i64 GetCurrentEventTicks(); + static double GetCurrentEventTicksAsSeconds(); }; struct TActorContext: public TActivationContext { const TActorId SelfID; explicit TActorContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID) - : TActivationContext(mailbox, executorThread, eventStart) + : TActivationContext(mailbox, executorThread, eventStart) , SelfID(selfID) { } @@ -110,22 +110,22 @@ namespace NActors { /** * Schedule one-shot event that will be send at given time point in the future. * - * @param deadline the wallclock time point in future when event must be send + * @param deadline the wallclock time point in future when event must be send * @param ev the event to send * @param cookie cookie that will be piggybacked with event */ void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; /** - * Schedule one-shot event that will be send at given time point in the future. - * - * @param deadline the monotonic time point in future when event must be send - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - */ - void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; - - /** + * Schedule one-shot event that will be send at given time point in the future. + * + * @param deadline the monotonic time point in future when event must be send + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + */ + void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; + + /** * Schedule one-shot event that will be send after given delay. * * @param delta the time from now to delay event sending @@ -135,7 +135,7 @@ namespace NActors { void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; TActorContext MakeFor(const TActorId& otherId) const { - return TActorContext(Mailbox, ExecutorThread, EventStart, otherId); + return TActorContext(Mailbox, ExecutorThread, EventStart, otherId); } // register new actor in ActorSystem on new fresh mailbox. @@ -179,22 +179,22 @@ namespace NActors { /** * Schedule one-shot event that will be send at given time point in the future. * - * @param deadline the wallclock time point in future when event must be send + * @param deadline the wallclock time point in future when event must be send * @param ev the event to send * @param cookie cookie that will be piggybacked with event */ virtual void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0; /** - * Schedule one-shot event that will be send at given time point in the future. - * - * @param deadline the monotonic time point in future when event must be send - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - */ - virtual void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0; - - /** + * Schedule one-shot event that will be send at given time point in the future. + * + * @param deadline the monotonic time point in future when event must be send + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + */ + virtual void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0; + + /** * Schedule one-shot event that will be send after given delay. * * @param delta the time from now to delay event sending @@ -240,11 +240,11 @@ namespace NActors { INTERCONNECT_SESSION_KILLER = 286, ACTOR_SYSTEM_SCHEDULER_ACTOR = 312, ACTOR_FUTURE_CALLBACK = 337, - INTERCONNECT_MONACTOR = 362, + INTERCONNECT_MONACTOR = 362, INTERCONNECT_LOAD_ACTOR = 376, INTERCONNECT_LOAD_RESPONDER = 377, NAMESERVICE = 450, - DNS_RESOLVER = 481, + DNS_RESOLVER = 481, INTERCONNECT_PROXY_WRAPPER = 546, }; @@ -362,7 +362,7 @@ namespace NActors { } void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; - void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; + void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; // register new actor in ActorSystem on new fresh mailbox. @@ -456,7 +456,7 @@ namespace NActors { inline TActorContext TActivationContext::ActorContextFor(TActorId id) { auto& tls = *TlsActivationContext; - return TActorContext(tls.Mailbox, tls.ExecutorThread, tls.EventStart, id); + return TActorContext(tls.Mailbox, tls.ExecutorThread, tls.EventStart, id); } class TDecorator : public IActor { diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp index d3bcdafbd3..0ab4d2b24d 100644 --- a/library/cpp/actors/core/actor_coroutine.cpp +++ b/library/cpp/actors/core/actor_coroutine.cpp @@ -1,9 +1,9 @@ #include "actor_coroutine.h" #include "executor_thread.h" -#include <util/system/sanitizers.h> +#include <util/system/sanitizers.h> #include <util/system/type_name.h> - + namespace NActors { static constexpr size_t StackOverflowGap = 4096; static char GoodStack[StackOverflowGap]; @@ -92,8 +92,8 @@ namespace NActors { } // prepare actor context for in-coroutine use - TActivationContext *ac = TlsActivationContext; - TlsActivationContext = nullptr; + TActivationContext *ac = TlsActivationContext; + TlsActivationContext = nullptr; TActorContext ctx(ac->Mailbox, ac->ExecutorThread, ac->EventStart, SelfActorId); ActorContext = &ctx; diff --git a/library/cpp/actors/core/actorid.h b/library/cpp/actors/core/actorid.h index c9e6173173..d972b1a0ff 100644 --- a/library/cpp/actors/core/actorid.h +++ b/library/cpp/actors/core/actorid.h @@ -12,13 +12,13 @@ namespace NActors { // next 20 bits - node id itself struct TActorId { - static constexpr ui32 MaxServiceIDLength = 12; - static constexpr ui32 MaxPoolID = 0x000007FF; - static constexpr ui32 MaxNodeId = 0x000FFFFF; - static constexpr ui32 PoolIndexShift = 20; - static constexpr ui32 PoolIndexMask = MaxPoolID << PoolIndexShift; - static constexpr ui32 ServiceMask = 0x80000000; - static constexpr ui32 NodeIdMask = MaxNodeId; + static constexpr ui32 MaxServiceIDLength = 12; + static constexpr ui32 MaxPoolID = 0x000007FF; + static constexpr ui32 MaxNodeId = 0x000FFFFF; + static constexpr ui32 PoolIndexShift = 20; + static constexpr ui32 PoolIndexMask = MaxPoolID << PoolIndexShift; + static constexpr ui32 ServiceMask = 0x80000000; + static constexpr ui32 NodeIdMask = MaxNodeId; private: union { diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index 4130b0d4da..c58698a206 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -43,7 +43,7 @@ namespace NActors { , Scheduler(setup->Scheduler) , InterconnectCount((ui32)setup->Interconnect.ProxyActors.size()) , CurrentTimestamp(0) - , CurrentMonotonic(0) + , CurrentMonotonic(0) , CurrentIDCounter(RandomNumber<ui64>()) , SystemSetup(setup.Release()) , DefSelfID(NodeId, "actorsystem") @@ -69,15 +69,15 @@ namespace NActors { #endif TActorId recipient = ev->GetRecipientRewrite(); - const ui32 recpNodeId = recipient.NodeId(); + const ui32 recpNodeId = recipient.NodeId(); if (recpNodeId != NodeId && recpNodeId != 0) { // if recipient is not local one - rewrite with forward instruction Y_VERIFY_DEBUG(!ev->HasEvent() || ev->GetBase()->IsSerializable()); - Y_VERIFY(ev->Recipient == recipient, - "Event rewrite from %s to %s would be lost via interconnect", - ev->Recipient.ToString().c_str(), - recipient.ToString().c_str()); + Y_VERIFY(ev->Recipient == recipient, + "Event rewrite from %s to %s would be lost via interconnect", + ev->Recipient.ToString().c_str(), + recipient.ToString().c_str()); recipient = InterconnectProxy(recpNodeId); ev->Rewrite(TEvInterconnect::EvForward, recipient); } @@ -119,20 +119,20 @@ namespace NActors { } void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const { - Schedule(deadline - Timestamp(), ev, cookie); + Schedule(deadline - Timestamp(), ev, cookie); + } + + void TActorSystem::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const { + const auto current = Monotonic(); + if (deadline < current) + deadline = current; + + TTicketLock::TGuard guard(&ScheduleLock); + ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); } - void TActorSystem::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const { - const auto current = Monotonic(); - if (deadline < current) - deadline = current; - - TTicketLock::TGuard guard(&ScheduleLock); - ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); - } - void TActorSystem::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const { - const auto deadline = Monotonic() + delta; + const auto deadline = Monotonic() + delta; TTicketLock::TGuard guard(&ScheduleLock); ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); @@ -211,7 +211,7 @@ namespace NActors { TVector<NSchedulerQueue::TReader*> scheduleReaders; scheduleReaders.push_back(&ScheduleQueue->Reader); CpuManager->PrepareStart(scheduleReaders, this); - Scheduler->Prepare(this, &CurrentTimestamp, &CurrentMonotonic); + Scheduler->Prepare(this, &CurrentTimestamp, &CurrentMonotonic); Scheduler->PrepareSchedules(&scheduleReaders.front(), (ui32)scheduleReaders.size()); // setup interconnect proxies @@ -242,9 +242,9 @@ namespace NActors { // ok, setup complete, we could destroy setup config SystemSetup.Destroy(); - Scheduler->PrepareStart(); + Scheduler->PrepareStart(); CpuManager->Start(); - Send(MakeSchedulerActorId(), new TEvSchedulerInitialize(scheduleReaders, &CurrentTimestamp, &CurrentMonotonic)); + Send(MakeSchedulerActorId(), new TEvSchedulerInitialize(scheduleReaders, &CurrentTimestamp, &CurrentMonotonic)); Scheduler->Start(); } diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 80d33901b0..40499d7586 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -27,7 +27,7 @@ namespace NActors { char data[12]; memcpy(data, "ICProxy@", 8); memcpy(data + 8, &destNodeId, sizeof(ui32)); - return TActorId(0, TStringBuf(data, 12)); + return TActorId(0, TStringBuf(data, 12)); } inline bool IsInterconnectProxyId(const TActorId& actorId) { @@ -69,7 +69,7 @@ namespace NActors { /** * Schedule one-shot event that will be send at given time point in the future. * - * @param deadline the wallclock time point in future when event must be send + * @param deadline the wallclock time point in future when event must be send * @param ev the event to send * @param cookie cookie that will be piggybacked with event * @param workerId index of thread which will perform event dispatching @@ -77,16 +77,16 @@ namespace NActors { virtual void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; /** - * Schedule one-shot event that will be send at given time point in the future. - * - * @param deadline the monotonic time point in future when event must be send - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event + * Schedule one-shot event that will be send at given time point in the future. + * + * @param deadline the monotonic time point in future when event must be send + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event * @param workerId index of thread which will perform event dispatching - */ + */ virtual void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; - - /** + + /** * Schedule one-shot event that will be send after given delay. * * @param delta the time from now to delay event sending @@ -136,9 +136,9 @@ namespace NActors { virtual ~ISchedulerThread() { } - virtual void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) = 0; + virtual void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) = 0; virtual void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) = 0; - virtual void PrepareStart() { /* empty */ } + virtual void PrepareStart() { /* empty */ } virtual void Start() = 0; virtual void PrepareStop() = 0; virtual void Stop() = 0; @@ -226,7 +226,7 @@ namespace NActors { TArrayHolder<TActorId> Interconnect; volatile ui64 CurrentTimestamp; - volatile ui64 CurrentMonotonic; + volatile ui64 CurrentMonotonic; volatile ui64 CurrentIDCounter; THolder<NSchedulerQueue::TQueueType> ScheduleQueue; @@ -264,22 +264,22 @@ namespace NActors { /** * Schedule one-shot event that will be send at given time point in the future. * - * @param deadline the wallclock time point in future when event must be send + * @param deadline the wallclock time point in future when event must be send * @param ev the event to send * @param cookie cookie that will be piggybacked with event */ void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const; /** - * Schedule one-shot event that will be send at given time point in the future. - * - * @param deadline the monotonic time point in future when event must be send - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - */ - void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const; - - /** + * Schedule one-shot event that will be send at given time point in the future. + * + * @param deadline the monotonic time point in future when event must be send + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + */ + void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const; + + /** * Schedule one-shot event that will be send after given delay. * * @param delta the time from now to delay event sending @@ -340,10 +340,10 @@ namespace NActors { return TInstant::MicroSeconds(RelaxedLoad(&CurrentTimestamp)); } - TMonotonic Monotonic() const { - return TMonotonic::MicroSeconds(RelaxedLoad(&CurrentMonotonic)); - } - + TMonotonic Monotonic() const { + return TMonotonic::MicroSeconds(RelaxedLoad(&CurrentMonotonic)); + } + template <typename T> T* AppData() const { return (T*)AppData0; diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h index 381dac03af..702cf50fad 100644 --- a/library/cpp/actors/core/events.h +++ b/library/cpp/actors/core/events.h @@ -4,7 +4,7 @@ #include "event_pb.h" #include <library/cpp/actors/protos/actors.pb.h> -#include <util/system/unaligned_mem.h> +#include <util/system/unaligned_mem.h> namespace NActors { struct TEvents { @@ -161,9 +161,9 @@ namespace NActors { private: static TString MakeData(ui32 sourceType, ui32 reason) { TString s = TString::Uninitialized(sizeof(ui32) + sizeof(ui32)); - char *p = s.Detach(); - WriteUnaligned<ui32>(p + 0, sourceType); - WriteUnaligned<ui32>(p + 4, reason); + char *p = s.Detach(); + WriteUnaligned<ui32>(p + 0, sourceType); + WriteUnaligned<ui32>(p + 4, reason); return s; } }; diff --git a/library/cpp/actors/core/events_undelivered.cpp b/library/cpp/actors/core/events_undelivered.cpp index 3b96625cab..23deaffd10 100644 --- a/library/cpp/actors/core/events_undelivered.cpp +++ b/library/cpp/actors/core/events_undelivered.cpp @@ -32,9 +32,9 @@ namespace NActors { IEventBase* TEvents::TEvUndelivered::Load(TEventSerializedData* bufs) { TString str = bufs->GetString(); Y_VERIFY(str.size() == (sizeof(ui32) + sizeof(ui32))); - const char* p = str.data(); - const ui64 sourceType = ReadUnaligned<ui32>(p + 0); - const ui64 reason = ReadUnaligned<ui32>(p + 4); + const char* p = str.data(); + const ui64 sourceType = ReadUnaligned<ui32>(p + 0); + const ui64 reason = ReadUnaligned<ui32>(p + 4); return new TEvUndelivered(sourceType, reason); } diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 87315a6a6a..4dce16939a 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -87,17 +87,17 @@ namespace NActors { if (x < 0) { #if defined ACTORSLIB_COLLECT_EXEC_STATS if (AtomicGetAndIncrement(ThreadUtilization) == 0) { - // Initially counter contains -t0, the pool start timestamp - // When the first thread goes to sleep we add t1, so the counter - // becomes t1-t0 >= 0, or the duration of max utilization so far. - // If the counter was negative and becomes positive, that means - // counter just turned into a duration and we should store that - // duration. Otherwise another thread raced with us and - // subtracted some other timestamp t2. + // Initially counter contains -t0, the pool start timestamp + // When the first thread goes to sleep we add t1, so the counter + // becomes t1-t0 >= 0, or the duration of max utilization so far. + // If the counter was negative and becomes positive, that means + // counter just turned into a duration and we should store that + // duration. Otherwise another thread raced with us and + // subtracted some other timestamp t2. const i64 t = GetCycleCountFast(); - const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t); - if (x < 0 && x + t > 0) - AtomicStore(&MaxUtilizationAccumulator, x + t); + const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t); + if (x < 0 && x + t > 0) + AtomicStore(&MaxUtilizationAccumulator, x + t); } #endif @@ -126,7 +126,7 @@ namespace NActors { if (!doSpin) { break; } - if (RelaxedLoad(&StopFlag)) { + if (RelaxedLoad(&StopFlag)) { break; } } @@ -159,20 +159,20 @@ namespace NActors { #if defined ACTORSLIB_COLLECT_EXEC_STATS if (AtomicDecrement(ThreadUtilization) == 0) { - // When we started sleeping counter contained t1-t0, or the - // last duration of max utilization. Now we subtract t2 >= t1, - // which turns counter negative again, and the next sleep cycle - // at timestamp t3 would be adding some new duration t3-t2. - // If the counter was positive and becomes negative that means - // there are no current races with other threads and we should - // store the last positive duration we observed. Multiple - // threads may be adding and subtracting values in potentially - // arbitrary order, which would cause counter to oscillate - // around zero. When it crosses zero is a good indication of a - // correct value. + // When we started sleeping counter contained t1-t0, or the + // last duration of max utilization. Now we subtract t2 >= t1, + // which turns counter negative again, and the next sleep cycle + // at timestamp t3 would be adding some new duration t3-t2. + // If the counter was positive and becomes negative that means + // there are no current races with other threads and we should + // store the last positive duration we observed. Multiple + // threads may be adding and subtracting values in potentially + // arbitrary order, which would cause counter to oscillate + // around zero. When it crosses zero is a good indication of a + // correct value. const i64 t = GetCycleCountFast(); - const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t); - if (x > 0 && x - t < 0) + const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t); + if (x > 0 && x - t < 0) AtomicStore(&MaxUtilizationAccumulator, x); } #endif @@ -305,18 +305,18 @@ namespace NActors { void TBasicExecutorPool::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) { Y_VERIFY_DEBUG(workerId < PoolThreads); - - const auto current = ActorSystem->Monotonic(); - if (deadline < current) - deadline = current; - + + const auto current = ActorSystem->Monotonic(); + if (deadline < current) + deadline = current; + ScheduleWriters[workerId].Push(deadline.MicroSeconds(), ev.Release(), cookie); - } - + } + void TBasicExecutorPool::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) { Y_VERIFY_DEBUG(workerId < PoolThreads); - const auto deadline = ActorSystem->Monotonic() + delta; + const auto deadline = ActorSystem->Monotonic() + delta; ScheduleWriters[workerId].Push(deadline.MicroSeconds(), ev.Release(), cookie); } diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp index b3d6d8b329..76dff693af 100644 --- a/library/cpp/actors/core/executor_pool_basic_ut.cpp +++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp @@ -47,8 +47,8 @@ public: if (GetCounter() == 0) { break; } - - Sleep(TDuration::MilliSeconds(1)); + + Sleep(TDuration::MilliSeconds(1)); } } @@ -69,8 +69,8 @@ private: { Y_UNUSED(ev); Action(); - TAtomicBase count = AtomicDecrement(Counter); - Y_VERIFY(count != Max<TAtomicBase>()); + TAtomicBase count = AtomicDecrement(Counter); + Y_VERIFY(count != Max<TAtomicBase>()); if (count) { Send(Receiver, new TEvMsg()); } @@ -206,19 +206,19 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { actorSystem.Send(changerActorId, new TEvMsg()); while (true) { - size_t maxCounter = 0; + size_t maxCounter = 0; for (size_t i = 0; i < size; ++i) { - maxCounter = Max(maxCounter, actors[i]->GetCounter()); + maxCounter = Max(maxCounter, actors[i]->GetCounter()); + } + + if (maxCounter == 0) { + break; } - if (maxCounter == 0) { - break; - } - auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); + UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); - Sleep(TDuration::MilliSeconds(1)); + Sleep(TDuration::MilliSeconds(1)); } changerActor->Stop(); @@ -242,9 +242,9 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { while (actor->GetCounter()) { auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter()); - - Sleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter()); + + Sleep(TDuration::MilliSeconds(1)); } } @@ -275,19 +275,19 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { while (true) { - size_t maxCounter = 0; + size_t maxCounter = 0; for (size_t i = 0; i < size; ++i) { - maxCounter = Max(maxCounter, actors[i]->GetCounter()); + maxCounter = Max(maxCounter, actors[i]->GetCounter()); + } + + if (maxCounter == 0) { + break; } - if (maxCounter == 0) { - break; - } - auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); + UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); - Sleep(TDuration::MilliSeconds(1)); + Sleep(TDuration::MilliSeconds(1)); } } @@ -319,19 +319,19 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { while (true) { - size_t maxCounter = 0; + size_t maxCounter = 0; for (size_t i = 0; i < actorsCount; ++i) { - maxCounter = Max(maxCounter, actors[i]->GetCounter()); + maxCounter = Max(maxCounter, actors[i]->GetCounter()); + } + + if (maxCounter == 0) { + break; } - if (maxCounter == 0) { - break; - } - auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); + UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); - Sleep(TDuration::MilliSeconds(1)); + Sleep(TDuration::MilliSeconds(1)); } } @@ -362,19 +362,19 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { } while (true) { - size_t maxCounter = 0; + size_t maxCounter = 0; for (size_t i = 0; i < actorsCount; ++i) { - maxCounter = Max(maxCounter, actors[i]->GetCounter()); + maxCounter = Max(maxCounter, actors[i]->GetCounter()); + } + + if (maxCounter == 0) { + break; } - if (maxCounter == 0) { - break; - } - auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); + UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); - Sleep(TDuration::MilliSeconds(1)); + Sleep(TDuration::MilliSeconds(1)); } } diff --git a/library/cpp/actors/core/executor_pool_io.cpp b/library/cpp/actors/core/executor_pool_io.cpp index f4f13c9c20..fb557ae6b0 100644 --- a/library/cpp/actors/core/executor_pool_io.cpp +++ b/library/cpp/actors/core/executor_pool_io.cpp @@ -30,33 +30,33 @@ namespace NActors { ui32 workerId = wctx.WorkerId; Y_VERIFY_DEBUG(workerId < PoolThreads); - NHPTimer::STime elapsed = 0; - NHPTimer::STime parked = 0; + NHPTimer::STime elapsed = 0; + NHPTimer::STime parked = 0; NHPTimer::STime hpstart = GetCycleCountFast(); - NHPTimer::STime hpnow; - + NHPTimer::STime hpnow; + const TAtomic x = AtomicDecrement(Semaphore); if (x < 0) { TThreadCtx& threadCtx = Threads[workerId]; ThreadQueue.Push(workerId + 1, revolvingCounter); hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; + elapsed += hpnow - hpstart; if (threadCtx.Pad.Park()) return 0; hpstart = GetCycleCountFast(); - parked += hpstart - hpnow; + parked += hpstart - hpnow; } while (!RelaxedLoad(&StopFlag)) { - if (const ui32 activation = Activations.Pop(++revolvingCounter)) { + if (const ui32 activation = Activations.Pop(++revolvingCounter)) { hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; + elapsed += hpnow - hpstart; wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, elapsed); - if (parked > 0) { + if (parked > 0) { wctx.AddParkedCycles(parked); - } + } return activation; - } + } SpinLockPause(); } @@ -69,18 +69,18 @@ namespace NActors { void TIOExecutorPool::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) { Y_UNUSED(workerId); - - const auto current = ActorSystem->Monotonic(); - if (deadline < current) - deadline = current; - - TTicketLock::TGuard guard(&ScheduleLock); - ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); - } - + + const auto current = ActorSystem->Monotonic(); + if (deadline < current) + deadline = current; + + TTicketLock::TGuard guard(&ScheduleLock); + ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); + } + void TIOExecutorPool::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) { Y_UNUSED(workerId); - const auto deadline = ActorSystem->Monotonic() + delta; + const auto deadline = ActorSystem->Monotonic() + delta; TTicketLock::TGuard guard(&ScheduleLock); ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); diff --git a/library/cpp/actors/core/executor_pool_io.h b/library/cpp/actors/core/executor_pool_io.h index 7de6fd0528..e576d642a1 100644 --- a/library/cpp/actors/core/executor_pool_io.h +++ b/library/cpp/actors/core/executor_pool_io.h @@ -17,7 +17,7 @@ namespace NActors { }; TArrayHolder<TThreadCtx> Threads; - TUnorderedCache<ui32, 512, 4> ThreadQueue; + TUnorderedCache<ui32, 512, 4> ThreadQueue; THolder<NSchedulerQueue::TQueueType> ScheduleQueue; TTicketLock ScheduleLock; diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp index 4ae9ec4c73..dac6245635 100644 --- a/library/cpp/actors/core/executor_pool_united.cpp +++ b/library/cpp/actors/core/executor_pool_united.cpp @@ -1235,7 +1235,7 @@ namespace NActors { inline void TUnitedWorkers::TryWake(TPoolId pool) { // Avoid using multiple atomic seq_cst loads in cycle, use barrier once AtomicBarrier(); - + // Scan every allowed cpu in pool's wakeup order and try to wake the first idle cpu if (RelaxedLoad(&Pools[pool].Waiters) > 0) { for (TCpu* cpu : Pools[pool].WakeOrderCpus) { @@ -1247,11 +1247,11 @@ namespace NActors { // Cpu has not been woken up } - + inline void TUnitedWorkers::BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter) { Pools[pool].BeginExecution(activation, revolvingCounter); - } - + } + inline bool TUnitedWorkers::NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter) { return Pools[pool].NextExecution(activation, revolvingCounter); } diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp index ddc4c348ee..d4df17f1b8 100644 --- a/library/cpp/actors/core/executor_pool_united_ut.cpp +++ b/library/cpp/actors/core/executor_pool_united_ut.cpp @@ -58,8 +58,8 @@ public: if (GetCounter() == 0) { break; } - - Sleep(TDuration::MilliSeconds(1)); + + Sleep(TDuration::MilliSeconds(1)); } } @@ -78,8 +78,8 @@ private: void Handle(TEvMsg::TPtr &ev) { Y_UNUSED(ev); Action(); - TAtomicBase count = AtomicDecrement(Counter); - Y_VERIFY(count != Max<TAtomicBase>()); + TAtomicBase count = AtomicDecrement(Counter); + Y_VERIFY(count != Max<TAtomicBase>()); if (count) { Send(Receiver, new TEvMsg()); } @@ -149,9 +149,9 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) { while (actor->GetCounter()) { auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter()); - - Sleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter()); + + Sleep(TDuration::MilliSeconds(1)); } TVector<TExecutorThreadStats> stats; @@ -212,11 +212,11 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) { left += actor->GetCounter(); } if (left == 0) { - break; - } + break; + } auto now = TInstant::Now(); UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "left " << left); - Sleep(TDuration::MilliSeconds(1)); + Sleep(TDuration::MilliSeconds(1)); } for (size_t pool = 0; pool < pools; pool++) { @@ -311,11 +311,11 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) { left += actor->GetCounter(); } if (left == 0) { - break; - } + break; + } auto now = TInstant::Now(); UNIT_ASSERT_C(now - begin < TDuration::Seconds(15), "left " << left); - Sleep(TDuration::MilliSeconds(1)); + Sleep(TDuration::MilliSeconds(1)); } for (size_t pool = 0; pool < pools; pool++) { diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 7e961584e6..446b651efd 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -76,11 +76,11 @@ namespace NActors { Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId); } - void TExecutorThread::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { - ++CurrentActorScheduledEventsCounter; + void TExecutorThread::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { + ++CurrentActorScheduledEventsCounter; Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId); - } - + } + void TExecutorThread::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { ++CurrentActorScheduledEventsCounter; Ctx.Executor->Schedule(delta, ev, cookie, Ctx.WorkerId); @@ -145,7 +145,7 @@ namespace NActors { NHPTimer::STime hpnow; recipient = ev->GetRecipientRewrite(); if (actor = mailbox->FindActor(recipient.LocalId())) { - TActorContext ctx(*mailbox, *this, hpprev, recipient); + TActorContext ctx(*mailbox, *this, hpprev, recipient); TlsActivationContext = &ctx; #ifdef USE_ACTOR_CALLSTACK diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h index 72b8f28c1d..9d3c573f0d 100644 --- a/library/cpp/actors/core/executor_thread.h +++ b/library/cpp/actors/core/executor_thread.h @@ -47,7 +47,7 @@ namespace NActors { const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; } void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); - void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); + void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); bool Send(TAutoPtr<IEventHandle> ev) { diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index 391844dcff..5f63b5af58 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -223,9 +223,9 @@ namespace NActors { } } - void TLoggerActor::Throttle(const NLog::TSettings& settings) { + void TLoggerActor::Throttle(const NLog::TSettings& settings) { if (AtomicGet(IsOverflow)) - Sleep(settings.ThrottleDelay); + Sleep(settings.ThrottleDelay); } void TLoggerActor::LogIgnoredCount(TInstant now) { diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index f063642ad2..c11a7cf3c1 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -232,7 +232,7 @@ namespace NActors { // Directly call logger instead of sending a message void Log(TInstant time, NLog::EPriority priority, NLog::EComponent component, const char* c, ...); - static void Throttle(const NLog::TSettings& settings); + static void Throttle(const NLog::TSettings& settings); private: TIntrusivePtr<NLog::TSettings> Settings; @@ -322,7 +322,7 @@ namespace NActors { inline void DeliverLogMessage(TCtx& ctx, NLog::EPriority mPriority, NLog::EComponent mComponent, TString &&str) { const NLog::TSettings *mSettings = ctx.LoggerSettings(); - TLoggerActor::Throttle(*mSettings); + TLoggerActor::Throttle(*mSettings); ctx.Send(new IEventHandle(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str)))); } diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp index d77688021d..f52f2fc5d2 100644 --- a/library/cpp/actors/core/log_settings.cpp +++ b/library/cpp/actors/core/log_settings.cpp @@ -12,7 +12,7 @@ namespace NActors { , LoggerComponent(loggerComponent) , TimeThresholdMs(timeThresholdMs) , AllowDrop(true) - , ThrottleDelay(TDuration::MilliSeconds(100)) + , ThrottleDelay(TDuration::MilliSeconds(100)) , MinVal(0) , MaxVal(0) , Mask(0) @@ -34,7 +34,7 @@ namespace NActors { , LoggerComponent(loggerComponent) , TimeThresholdMs(timeThresholdMs) , AllowDrop(true) - , ThrottleDelay(TDuration::MilliSeconds(100)) + , ThrottleDelay(TDuration::MilliSeconds(100)) , MinVal(0) , MaxVal(0) , Mask(0) @@ -205,10 +205,10 @@ namespace NActors { AllowDrop = val; } - void TSettings::SetThrottleDelay(TDuration value) { - ThrottleDelay = value; - } - + void TSettings::SetThrottleDelay(TDuration value) { + ThrottleDelay = value; + } + void TSettings::SetUseLocalTimestamps(bool value) { UseLocalTimestamps = value; } diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h index 5f4898bda1..7fe4504edd 100644 --- a/library/cpp/actors/core/log_settings.h +++ b/library/cpp/actors/core/log_settings.h @@ -73,7 +73,7 @@ namespace NActors { EComponent LoggerComponent; ui64 TimeThresholdMs; bool AllowDrop; - TDuration ThrottleDelay; + TDuration ThrottleDelay; TArrayHolder<TAtomic> ComponentInfo; TVector<TString> ComponentNames; EComponent MinVal; @@ -162,7 +162,7 @@ namespace NActors { static bool IsValidPriority(EPriority priority); bool IsValidComponent(EComponent component); void SetAllowDrop(bool val); - void SetThrottleDelay(TDuration value); + void SetThrottleDelay(TDuration value); void SetUseLocalTimestamps(bool value); private: diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp index b63577c7d6..d84b4f9e46 100644 --- a/library/cpp/actors/core/mailbox.cpp +++ b/library/cpp/actors/core/mailbox.cpp @@ -529,7 +529,7 @@ namespace NActors { Y_FAIL(); } - AtomicStore(Lines + lineIndex, header); + AtomicStore(Lines + lineIndex, header); ui32 ret = lineIndexMask | 1; diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h index 8f4f901bd9..0bd9c4d314 100644 --- a/library/cpp/actors/core/mailbox.h +++ b/library/cpp/actors/core/mailbox.h @@ -277,7 +277,7 @@ namespace NActors { TAtomic LastAllocatedLine; TAtomic AllocatedMailboxCount; - typedef TUnorderedCache<ui32, 512, 4> TMailboxCache; + typedef TUnorderedCache<ui32, 512, 4> TMailboxCache; TMailboxCache MailboxCacheSimple; TAtomic CachedSimpleMailboxes; TMailboxCache MailboxCacheRevolving; diff --git a/library/cpp/actors/core/monotonic.cpp b/library/cpp/actors/core/monotonic.cpp index eefd8913cc..3465149dbe 100644 --- a/library/cpp/actors/core/monotonic.cpp +++ b/library/cpp/actors/core/monotonic.cpp @@ -1,23 +1,23 @@ -#include "monotonic.h" - -#include <chrono> - -namespace NActors { - - namespace { - // Unfortunately time_since_epoch() is sometimes negative on wine - // Remember initial time point at program start and use offsets from that - std::chrono::steady_clock::time_point MonotonicOffset = std::chrono::steady_clock::now(); - } - - ui64 GetMonotonicMicroSeconds() { - auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - MonotonicOffset).count(); - // Steady clock is supposed to never jump backwards, but it's better to be safe in case of buggy implementations - if (Y_UNLIKELY(microseconds < 0)) { - microseconds = 0; - } - // Add one so we never return zero - return microseconds + 1; - } - -} // namespace NActors +#include "monotonic.h" + +#include <chrono> + +namespace NActors { + + namespace { + // Unfortunately time_since_epoch() is sometimes negative on wine + // Remember initial time point at program start and use offsets from that + std::chrono::steady_clock::time_point MonotonicOffset = std::chrono::steady_clock::now(); + } + + ui64 GetMonotonicMicroSeconds() { + auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - MonotonicOffset).count(); + // Steady clock is supposed to never jump backwards, but it's better to be safe in case of buggy implementations + if (Y_UNLIKELY(microseconds < 0)) { + microseconds = 0; + } + // Add one so we never return zero + return microseconds + 1; + } + +} // namespace NActors diff --git a/library/cpp/actors/core/monotonic.h b/library/cpp/actors/core/monotonic.h index cc0136b558..6fceb91dbe 100644 --- a/library/cpp/actors/core/monotonic.h +++ b/library/cpp/actors/core/monotonic.h @@ -1,111 +1,111 @@ -#pragma once - -#include <util/datetime/base.h> - -namespace NActors { - - /** - * Returns current monotonic time in microseconds - */ - ui64 GetMonotonicMicroSeconds(); - - /** - * Similar to TInstant, but measuring monotonic time - */ - class TMonotonic : public TTimeBase<TMonotonic> { - using TBase = TTimeBase<TMonotonic>; - - private: - constexpr explicit TMonotonic(TValue value) noexcept - : TBase(value) - { } - - public: - constexpr TMonotonic() noexcept { - } - - static constexpr TMonotonic FromValue(TValue value) noexcept { - return TMonotonic(value); - } - - static inline TMonotonic Now() { - return TMonotonic::MicroSeconds(GetMonotonicMicroSeconds()); - } - - using TBase::Days; - using TBase::Hours; - using TBase::MicroSeconds; - using TBase::MilliSeconds; - using TBase::Minutes; - using TBase::Seconds; - - static constexpr TMonotonic Max() noexcept { - return TMonotonic(::Max<ui64>()); - } - - static constexpr TMonotonic Zero() noexcept { - return TMonotonic(); - } - - static constexpr TMonotonic MicroSeconds(ui64 us) noexcept { - return TMonotonic(TInstant::MicroSeconds(us).GetValue()); - } - - static constexpr TMonotonic MilliSeconds(ui64 ms) noexcept { - return TMonotonic(TInstant::MilliSeconds(ms).GetValue()); - } - - static constexpr TMonotonic Seconds(ui64 s) noexcept { - return TMonotonic(TInstant::Seconds(s).GetValue()); - } - - static constexpr TMonotonic Minutes(ui64 m) noexcept { - return TMonotonic(TInstant::Minutes(m).GetValue()); - } - - static constexpr TMonotonic Hours(ui64 h) noexcept { - return TMonotonic(TInstant::Hours(h).GetValue()); - } - - static constexpr TMonotonic Days(ui64 d) noexcept { - return TMonotonic(TInstant::Days(d).GetValue()); - } - - template<class T> - inline TMonotonic& operator+=(const T& t) noexcept { - return (*this = (*this + t)); - } - - template<class T> - inline TMonotonic& operator-=(const T& t) noexcept { - return (*this = (*this - t)); - } - }; -} // namespace NActors - -Y_DECLARE_PODTYPE(NActors::TMonotonic); - -template<> -struct THash<NActors::TMonotonic> { - size_t operator()(const NActors::TMonotonic& key) const { - return THash<NActors::TMonotonic::TValue>()(key.GetValue()); - } -}; - -namespace NActors { - - constexpr TDuration operator-(const TMonotonic& l, const TMonotonic& r) { - return TInstant::FromValue(l.GetValue()) - TInstant::FromValue(r.GetValue()); - } - - constexpr TMonotonic operator+(const TMonotonic& l, const TDuration& r) { - TInstant result = TInstant::FromValue(l.GetValue()) + r; - return TMonotonic::FromValue(result.GetValue()); - } - - constexpr TMonotonic operator-(const TMonotonic& l, const TDuration& r) { - TInstant result = TInstant::FromValue(l.GetValue()) - r; - return TMonotonic::FromValue(result.GetValue()); - } - -} // namespace NActors +#pragma once + +#include <util/datetime/base.h> + +namespace NActors { + + /** + * Returns current monotonic time in microseconds + */ + ui64 GetMonotonicMicroSeconds(); + + /** + * Similar to TInstant, but measuring monotonic time + */ + class TMonotonic : public TTimeBase<TMonotonic> { + using TBase = TTimeBase<TMonotonic>; + + private: + constexpr explicit TMonotonic(TValue value) noexcept + : TBase(value) + { } + + public: + constexpr TMonotonic() noexcept { + } + + static constexpr TMonotonic FromValue(TValue value) noexcept { + return TMonotonic(value); + } + + static inline TMonotonic Now() { + return TMonotonic::MicroSeconds(GetMonotonicMicroSeconds()); + } + + using TBase::Days; + using TBase::Hours; + using TBase::MicroSeconds; + using TBase::MilliSeconds; + using TBase::Minutes; + using TBase::Seconds; + + static constexpr TMonotonic Max() noexcept { + return TMonotonic(::Max<ui64>()); + } + + static constexpr TMonotonic Zero() noexcept { + return TMonotonic(); + } + + static constexpr TMonotonic MicroSeconds(ui64 us) noexcept { + return TMonotonic(TInstant::MicroSeconds(us).GetValue()); + } + + static constexpr TMonotonic MilliSeconds(ui64 ms) noexcept { + return TMonotonic(TInstant::MilliSeconds(ms).GetValue()); + } + + static constexpr TMonotonic Seconds(ui64 s) noexcept { + return TMonotonic(TInstant::Seconds(s).GetValue()); + } + + static constexpr TMonotonic Minutes(ui64 m) noexcept { + return TMonotonic(TInstant::Minutes(m).GetValue()); + } + + static constexpr TMonotonic Hours(ui64 h) noexcept { + return TMonotonic(TInstant::Hours(h).GetValue()); + } + + static constexpr TMonotonic Days(ui64 d) noexcept { + return TMonotonic(TInstant::Days(d).GetValue()); + } + + template<class T> + inline TMonotonic& operator+=(const T& t) noexcept { + return (*this = (*this + t)); + } + + template<class T> + inline TMonotonic& operator-=(const T& t) noexcept { + return (*this = (*this - t)); + } + }; +} // namespace NActors + +Y_DECLARE_PODTYPE(NActors::TMonotonic); + +template<> +struct THash<NActors::TMonotonic> { + size_t operator()(const NActors::TMonotonic& key) const { + return THash<NActors::TMonotonic::TValue>()(key.GetValue()); + } +}; + +namespace NActors { + + constexpr TDuration operator-(const TMonotonic& l, const TMonotonic& r) { + return TInstant::FromValue(l.GetValue()) - TInstant::FromValue(r.GetValue()); + } + + constexpr TMonotonic operator+(const TMonotonic& l, const TDuration& r) { + TInstant result = TInstant::FromValue(l.GetValue()) + r; + return TMonotonic::FromValue(result.GetValue()); + } + + constexpr TMonotonic operator-(const TMonotonic& l, const TDuration& r) { + TInstant result = TInstant::FromValue(l.GetValue()) - r; + return TMonotonic::FromValue(result.GetValue()); + } + +} // namespace NActors diff --git a/library/cpp/actors/core/scheduler_actor.cpp b/library/cpp/actors/core/scheduler_actor.cpp index c189653e99..febc5e40dd 100644 --- a/library/cpp/actors/core/scheduler_actor.cpp +++ b/library/cpp/actors/core/scheduler_actor.cpp @@ -43,7 +43,7 @@ namespace NActors { TPollerToken::TPtr PollerToken; ui64 RealTime; - ui64 MonotonicTime; + ui64 MonotonicTime; ui64 ActiveTick; typedef TMap<ui64, TAutoPtr<NSchedulerQueue::TQueueType>> TMomentMap; // intrasecond queues @@ -56,7 +56,7 @@ namespace NActors { static const ui64 IntrasecondThreshold = 1048576; // ~second TAutoPtr<TMomentMap> ActiveSec; volatile ui64* CurrentTimestamp = nullptr; - volatile ui64* CurrentMonotonic = nullptr; + volatile ui64* CurrentMonotonic = nullptr; TDeque<TAutoPtr<IEventHandle>> EventsToBeSent; public: @@ -84,9 +84,9 @@ namespace NActors { Y_ASSERT(evInitialize.CurrentTimestamp != nullptr); CurrentTimestamp = evInitialize.CurrentTimestamp; - Y_ASSERT(evInitialize.CurrentMonotonic != nullptr); - CurrentMonotonic = evInitialize.CurrentMonotonic; - + Y_ASSERT(evInitialize.CurrentMonotonic != nullptr); + CurrentMonotonic = evInitialize.CurrentMonotonic; + struct itimerspec new_time; memset(&new_time, 0, sizeof(new_time)); new_time.it_value.tv_nsec = Cfg.ResolutionMicroseconds * 1000; @@ -96,10 +96,10 @@ namespace NActors { const bool success = ctx.Send(PollerActor, new TEvPollerRegister(TimerDescriptor, SelfId(), {})); Y_VERIFY(success); - RealTime = RelaxedLoad(CurrentTimestamp); - MonotonicTime = RelaxedLoad(CurrentMonotonic); + RealTime = RelaxedLoad(CurrentTimestamp); + MonotonicTime = RelaxedLoad(CurrentMonotonic); - ActiveTick = AlignUp<ui64>(MonotonicTime, IntrasecondThreshold); + ActiveTick = AlignUp<ui64>(MonotonicTime, IntrasecondThreshold); } void Handle(TEvPollerRegisterResult::TPtr ev, const TActorContext& ctx) { @@ -108,10 +108,10 @@ namespace NActors { } void UpdateTime() { - RealTime = TInstant::Now().MicroSeconds(); - MonotonicTime = Max(MonotonicTime, GetMonotonicMicroSeconds()); - AtomicStore(CurrentTimestamp, RealTime); - AtomicStore(CurrentMonotonic, MonotonicTime); + RealTime = TInstant::Now().MicroSeconds(); + MonotonicTime = Max(MonotonicTime, GetMonotonicMicroSeconds()); + AtomicStore(CurrentTimestamp, RealTime); + AtomicStore(CurrentMonotonic, MonotonicTime); } void TryUpdateTime(NHPTimer::STime* lastTimeUpdate) { diff --git a/library/cpp/actors/core/scheduler_actor.h b/library/cpp/actors/core/scheduler_actor.h index 4209db0ab6..c2c561b43d 100644 --- a/library/cpp/actors/core/scheduler_actor.h +++ b/library/cpp/actors/core/scheduler_actor.h @@ -9,12 +9,12 @@ namespace NActors { struct TEvSchedulerInitialize : TEventLocal<TEvSchedulerInitialize, TEvents::TSystem::Bootstrap> { TVector<NSchedulerQueue::TReader*> ScheduleReaders; volatile ui64* CurrentTimestamp; - volatile ui64* CurrentMonotonic; + volatile ui64* CurrentMonotonic; - TEvSchedulerInitialize(const TVector<NSchedulerQueue::TReader*>& scheduleReaders, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) + TEvSchedulerInitialize(const TVector<NSchedulerQueue::TReader*>& scheduleReaders, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) : ScheduleReaders(scheduleReaders) , CurrentTimestamp(currentTimestamp) - , CurrentMonotonic(currentMonotonic) + , CurrentMonotonic(currentMonotonic) { } }; diff --git a/library/cpp/actors/core/scheduler_basic.cpp b/library/cpp/actors/core/scheduler_basic.cpp index b0c80eb6d2..fba200e16b 100644 --- a/library/cpp/actors/core/scheduler_basic.cpp +++ b/library/cpp/actors/core/scheduler_basic.cpp @@ -9,35 +9,35 @@ #endif namespace NActors { - - struct TBasicSchedulerThread::TMonCounters { - NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs; - NMonitoring::TDynamicCounters::TCounterPtr QueueSize; - NMonitoring::TDynamicCounters::TCounterPtr EventsSent; - NMonitoring::TDynamicCounters::TCounterPtr EventsDropped; - NMonitoring::TDynamicCounters::TCounterPtr EventsAdded; - NMonitoring::TDynamicCounters::TCounterPtr Iterations; - NMonitoring::TDynamicCounters::TCounterPtr Sleeps; - NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec; - - TMonCounters(const NMonitoring::TDynamicCounterPtr& counters) - : TimeDelayMs(counters->GetCounter("Scheduler/TimeDelayMs", false)) - , QueueSize(counters->GetCounter("Scheduler/QueueSize", false)) - , EventsSent(counters->GetCounter("Scheduler/EventsSent", true)) - , EventsDropped(counters->GetCounter("Scheduler/EventsDropped", true)) - , EventsAdded(counters->GetCounter("Scheduler/EventsAdded", true)) - , Iterations(counters->GetCounter("Scheduler/Iterations", true)) - , Sleeps(counters->GetCounter("Scheduler/Sleeps", true)) - , ElapsedMicrosec(counters->GetCounter("Scheduler/ElapsedMicrosec", true)) - { } - }; - + + struct TBasicSchedulerThread::TMonCounters { + NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs; + NMonitoring::TDynamicCounters::TCounterPtr QueueSize; + NMonitoring::TDynamicCounters::TCounterPtr EventsSent; + NMonitoring::TDynamicCounters::TCounterPtr EventsDropped; + NMonitoring::TDynamicCounters::TCounterPtr EventsAdded; + NMonitoring::TDynamicCounters::TCounterPtr Iterations; + NMonitoring::TDynamicCounters::TCounterPtr Sleeps; + NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec; + + TMonCounters(const NMonitoring::TDynamicCounterPtr& counters) + : TimeDelayMs(counters->GetCounter("Scheduler/TimeDelayMs", false)) + , QueueSize(counters->GetCounter("Scheduler/QueueSize", false)) + , EventsSent(counters->GetCounter("Scheduler/EventsSent", true)) + , EventsDropped(counters->GetCounter("Scheduler/EventsDropped", true)) + , EventsAdded(counters->GetCounter("Scheduler/EventsAdded", true)) + , Iterations(counters->GetCounter("Scheduler/Iterations", true)) + , Sleeps(counters->GetCounter("Scheduler/Sleeps", true)) + , ElapsedMicrosec(counters->GetCounter("Scheduler/ElapsedMicrosec", true)) + { } + }; + TBasicSchedulerThread::TBasicSchedulerThread(const TSchedulerConfig& config) : Config(config) - , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr) + , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr) , ActorSystem(nullptr) , CurrentTimestamp(nullptr) - , CurrentMonotonic(nullptr) + , CurrentMonotonic(nullptr) , TotalReaders(0) , StopFlag(false) , ScheduleMap(3600) @@ -55,45 +55,45 @@ namespace NActors { #endif ::SetCurrentThreadName("Scheduler"); - ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic); - ui64 throttledMonotonic = currentMonotonic; + ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic); + ui64 throttledMonotonic = currentMonotonic; - ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold); + ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold); TAutoPtr<TMomentMap> activeSec; NHPTimer::STime hpprev = GetCycleCountFast(); - ui64 nextTimestamp = TInstant::Now().MicroSeconds(); - ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); - + ui64 nextTimestamp = TInstant::Now().MicroSeconds(); + ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); + while (!AtomicLoad(&StopFlag)) { { - const ui64 delta = nextMonotonic - throttledMonotonic; - const ui64 elapsedDelta = nextMonotonic - currentMonotonic; - const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1)); - - throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic; - - if (MonCounters) { - *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000; - } + const ui64 delta = nextMonotonic - throttledMonotonic; + const ui64 elapsedDelta = nextMonotonic - currentMonotonic; + const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1)); + + throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic; + + if (MonCounters) { + *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000; + } + } + AtomicStore(CurrentTimestamp, nextTimestamp); + AtomicStore(CurrentMonotonic, nextMonotonic); + currentMonotonic = nextMonotonic; + + if (MonCounters) { + ++*MonCounters->Iterations; } - AtomicStore(CurrentTimestamp, nextTimestamp); - AtomicStore(CurrentMonotonic, nextMonotonic); - currentMonotonic = nextMonotonic; - - if (MonCounters) { - ++*MonCounters->Iterations; - } - + bool somethingDone = false; // first step - send everything triggered on schedule - ui64 eventsSent = 0; - ui64 eventsDropped = 0; + ui64 eventsSent = 0; + ui64 eventsDropped = 0; for (;;) { while (!!activeSec && !activeSec->empty()) { TMomentMap::iterator it = activeSec->begin(); - if (it->first <= throttledMonotonic) { + if (it->first <= throttledMonotonic) { if (NSchedulerQueue::TQueueType* q = it->second.Get()) { while (NSchedulerQueue::TEntry* x = q->Reader.Pop()) { somethingDone = true; @@ -102,16 +102,16 @@ namespace NActors { ISchedulerCookie* cookie = x->Cookie; // TODO: lazy send with backoff queue to not hang over contended mailboxes if (cookie) { - if (cookie->Detach()) { + if (cookie->Detach()) { ActorSystem->Send(ev); - ++eventsSent; - } else { + ++eventsSent; + } else { delete ev; - ++eventsDropped; - } + ++eventsDropped; + } } else { ActorSystem->Send(ev); - ++eventsSent; + ++eventsSent; } } } @@ -120,7 +120,7 @@ namespace NActors { break; } - if (activeTick <= throttledMonotonic) { + if (activeTick <= throttledMonotonic) { Y_VERIFY_DEBUG(!activeSec || activeSec->empty()); activeSec.Destroy(); activeTick += IntrasecondThreshold; @@ -138,7 +138,7 @@ namespace NActors { // second step - collect everything from queues - ui64 eventsAdded = 0; + ui64 eventsAdded = 0; for (ui32 i = 0; i != TotalReaders; ++i) { while (NSchedulerQueue::TEntry* x = Readers[i]->Pop()) { somethingDone = true; @@ -165,57 +165,57 @@ namespace NActors { queue.Reset(new NSchedulerQueue::TQueueType()); queue->Writer.Push(instant, ev, cookie); } - - ++eventsAdded; + + ++eventsAdded; } } NHPTimer::STime hpnow = GetCycleCountFast(); - - if (MonCounters) { - *MonCounters->QueueSize -= eventsSent + eventsDropped; - *MonCounters->QueueSize += eventsAdded; - *MonCounters->EventsSent += eventsSent; - *MonCounters->EventsDropped += eventsDropped; - *MonCounters->EventsAdded += eventsAdded; - *MonCounters->ElapsedMicrosec += NHPTimer::GetSeconds(hpnow - hpprev) * 1000000; - } - - hpprev = hpnow; - nextTimestamp = TInstant::Now().MicroSeconds(); - nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); - + + if (MonCounters) { + *MonCounters->QueueSize -= eventsSent + eventsDropped; + *MonCounters->QueueSize += eventsAdded; + *MonCounters->EventsSent += eventsSent; + *MonCounters->EventsDropped += eventsDropped; + *MonCounters->EventsAdded += eventsAdded; + *MonCounters->ElapsedMicrosec += NHPTimer::GetSeconds(hpnow - hpprev) * 1000000; + } + + hpprev = hpnow; + nextTimestamp = TInstant::Now().MicroSeconds(); + nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); + // ok complete, if nothing left - sleep if (!somethingDone) { - const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds); - if (nextMonotonic >= nextInstant) // already in next time-slice + const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds); + if (nextMonotonic >= nextInstant) // already in next time-slice continue; - const ui64 delta = nextInstant - nextMonotonic; + const ui64 delta = nextInstant - nextMonotonic; if (delta < Config.SpinThreshold) // not so much time left, just spin continue; - if (MonCounters) { - ++*MonCounters->Sleeps; - } - + if (MonCounters) { + ++*MonCounters->Sleeps; + } + NanoSleep(delta * 1000); // ok, looks like we should sleep a bit. - - // Don't count sleep in elapsed microseconds + + // Don't count sleep in elapsed microseconds hpprev = GetCycleCountFast(); - nextTimestamp = TInstant::Now().MicroSeconds(); - nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); + nextTimestamp = TInstant::Now().MicroSeconds(); + nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); } } // ok, die! } - void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) { + void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) { ActorSystem = actorSystem; CurrentTimestamp = currentTimestamp; - CurrentMonotonic = currentMonotonic; - *CurrentTimestamp = TInstant::Now().MicroSeconds(); - *CurrentMonotonic = GetMonotonicMicroSeconds(); + CurrentMonotonic = currentMonotonic; + *CurrentTimestamp = TInstant::Now().MicroSeconds(); + *CurrentMonotonic = GetMonotonicMicroSeconds(); } void TBasicSchedulerThread::PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) { @@ -225,16 +225,16 @@ namespace NActors { Copy(readers, readers + scheduleReadersCount, Readers.Get()); } - void TBasicSchedulerThread::PrepareStart() { - // Called after actor system is initialized, but before executor threads - // are started, giving us a chance to update current timestamp with a - // more recent value, taking initialization time into account. This is - // safe to do, since scheduler thread is not started yet, so no other - // threads are updating time concurrently. - AtomicStore(CurrentTimestamp, TInstant::Now().MicroSeconds()); - AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds())); - } - + void TBasicSchedulerThread::PrepareStart() { + // Called after actor system is initialized, but before executor threads + // are started, giving us a chance to update current timestamp with a + // more recent value, taking initialization time into account. This is + // safe to do, since scheduler thread is not started yet, so no other + // threads are updating time concurrently. + AtomicStore(CurrentTimestamp, TInstant::Now().MicroSeconds()); + AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds())); + } + void TBasicSchedulerThread::Start() { MainCycle.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TBasicSchedulerThread::CycleFunc, this))); } diff --git a/library/cpp/actors/core/scheduler_basic.h b/library/cpp/actors/core/scheduler_basic.h index 89ef8323f5..2ccde39235 100644 --- a/library/cpp/actors/core/scheduler_basic.h +++ b/library/cpp/actors/core/scheduler_basic.h @@ -1,7 +1,7 @@ #pragma once #include "actorsystem.h" -#include "monotonic.h" +#include "monotonic.h" #include "scheduler_queue.h" #include <library/cpp/actors/util/queue_chunk.h> #include <library/cpp/threading/future/legacy_future.h> @@ -9,17 +9,17 @@ #include <util/generic/map.h> namespace NActors { - + class TBasicSchedulerThread: public ISchedulerThread { // TODO: replace with NUMA-local threads and per-thread schedules const TSchedulerConfig Config; - struct TMonCounters; - const THolder<TMonCounters> MonCounters; - + struct TMonCounters; + const THolder<TMonCounters> MonCounters; + TActorSystem* ActorSystem; volatile ui64* CurrentTimestamp; - volatile ui64* CurrentMonotonic; + volatile ui64* CurrentMonotonic; ui32 TotalReaders; TArrayHolder<NSchedulerQueue::TReader*> Readers; @@ -44,7 +44,7 @@ namespace NActors { void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) override; void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) override; - void PrepareStart() override; + void PrepareStart() override; void Start() override; void PrepareStop() override; void Stop() override; @@ -55,10 +55,10 @@ namespace NActors { virtual ~TMockSchedulerThread() override { } - void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) override { + void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) override { Y_UNUSED(actorSystem); - *currentTimestamp = TInstant::Now().MicroSeconds(); - *currentMonotonic = GetMonotonicMicroSeconds(); + *currentTimestamp = TInstant::Now().MicroSeconds(); + *currentMonotonic = GetMonotonicMicroSeconds(); } void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) override { diff --git a/library/cpp/actors/core/scheduler_queue.h b/library/cpp/actors/core/scheduler_queue.h index 51b597be1f..3b8fac28f0 100644 --- a/library/cpp/actors/core/scheduler_queue.h +++ b/library/cpp/actors/core/scheduler_queue.h @@ -76,10 +76,10 @@ namespace NActors { } void Push(ui64 instantMicrosends, IEventHandle* ev, ISchedulerCookie* cookie) { - if (Y_UNLIKELY(instantMicrosends == 0)) { - // Protect against Pop() getting stuck forever - instantMicrosends = 1; - } + if (Y_UNLIKELY(instantMicrosends == 0)) { + // Protect against Pop() getting stuck forever + instantMicrosends = 1; + } if (WritePosition != TChunk::EntriesCount) { volatile TEntry& entry = WriteTo->Entries[WritePosition]; entry.Cookie = cookie; diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make index 25fc0ce902..880a9d00db 100644 --- a/library/cpp/actors/core/ya.make +++ b/library/cpp/actors/core/ya.make @@ -81,7 +81,7 @@ SRCS( memory_tracker.h mon.h mon_stats.h - monotonic.cpp + monotonic.cpp monotonic.h worker_context.cpp worker_context.h |