aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core
diff options
context:
space:
mode:
authorAlexey Borzenkov <snaury@yandex-team.ru>2022-02-10 16:47:43 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:43 +0300
commit330c83f8c116bd45316397b179275e9d87007e7d (patch)
treec0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/actors/core
parent22d92781ba2a10b7fb5b977b7d1a5c40ff53885f (diff)
downloadydb-330c83f8c116bd45316397b179275e9d87007e7d.tar.gz
Restoring authorship annotation for Alexey Borzenkov <snaury@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/core')
-rw-r--r--library/cpp/actors/core/actor.cpp46
-rw-r--r--library/cpp/actors/core/actor.h88
-rw-r--r--library/cpp/actors/core/actor_coroutine.cpp8
-rw-r--r--library/cpp/actors/core/actorid.h14
-rw-r--r--library/cpp/actors/core/actorsystem.cpp40
-rw-r--r--library/cpp/actors/core/actorsystem.h54
-rw-r--r--library/cpp/actors/core/events.h8
-rw-r--r--library/cpp/actors/core/events_undelivered.cpp6
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp64
-rw-r--r--library/cpp/actors/core/executor_pool_basic_ut.cpp78
-rw-r--r--library/cpp/actors/core/executor_pool_io.cpp42
-rw-r--r--library/cpp/actors/core/executor_pool_io.h2
-rw-r--r--library/cpp/actors/core/executor_pool_united.cpp8
-rw-r--r--library/cpp/actors/core/executor_pool_united_ut.cpp26
-rw-r--r--library/cpp/actors/core/executor_thread.cpp10
-rw-r--r--library/cpp/actors/core/executor_thread.h2
-rw-r--r--library/cpp/actors/core/log.cpp4
-rw-r--r--library/cpp/actors/core/log.h4
-rw-r--r--library/cpp/actors/core/log_settings.cpp12
-rw-r--r--library/cpp/actors/core/log_settings.h4
-rw-r--r--library/cpp/actors/core/mailbox.cpp2
-rw-r--r--library/cpp/actors/core/mailbox.h2
-rw-r--r--library/cpp/actors/core/monotonic.cpp46
-rw-r--r--library/cpp/actors/core/monotonic.h222
-rw-r--r--library/cpp/actors/core/scheduler_actor.cpp24
-rw-r--r--library/cpp/actors/core/scheduler_actor.h6
-rw-r--r--library/cpp/actors/core/scheduler_basic.cpp200
-rw-r--r--library/cpp/actors/core/scheduler_basic.h20
-rw-r--r--library/cpp/actors/core/scheduler_queue.h8
-rw-r--r--library/cpp/actors/core/ya.make2
30 files changed, 526 insertions, 526 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp
index 638bfb72fa..6f9ba6a42b 100644
--- a/library/cpp/actors/core/actor.cpp
+++ b/library/cpp/actors/core/actor.cpp
@@ -37,10 +37,10 @@ namespace NActors {
TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie);
}
- void TActivationContext::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
- TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie);
- }
-
+ void TActivationContext::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
+ TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie);
+ }
+
void TActivationContext::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
TlsActivationContext->ExecutorThread.Schedule(delta, ev, cookie);
}
@@ -87,14 +87,14 @@ namespace NActors {
return TlsActivationContext->ExecutorThread.ActorSystem;
}
- i64 TActivationContext::GetCurrentEventTicks() {
+ i64 TActivationContext::GetCurrentEventTicks() {
return GetCycleCountFast() - TlsActivationContext->EventStart;
- }
-
- double TActivationContext::GetCurrentEventTicksAsSeconds() {
- return NHPTimer::GetSeconds(GetCurrentEventTicks());
- }
-
+ }
+
+ double TActivationContext::GetCurrentEventTicksAsSeconds() {
+ return NHPTimer::GetSeconds(GetCurrentEventTicks());
+ }
+
TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const {
return ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfID);
}
@@ -107,10 +107,10 @@ namespace NActors {
ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie);
}
- void TActorContext::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
- ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie);
- }
-
+ void TActorContext::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
+ ExecutorThread.Schedule(deadline, new IEventHandle(SelfID, TActorId(), ev), cookie);
+ }
+
void TActorContext::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const {
ExecutorThread.Schedule(delta, new IEventHandle(SelfID, TActorId(), ev), cookie);
}
@@ -119,10 +119,10 @@ namespace NActors {
TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
}
- void IActor::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
- TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
- }
-
+ void IActor::Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
+ TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
+ }
+
void IActor::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
}
@@ -131,10 +131,10 @@ namespace NActors {
return TlsActivationContext->ExecutorThread.ActorSystem->Timestamp();
}
- TMonotonic TActivationContext::Monotonic() {
- return TlsActivationContext->ExecutorThread.ActorSystem->Monotonic();
- }
-
+ TMonotonic TActivationContext::Monotonic() {
+ return TlsActivationContext->ExecutorThread.ActorSystem->Monotonic();
+ }
+
TInstant TActorContext::Now() const {
return ExecutorThread.ActorSystem->Timestamp();
}
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index 9c167ab595..ed29bd14b9 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -1,7 +1,7 @@
#pragma once
#include "event.h"
-#include "monotonic.h"
+#include "monotonic.h"
#include <util/system/tls.h>
#include <library/cpp/actors/util/local_process_key.h>
@@ -24,13 +24,13 @@ namespace NActors {
public:
TMailboxHeader& Mailbox;
TExecutorThread& ExecutorThread;
- const NHPTimer::STime EventStart;
+ const NHPTimer::STime EventStart;
protected:
- explicit TActivationContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart)
+ explicit TActivationContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart)
: Mailbox(mailbox)
, ExecutorThread(executorThread)
- , EventStart(eventStart)
+ , EventStart(eventStart)
{
}
@@ -40,22 +40,22 @@ namespace NActors {
/**
* Schedule one-shot event that will be send at given time point in the future.
*
- * @param deadline the wallclock time point in future when event must be send
+ * @param deadline the wallclock time point in future when event must be send
* @param ev the event to send
* @param cookie cookie that will be piggybacked with event
*/
static void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
/**
- * Schedule one-shot event that will be send at given time point in the future.
- *
- * @param deadline the monotonic time point in future when event must be send
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- */
- static void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
-
- /**
+ * Schedule one-shot event that will be send at given time point in the future.
+ *
+ * @param deadline the monotonic time point in future when event must be send
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ */
+ static void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
+
+ /**
* Schedule one-shot event that will be send after given delay.
*
* @param delta the time from now to delay event sending
@@ -65,7 +65,7 @@ namespace NActors {
static void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
static TInstant Now();
- static TMonotonic Monotonic();
+ static TMonotonic Monotonic();
NLog::TSettings* LoggerSettings() const;
// register new actor in ActorSystem on new fresh mailbox.
@@ -83,16 +83,16 @@ namespace NActors {
static TActorId InterconnectProxy(ui32 nodeid);
static TActorSystem* ActorSystem();
-
- static i64 GetCurrentEventTicks();
- static double GetCurrentEventTicksAsSeconds();
+
+ static i64 GetCurrentEventTicks();
+ static double GetCurrentEventTicksAsSeconds();
};
struct TActorContext: public TActivationContext {
const TActorId SelfID;
explicit TActorContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID)
- : TActivationContext(mailbox, executorThread, eventStart)
+ : TActivationContext(mailbox, executorThread, eventStart)
, SelfID(selfID)
{
}
@@ -110,22 +110,22 @@ namespace NActors {
/**
* Schedule one-shot event that will be send at given time point in the future.
*
- * @param deadline the wallclock time point in future when event must be send
+ * @param deadline the wallclock time point in future when event must be send
* @param ev the event to send
* @param cookie cookie that will be piggybacked with event
*/
void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
/**
- * Schedule one-shot event that will be send at given time point in the future.
- *
- * @param deadline the monotonic time point in future when event must be send
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- */
- void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
-
- /**
+ * Schedule one-shot event that will be send at given time point in the future.
+ *
+ * @param deadline the monotonic time point in future when event must be send
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ */
+ void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
+
+ /**
* Schedule one-shot event that will be send after given delay.
*
* @param delta the time from now to delay event sending
@@ -135,7 +135,7 @@ namespace NActors {
void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
TActorContext MakeFor(const TActorId& otherId) const {
- return TActorContext(Mailbox, ExecutorThread, EventStart, otherId);
+ return TActorContext(Mailbox, ExecutorThread, EventStart, otherId);
}
// register new actor in ActorSystem on new fresh mailbox.
@@ -179,22 +179,22 @@ namespace NActors {
/**
* Schedule one-shot event that will be send at given time point in the future.
*
- * @param deadline the wallclock time point in future when event must be send
+ * @param deadline the wallclock time point in future when event must be send
* @param ev the event to send
* @param cookie cookie that will be piggybacked with event
*/
virtual void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
/**
- * Schedule one-shot event that will be send at given time point in the future.
- *
- * @param deadline the monotonic time point in future when event must be send
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- */
- virtual void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
-
- /**
+ * Schedule one-shot event that will be send at given time point in the future.
+ *
+ * @param deadline the monotonic time point in future when event must be send
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ */
+ virtual void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
+
+ /**
* Schedule one-shot event that will be send after given delay.
*
* @param delta the time from now to delay event sending
@@ -240,11 +240,11 @@ namespace NActors {
INTERCONNECT_SESSION_KILLER = 286,
ACTOR_SYSTEM_SCHEDULER_ACTOR = 312,
ACTOR_FUTURE_CALLBACK = 337,
- INTERCONNECT_MONACTOR = 362,
+ INTERCONNECT_MONACTOR = 362,
INTERCONNECT_LOAD_ACTOR = 376,
INTERCONNECT_LOAD_RESPONDER = 377,
NAMESERVICE = 450,
- DNS_RESOLVER = 481,
+ DNS_RESOLVER = 481,
INTERCONNECT_PROXY_WRAPPER = 546,
};
@@ -362,7 +362,7 @@ namespace NActors {
}
void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
- void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
+ void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
// register new actor in ActorSystem on new fresh mailbox.
@@ -456,7 +456,7 @@ namespace NActors {
inline TActorContext TActivationContext::ActorContextFor(TActorId id) {
auto& tls = *TlsActivationContext;
- return TActorContext(tls.Mailbox, tls.ExecutorThread, tls.EventStart, id);
+ return TActorContext(tls.Mailbox, tls.ExecutorThread, tls.EventStart, id);
}
class TDecorator : public IActor {
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp
index d3bcdafbd3..0ab4d2b24d 100644
--- a/library/cpp/actors/core/actor_coroutine.cpp
+++ b/library/cpp/actors/core/actor_coroutine.cpp
@@ -1,9 +1,9 @@
#include "actor_coroutine.h"
#include "executor_thread.h"
-#include <util/system/sanitizers.h>
+#include <util/system/sanitizers.h>
#include <util/system/type_name.h>
-
+
namespace NActors {
static constexpr size_t StackOverflowGap = 4096;
static char GoodStack[StackOverflowGap];
@@ -92,8 +92,8 @@ namespace NActors {
}
// prepare actor context for in-coroutine use
- TActivationContext *ac = TlsActivationContext;
- TlsActivationContext = nullptr;
+ TActivationContext *ac = TlsActivationContext;
+ TlsActivationContext = nullptr;
TActorContext ctx(ac->Mailbox, ac->ExecutorThread, ac->EventStart, SelfActorId);
ActorContext = &ctx;
diff --git a/library/cpp/actors/core/actorid.h b/library/cpp/actors/core/actorid.h
index c9e6173173..d972b1a0ff 100644
--- a/library/cpp/actors/core/actorid.h
+++ b/library/cpp/actors/core/actorid.h
@@ -12,13 +12,13 @@ namespace NActors {
// next 20 bits - node id itself
struct TActorId {
- static constexpr ui32 MaxServiceIDLength = 12;
- static constexpr ui32 MaxPoolID = 0x000007FF;
- static constexpr ui32 MaxNodeId = 0x000FFFFF;
- static constexpr ui32 PoolIndexShift = 20;
- static constexpr ui32 PoolIndexMask = MaxPoolID << PoolIndexShift;
- static constexpr ui32 ServiceMask = 0x80000000;
- static constexpr ui32 NodeIdMask = MaxNodeId;
+ static constexpr ui32 MaxServiceIDLength = 12;
+ static constexpr ui32 MaxPoolID = 0x000007FF;
+ static constexpr ui32 MaxNodeId = 0x000FFFFF;
+ static constexpr ui32 PoolIndexShift = 20;
+ static constexpr ui32 PoolIndexMask = MaxPoolID << PoolIndexShift;
+ static constexpr ui32 ServiceMask = 0x80000000;
+ static constexpr ui32 NodeIdMask = MaxNodeId;
private:
union {
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp
index 4130b0d4da..c58698a206 100644
--- a/library/cpp/actors/core/actorsystem.cpp
+++ b/library/cpp/actors/core/actorsystem.cpp
@@ -43,7 +43,7 @@ namespace NActors {
, Scheduler(setup->Scheduler)
, InterconnectCount((ui32)setup->Interconnect.ProxyActors.size())
, CurrentTimestamp(0)
- , CurrentMonotonic(0)
+ , CurrentMonotonic(0)
, CurrentIDCounter(RandomNumber<ui64>())
, SystemSetup(setup.Release())
, DefSelfID(NodeId, "actorsystem")
@@ -69,15 +69,15 @@ namespace NActors {
#endif
TActorId recipient = ev->GetRecipientRewrite();
- const ui32 recpNodeId = recipient.NodeId();
+ const ui32 recpNodeId = recipient.NodeId();
if (recpNodeId != NodeId && recpNodeId != 0) {
// if recipient is not local one - rewrite with forward instruction
Y_VERIFY_DEBUG(!ev->HasEvent() || ev->GetBase()->IsSerializable());
- Y_VERIFY(ev->Recipient == recipient,
- "Event rewrite from %s to %s would be lost via interconnect",
- ev->Recipient.ToString().c_str(),
- recipient.ToString().c_str());
+ Y_VERIFY(ev->Recipient == recipient,
+ "Event rewrite from %s to %s would be lost via interconnect",
+ ev->Recipient.ToString().c_str(),
+ recipient.ToString().c_str());
recipient = InterconnectProxy(recpNodeId);
ev->Rewrite(TEvInterconnect::EvForward, recipient);
}
@@ -119,20 +119,20 @@ namespace NActors {
}
void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
- Schedule(deadline - Timestamp(), ev, cookie);
+ Schedule(deadline - Timestamp(), ev, cookie);
+ }
+
+ void TActorSystem::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
+ const auto current = Monotonic();
+ if (deadline < current)
+ deadline = current;
+
+ TTicketLock::TGuard guard(&ScheduleLock);
+ ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
}
- void TActorSystem::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
- const auto current = Monotonic();
- if (deadline < current)
- deadline = current;
-
- TTicketLock::TGuard guard(&ScheduleLock);
- ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
- }
-
void TActorSystem::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
- const auto deadline = Monotonic() + delta;
+ const auto deadline = Monotonic() + delta;
TTicketLock::TGuard guard(&ScheduleLock);
ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
@@ -211,7 +211,7 @@ namespace NActors {
TVector<NSchedulerQueue::TReader*> scheduleReaders;
scheduleReaders.push_back(&ScheduleQueue->Reader);
CpuManager->PrepareStart(scheduleReaders, this);
- Scheduler->Prepare(this, &CurrentTimestamp, &CurrentMonotonic);
+ Scheduler->Prepare(this, &CurrentTimestamp, &CurrentMonotonic);
Scheduler->PrepareSchedules(&scheduleReaders.front(), (ui32)scheduleReaders.size());
// setup interconnect proxies
@@ -242,9 +242,9 @@ namespace NActors {
// ok, setup complete, we could destroy setup config
SystemSetup.Destroy();
- Scheduler->PrepareStart();
+ Scheduler->PrepareStart();
CpuManager->Start();
- Send(MakeSchedulerActorId(), new TEvSchedulerInitialize(scheduleReaders, &CurrentTimestamp, &CurrentMonotonic));
+ Send(MakeSchedulerActorId(), new TEvSchedulerInitialize(scheduleReaders, &CurrentTimestamp, &CurrentMonotonic));
Scheduler->Start();
}
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 80d33901b0..40499d7586 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -27,7 +27,7 @@ namespace NActors {
char data[12];
memcpy(data, "ICProxy@", 8);
memcpy(data + 8, &destNodeId, sizeof(ui32));
- return TActorId(0, TStringBuf(data, 12));
+ return TActorId(0, TStringBuf(data, 12));
}
inline bool IsInterconnectProxyId(const TActorId& actorId) {
@@ -69,7 +69,7 @@ namespace NActors {
/**
* Schedule one-shot event that will be send at given time point in the future.
*
- * @param deadline the wallclock time point in future when event must be send
+ * @param deadline the wallclock time point in future when event must be send
* @param ev the event to send
* @param cookie cookie that will be piggybacked with event
* @param workerId index of thread which will perform event dispatching
@@ -77,16 +77,16 @@ namespace NActors {
virtual void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
/**
- * Schedule one-shot event that will be send at given time point in the future.
- *
- * @param deadline the monotonic time point in future when event must be send
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
+ * Schedule one-shot event that will be send at given time point in the future.
+ *
+ * @param deadline the monotonic time point in future when event must be send
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
* @param workerId index of thread which will perform event dispatching
- */
+ */
virtual void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
-
- /**
+
+ /**
* Schedule one-shot event that will be send after given delay.
*
* @param delta the time from now to delay event sending
@@ -136,9 +136,9 @@ namespace NActors {
virtual ~ISchedulerThread() {
}
- virtual void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) = 0;
+ virtual void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) = 0;
virtual void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) = 0;
- virtual void PrepareStart() { /* empty */ }
+ virtual void PrepareStart() { /* empty */ }
virtual void Start() = 0;
virtual void PrepareStop() = 0;
virtual void Stop() = 0;
@@ -226,7 +226,7 @@ namespace NActors {
TArrayHolder<TActorId> Interconnect;
volatile ui64 CurrentTimestamp;
- volatile ui64 CurrentMonotonic;
+ volatile ui64 CurrentMonotonic;
volatile ui64 CurrentIDCounter;
THolder<NSchedulerQueue::TQueueType> ScheduleQueue;
@@ -264,22 +264,22 @@ namespace NActors {
/**
* Schedule one-shot event that will be send at given time point in the future.
*
- * @param deadline the wallclock time point in future when event must be send
+ * @param deadline the wallclock time point in future when event must be send
* @param ev the event to send
* @param cookie cookie that will be piggybacked with event
*/
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
/**
- * Schedule one-shot event that will be send at given time point in the future.
- *
- * @param deadline the monotonic time point in future when event must be send
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- */
- void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
-
- /**
+ * Schedule one-shot event that will be send at given time point in the future.
+ *
+ * @param deadline the monotonic time point in future when event must be send
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ */
+ void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
+
+ /**
* Schedule one-shot event that will be send after given delay.
*
* @param delta the time from now to delay event sending
@@ -340,10 +340,10 @@ namespace NActors {
return TInstant::MicroSeconds(RelaxedLoad(&CurrentTimestamp));
}
- TMonotonic Monotonic() const {
- return TMonotonic::MicroSeconds(RelaxedLoad(&CurrentMonotonic));
- }
-
+ TMonotonic Monotonic() const {
+ return TMonotonic::MicroSeconds(RelaxedLoad(&CurrentMonotonic));
+ }
+
template <typename T>
T* AppData() const {
return (T*)AppData0;
diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h
index 381dac03af..702cf50fad 100644
--- a/library/cpp/actors/core/events.h
+++ b/library/cpp/actors/core/events.h
@@ -4,7 +4,7 @@
#include "event_pb.h"
#include <library/cpp/actors/protos/actors.pb.h>
-#include <util/system/unaligned_mem.h>
+#include <util/system/unaligned_mem.h>
namespace NActors {
struct TEvents {
@@ -161,9 +161,9 @@ namespace NActors {
private:
static TString MakeData(ui32 sourceType, ui32 reason) {
TString s = TString::Uninitialized(sizeof(ui32) + sizeof(ui32));
- char *p = s.Detach();
- WriteUnaligned<ui32>(p + 0, sourceType);
- WriteUnaligned<ui32>(p + 4, reason);
+ char *p = s.Detach();
+ WriteUnaligned<ui32>(p + 0, sourceType);
+ WriteUnaligned<ui32>(p + 4, reason);
return s;
}
};
diff --git a/library/cpp/actors/core/events_undelivered.cpp b/library/cpp/actors/core/events_undelivered.cpp
index 3b96625cab..23deaffd10 100644
--- a/library/cpp/actors/core/events_undelivered.cpp
+++ b/library/cpp/actors/core/events_undelivered.cpp
@@ -32,9 +32,9 @@ namespace NActors {
IEventBase* TEvents::TEvUndelivered::Load(TEventSerializedData* bufs) {
TString str = bufs->GetString();
Y_VERIFY(str.size() == (sizeof(ui32) + sizeof(ui32)));
- const char* p = str.data();
- const ui64 sourceType = ReadUnaligned<ui32>(p + 0);
- const ui64 reason = ReadUnaligned<ui32>(p + 4);
+ const char* p = str.data();
+ const ui64 sourceType = ReadUnaligned<ui32>(p + 0);
+ const ui64 reason = ReadUnaligned<ui32>(p + 4);
return new TEvUndelivered(sourceType, reason);
}
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 87315a6a6a..4dce16939a 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -87,17 +87,17 @@ namespace NActors {
if (x < 0) {
#if defined ACTORSLIB_COLLECT_EXEC_STATS
if (AtomicGetAndIncrement(ThreadUtilization) == 0) {
- // Initially counter contains -t0, the pool start timestamp
- // When the first thread goes to sleep we add t1, so the counter
- // becomes t1-t0 >= 0, or the duration of max utilization so far.
- // If the counter was negative and becomes positive, that means
- // counter just turned into a duration and we should store that
- // duration. Otherwise another thread raced with us and
- // subtracted some other timestamp t2.
+ // Initially counter contains -t0, the pool start timestamp
+ // When the first thread goes to sleep we add t1, so the counter
+ // becomes t1-t0 >= 0, or the duration of max utilization so far.
+ // If the counter was negative and becomes positive, that means
+ // counter just turned into a duration and we should store that
+ // duration. Otherwise another thread raced with us and
+ // subtracted some other timestamp t2.
const i64 t = GetCycleCountFast();
- const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t);
- if (x < 0 && x + t > 0)
- AtomicStore(&MaxUtilizationAccumulator, x + t);
+ const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t);
+ if (x < 0 && x + t > 0)
+ AtomicStore(&MaxUtilizationAccumulator, x + t);
}
#endif
@@ -126,7 +126,7 @@ namespace NActors {
if (!doSpin) {
break;
}
- if (RelaxedLoad(&StopFlag)) {
+ if (RelaxedLoad(&StopFlag)) {
break;
}
}
@@ -159,20 +159,20 @@ namespace NActors {
#if defined ACTORSLIB_COLLECT_EXEC_STATS
if (AtomicDecrement(ThreadUtilization) == 0) {
- // When we started sleeping counter contained t1-t0, or the
- // last duration of max utilization. Now we subtract t2 >= t1,
- // which turns counter negative again, and the next sleep cycle
- // at timestamp t3 would be adding some new duration t3-t2.
- // If the counter was positive and becomes negative that means
- // there are no current races with other threads and we should
- // store the last positive duration we observed. Multiple
- // threads may be adding and subtracting values in potentially
- // arbitrary order, which would cause counter to oscillate
- // around zero. When it crosses zero is a good indication of a
- // correct value.
+ // When we started sleeping counter contained t1-t0, or the
+ // last duration of max utilization. Now we subtract t2 >= t1,
+ // which turns counter negative again, and the next sleep cycle
+ // at timestamp t3 would be adding some new duration t3-t2.
+ // If the counter was positive and becomes negative that means
+ // there are no current races with other threads and we should
+ // store the last positive duration we observed. Multiple
+ // threads may be adding and subtracting values in potentially
+ // arbitrary order, which would cause counter to oscillate
+ // around zero. When it crosses zero is a good indication of a
+ // correct value.
const i64 t = GetCycleCountFast();
- const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t);
- if (x > 0 && x - t < 0)
+ const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t);
+ if (x > 0 && x - t < 0)
AtomicStore(&MaxUtilizationAccumulator, x);
}
#endif
@@ -305,18 +305,18 @@ namespace NActors {
void TBasicExecutorPool::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) {
Y_VERIFY_DEBUG(workerId < PoolThreads);
-
- const auto current = ActorSystem->Monotonic();
- if (deadline < current)
- deadline = current;
-
+
+ const auto current = ActorSystem->Monotonic();
+ if (deadline < current)
+ deadline = current;
+
ScheduleWriters[workerId].Push(deadline.MicroSeconds(), ev.Release(), cookie);
- }
-
+ }
+
void TBasicExecutorPool::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) {
Y_VERIFY_DEBUG(workerId < PoolThreads);
- const auto deadline = ActorSystem->Monotonic() + delta;
+ const auto deadline = ActorSystem->Monotonic() + delta;
ScheduleWriters[workerId].Push(deadline.MicroSeconds(), ev.Release(), cookie);
}
diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp
index b3d6d8b329..76dff693af 100644
--- a/library/cpp/actors/core/executor_pool_basic_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp
@@ -47,8 +47,8 @@ public:
if (GetCounter() == 0) {
break;
}
-
- Sleep(TDuration::MilliSeconds(1));
+
+ Sleep(TDuration::MilliSeconds(1));
}
}
@@ -69,8 +69,8 @@ private:
{
Y_UNUSED(ev);
Action();
- TAtomicBase count = AtomicDecrement(Counter);
- Y_VERIFY(count != Max<TAtomicBase>());
+ TAtomicBase count = AtomicDecrement(Counter);
+ Y_VERIFY(count != Max<TAtomicBase>());
if (count) {
Send(Receiver, new TEvMsg());
}
@@ -206,19 +206,19 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
actorSystem.Send(changerActorId, new TEvMsg());
while (true) {
- size_t maxCounter = 0;
+ size_t maxCounter = 0;
for (size_t i = 0; i < size; ++i) {
- maxCounter = Max(maxCounter, actors[i]->GetCounter());
+ maxCounter = Max(maxCounter, actors[i]->GetCounter());
+ }
+
+ if (maxCounter == 0) {
+ break;
}
- if (maxCounter == 0) {
- break;
- }
-
auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
+ UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
- Sleep(TDuration::MilliSeconds(1));
+ Sleep(TDuration::MilliSeconds(1));
}
changerActor->Stop();
@@ -242,9 +242,9 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
while (actor->GetCounter()) {
auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter());
-
- Sleep(TDuration::MilliSeconds(1));
+ UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter());
+
+ Sleep(TDuration::MilliSeconds(1));
}
}
@@ -275,19 +275,19 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
while (true) {
- size_t maxCounter = 0;
+ size_t maxCounter = 0;
for (size_t i = 0; i < size; ++i) {
- maxCounter = Max(maxCounter, actors[i]->GetCounter());
+ maxCounter = Max(maxCounter, actors[i]->GetCounter());
+ }
+
+ if (maxCounter == 0) {
+ break;
}
- if (maxCounter == 0) {
- break;
- }
-
auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
+ UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
- Sleep(TDuration::MilliSeconds(1));
+ Sleep(TDuration::MilliSeconds(1));
}
}
@@ -319,19 +319,19 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
while (true) {
- size_t maxCounter = 0;
+ size_t maxCounter = 0;
for (size_t i = 0; i < actorsCount; ++i) {
- maxCounter = Max(maxCounter, actors[i]->GetCounter());
+ maxCounter = Max(maxCounter, actors[i]->GetCounter());
+ }
+
+ if (maxCounter == 0) {
+ break;
}
- if (maxCounter == 0) {
- break;
- }
-
auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
+ UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
- Sleep(TDuration::MilliSeconds(1));
+ Sleep(TDuration::MilliSeconds(1));
}
}
@@ -362,19 +362,19 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
}
while (true) {
- size_t maxCounter = 0;
+ size_t maxCounter = 0;
for (size_t i = 0; i < actorsCount; ++i) {
- maxCounter = Max(maxCounter, actors[i]->GetCounter());
+ maxCounter = Max(maxCounter, actors[i]->GetCounter());
+ }
+
+ if (maxCounter == 0) {
+ break;
}
- if (maxCounter == 0) {
- break;
- }
-
auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
+ UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
- Sleep(TDuration::MilliSeconds(1));
+ Sleep(TDuration::MilliSeconds(1));
}
}
diff --git a/library/cpp/actors/core/executor_pool_io.cpp b/library/cpp/actors/core/executor_pool_io.cpp
index f4f13c9c20..fb557ae6b0 100644
--- a/library/cpp/actors/core/executor_pool_io.cpp
+++ b/library/cpp/actors/core/executor_pool_io.cpp
@@ -30,33 +30,33 @@ namespace NActors {
ui32 workerId = wctx.WorkerId;
Y_VERIFY_DEBUG(workerId < PoolThreads);
- NHPTimer::STime elapsed = 0;
- NHPTimer::STime parked = 0;
+ NHPTimer::STime elapsed = 0;
+ NHPTimer::STime parked = 0;
NHPTimer::STime hpstart = GetCycleCountFast();
- NHPTimer::STime hpnow;
-
+ NHPTimer::STime hpnow;
+
const TAtomic x = AtomicDecrement(Semaphore);
if (x < 0) {
TThreadCtx& threadCtx = Threads[workerId];
ThreadQueue.Push(workerId + 1, revolvingCounter);
hpnow = GetCycleCountFast();
- elapsed += hpnow - hpstart;
+ elapsed += hpnow - hpstart;
if (threadCtx.Pad.Park())
return 0;
hpstart = GetCycleCountFast();
- parked += hpstart - hpnow;
+ parked += hpstart - hpnow;
}
while (!RelaxedLoad(&StopFlag)) {
- if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
+ if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
hpnow = GetCycleCountFast();
- elapsed += hpnow - hpstart;
+ elapsed += hpnow - hpstart;
wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, elapsed);
- if (parked > 0) {
+ if (parked > 0) {
wctx.AddParkedCycles(parked);
- }
+ }
return activation;
- }
+ }
SpinLockPause();
}
@@ -69,18 +69,18 @@ namespace NActors {
void TIOExecutorPool::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) {
Y_UNUSED(workerId);
-
- const auto current = ActorSystem->Monotonic();
- if (deadline < current)
- deadline = current;
-
- TTicketLock::TGuard guard(&ScheduleLock);
- ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
- }
-
+
+ const auto current = ActorSystem->Monotonic();
+ if (deadline < current)
+ deadline = current;
+
+ TTicketLock::TGuard guard(&ScheduleLock);
+ ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
+ }
+
void TIOExecutorPool::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) {
Y_UNUSED(workerId);
- const auto deadline = ActorSystem->Monotonic() + delta;
+ const auto deadline = ActorSystem->Monotonic() + delta;
TTicketLock::TGuard guard(&ScheduleLock);
ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
diff --git a/library/cpp/actors/core/executor_pool_io.h b/library/cpp/actors/core/executor_pool_io.h
index 7de6fd0528..e576d642a1 100644
--- a/library/cpp/actors/core/executor_pool_io.h
+++ b/library/cpp/actors/core/executor_pool_io.h
@@ -17,7 +17,7 @@ namespace NActors {
};
TArrayHolder<TThreadCtx> Threads;
- TUnorderedCache<ui32, 512, 4> ThreadQueue;
+ TUnorderedCache<ui32, 512, 4> ThreadQueue;
THolder<NSchedulerQueue::TQueueType> ScheduleQueue;
TTicketLock ScheduleLock;
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index 4ae9ec4c73..dac6245635 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -1235,7 +1235,7 @@ namespace NActors {
inline void TUnitedWorkers::TryWake(TPoolId pool) {
// Avoid using multiple atomic seq_cst loads in cycle, use barrier once
AtomicBarrier();
-
+
// Scan every allowed cpu in pool's wakeup order and try to wake the first idle cpu
if (RelaxedLoad(&Pools[pool].Waiters) > 0) {
for (TCpu* cpu : Pools[pool].WakeOrderCpus) {
@@ -1247,11 +1247,11 @@ namespace NActors {
// Cpu has not been woken up
}
-
+
inline void TUnitedWorkers::BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter) {
Pools[pool].BeginExecution(activation, revolvingCounter);
- }
-
+ }
+
inline bool TUnitedWorkers::NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter) {
return Pools[pool].NextExecution(activation, revolvingCounter);
}
diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp
index ddc4c348ee..d4df17f1b8 100644
--- a/library/cpp/actors/core/executor_pool_united_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_united_ut.cpp
@@ -58,8 +58,8 @@ public:
if (GetCounter() == 0) {
break;
}
-
- Sleep(TDuration::MilliSeconds(1));
+
+ Sleep(TDuration::MilliSeconds(1));
}
}
@@ -78,8 +78,8 @@ private:
void Handle(TEvMsg::TPtr &ev) {
Y_UNUSED(ev);
Action();
- TAtomicBase count = AtomicDecrement(Counter);
- Y_VERIFY(count != Max<TAtomicBase>());
+ TAtomicBase count = AtomicDecrement(Counter);
+ Y_VERIFY(count != Max<TAtomicBase>());
if (count) {
Send(Receiver, new TEvMsg());
}
@@ -149,9 +149,9 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
while (actor->GetCounter()) {
auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter());
-
- Sleep(TDuration::MilliSeconds(1));
+ UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter());
+
+ Sleep(TDuration::MilliSeconds(1));
}
TVector<TExecutorThreadStats> stats;
@@ -212,11 +212,11 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
left += actor->GetCounter();
}
if (left == 0) {
- break;
- }
+ break;
+ }
auto now = TInstant::Now();
UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "left " << left);
- Sleep(TDuration::MilliSeconds(1));
+ Sleep(TDuration::MilliSeconds(1));
}
for (size_t pool = 0; pool < pools; pool++) {
@@ -311,11 +311,11 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
left += actor->GetCounter();
}
if (left == 0) {
- break;
- }
+ break;
+ }
auto now = TInstant::Now();
UNIT_ASSERT_C(now - begin < TDuration::Seconds(15), "left " << left);
- Sleep(TDuration::MilliSeconds(1));
+ Sleep(TDuration::MilliSeconds(1));
}
for (size_t pool = 0; pool < pools; pool++) {
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 7e961584e6..446b651efd 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -76,11 +76,11 @@ namespace NActors {
Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId);
}
- void TExecutorThread::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
- ++CurrentActorScheduledEventsCounter;
+ void TExecutorThread::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
+ ++CurrentActorScheduledEventsCounter;
Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId);
- }
-
+ }
+
void TExecutorThread::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
++CurrentActorScheduledEventsCounter;
Ctx.Executor->Schedule(delta, ev, cookie, Ctx.WorkerId);
@@ -145,7 +145,7 @@ namespace NActors {
NHPTimer::STime hpnow;
recipient = ev->GetRecipientRewrite();
if (actor = mailbox->FindActor(recipient.LocalId())) {
- TActorContext ctx(*mailbox, *this, hpprev, recipient);
+ TActorContext ctx(*mailbox, *this, hpprev, recipient);
TlsActivationContext = &ctx;
#ifdef USE_ACTOR_CALLSTACK
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index 72b8f28c1d..9d3c573f0d 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -47,7 +47,7 @@ namespace NActors {
const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; }
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
- void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
+ void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
bool Send(TAutoPtr<IEventHandle> ev) {
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index 391844dcff..5f63b5af58 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -223,9 +223,9 @@ namespace NActors {
}
}
- void TLoggerActor::Throttle(const NLog::TSettings& settings) {
+ void TLoggerActor::Throttle(const NLog::TSettings& settings) {
if (AtomicGet(IsOverflow))
- Sleep(settings.ThrottleDelay);
+ Sleep(settings.ThrottleDelay);
}
void TLoggerActor::LogIgnoredCount(TInstant now) {
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index f063642ad2..c11a7cf3c1 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -232,7 +232,7 @@ namespace NActors {
// Directly call logger instead of sending a message
void Log(TInstant time, NLog::EPriority priority, NLog::EComponent component, const char* c, ...);
- static void Throttle(const NLog::TSettings& settings);
+ static void Throttle(const NLog::TSettings& settings);
private:
TIntrusivePtr<NLog::TSettings> Settings;
@@ -322,7 +322,7 @@ namespace NActors {
inline void DeliverLogMessage(TCtx& ctx, NLog::EPriority mPriority, NLog::EComponent mComponent, TString &&str)
{
const NLog::TSettings *mSettings = ctx.LoggerSettings();
- TLoggerActor::Throttle(*mSettings);
+ TLoggerActor::Throttle(*mSettings);
ctx.Send(new IEventHandle(mSettings->LoggerActorId, TActorId(), new NLog::TEvLog(mPriority, mComponent, std::move(str))));
}
diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp
index d77688021d..f52f2fc5d2 100644
--- a/library/cpp/actors/core/log_settings.cpp
+++ b/library/cpp/actors/core/log_settings.cpp
@@ -12,7 +12,7 @@ namespace NActors {
, LoggerComponent(loggerComponent)
, TimeThresholdMs(timeThresholdMs)
, AllowDrop(true)
- , ThrottleDelay(TDuration::MilliSeconds(100))
+ , ThrottleDelay(TDuration::MilliSeconds(100))
, MinVal(0)
, MaxVal(0)
, Mask(0)
@@ -34,7 +34,7 @@ namespace NActors {
, LoggerComponent(loggerComponent)
, TimeThresholdMs(timeThresholdMs)
, AllowDrop(true)
- , ThrottleDelay(TDuration::MilliSeconds(100))
+ , ThrottleDelay(TDuration::MilliSeconds(100))
, MinVal(0)
, MaxVal(0)
, Mask(0)
@@ -205,10 +205,10 @@ namespace NActors {
AllowDrop = val;
}
- void TSettings::SetThrottleDelay(TDuration value) {
- ThrottleDelay = value;
- }
-
+ void TSettings::SetThrottleDelay(TDuration value) {
+ ThrottleDelay = value;
+ }
+
void TSettings::SetUseLocalTimestamps(bool value) {
UseLocalTimestamps = value;
}
diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h
index 5f4898bda1..7fe4504edd 100644
--- a/library/cpp/actors/core/log_settings.h
+++ b/library/cpp/actors/core/log_settings.h
@@ -73,7 +73,7 @@ namespace NActors {
EComponent LoggerComponent;
ui64 TimeThresholdMs;
bool AllowDrop;
- TDuration ThrottleDelay;
+ TDuration ThrottleDelay;
TArrayHolder<TAtomic> ComponentInfo;
TVector<TString> ComponentNames;
EComponent MinVal;
@@ -162,7 +162,7 @@ namespace NActors {
static bool IsValidPriority(EPriority priority);
bool IsValidComponent(EComponent component);
void SetAllowDrop(bool val);
- void SetThrottleDelay(TDuration value);
+ void SetThrottleDelay(TDuration value);
void SetUseLocalTimestamps(bool value);
private:
diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp
index b63577c7d6..d84b4f9e46 100644
--- a/library/cpp/actors/core/mailbox.cpp
+++ b/library/cpp/actors/core/mailbox.cpp
@@ -529,7 +529,7 @@ namespace NActors {
Y_FAIL();
}
- AtomicStore(Lines + lineIndex, header);
+ AtomicStore(Lines + lineIndex, header);
ui32 ret = lineIndexMask | 1;
diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h
index 8f4f901bd9..0bd9c4d314 100644
--- a/library/cpp/actors/core/mailbox.h
+++ b/library/cpp/actors/core/mailbox.h
@@ -277,7 +277,7 @@ namespace NActors {
TAtomic LastAllocatedLine;
TAtomic AllocatedMailboxCount;
- typedef TUnorderedCache<ui32, 512, 4> TMailboxCache;
+ typedef TUnorderedCache<ui32, 512, 4> TMailboxCache;
TMailboxCache MailboxCacheSimple;
TAtomic CachedSimpleMailboxes;
TMailboxCache MailboxCacheRevolving;
diff --git a/library/cpp/actors/core/monotonic.cpp b/library/cpp/actors/core/monotonic.cpp
index eefd8913cc..3465149dbe 100644
--- a/library/cpp/actors/core/monotonic.cpp
+++ b/library/cpp/actors/core/monotonic.cpp
@@ -1,23 +1,23 @@
-#include "monotonic.h"
-
-#include <chrono>
-
-namespace NActors {
-
- namespace {
- // Unfortunately time_since_epoch() is sometimes negative on wine
- // Remember initial time point at program start and use offsets from that
- std::chrono::steady_clock::time_point MonotonicOffset = std::chrono::steady_clock::now();
- }
-
- ui64 GetMonotonicMicroSeconds() {
- auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - MonotonicOffset).count();
- // Steady clock is supposed to never jump backwards, but it's better to be safe in case of buggy implementations
- if (Y_UNLIKELY(microseconds < 0)) {
- microseconds = 0;
- }
- // Add one so we never return zero
- return microseconds + 1;
- }
-
-} // namespace NActors
+#include "monotonic.h"
+
+#include <chrono>
+
+namespace NActors {
+
+ namespace {
+ // Unfortunately time_since_epoch() is sometimes negative on wine
+ // Remember initial time point at program start and use offsets from that
+ std::chrono::steady_clock::time_point MonotonicOffset = std::chrono::steady_clock::now();
+ }
+
+ ui64 GetMonotonicMicroSeconds() {
+ auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - MonotonicOffset).count();
+ // Steady clock is supposed to never jump backwards, but it's better to be safe in case of buggy implementations
+ if (Y_UNLIKELY(microseconds < 0)) {
+ microseconds = 0;
+ }
+ // Add one so we never return zero
+ return microseconds + 1;
+ }
+
+} // namespace NActors
diff --git a/library/cpp/actors/core/monotonic.h b/library/cpp/actors/core/monotonic.h
index cc0136b558..6fceb91dbe 100644
--- a/library/cpp/actors/core/monotonic.h
+++ b/library/cpp/actors/core/monotonic.h
@@ -1,111 +1,111 @@
-#pragma once
-
-#include <util/datetime/base.h>
-
-namespace NActors {
-
- /**
- * Returns current monotonic time in microseconds
- */
- ui64 GetMonotonicMicroSeconds();
-
- /**
- * Similar to TInstant, but measuring monotonic time
- */
- class TMonotonic : public TTimeBase<TMonotonic> {
- using TBase = TTimeBase<TMonotonic>;
-
- private:
- constexpr explicit TMonotonic(TValue value) noexcept
- : TBase(value)
- { }
-
- public:
- constexpr TMonotonic() noexcept {
- }
-
- static constexpr TMonotonic FromValue(TValue value) noexcept {
- return TMonotonic(value);
- }
-
- static inline TMonotonic Now() {
- return TMonotonic::MicroSeconds(GetMonotonicMicroSeconds());
- }
-
- using TBase::Days;
- using TBase::Hours;
- using TBase::MicroSeconds;
- using TBase::MilliSeconds;
- using TBase::Minutes;
- using TBase::Seconds;
-
- static constexpr TMonotonic Max() noexcept {
- return TMonotonic(::Max<ui64>());
- }
-
- static constexpr TMonotonic Zero() noexcept {
- return TMonotonic();
- }
-
- static constexpr TMonotonic MicroSeconds(ui64 us) noexcept {
- return TMonotonic(TInstant::MicroSeconds(us).GetValue());
- }
-
- static constexpr TMonotonic MilliSeconds(ui64 ms) noexcept {
- return TMonotonic(TInstant::MilliSeconds(ms).GetValue());
- }
-
- static constexpr TMonotonic Seconds(ui64 s) noexcept {
- return TMonotonic(TInstant::Seconds(s).GetValue());
- }
-
- static constexpr TMonotonic Minutes(ui64 m) noexcept {
- return TMonotonic(TInstant::Minutes(m).GetValue());
- }
-
- static constexpr TMonotonic Hours(ui64 h) noexcept {
- return TMonotonic(TInstant::Hours(h).GetValue());
- }
-
- static constexpr TMonotonic Days(ui64 d) noexcept {
- return TMonotonic(TInstant::Days(d).GetValue());
- }
-
- template<class T>
- inline TMonotonic& operator+=(const T& t) noexcept {
- return (*this = (*this + t));
- }
-
- template<class T>
- inline TMonotonic& operator-=(const T& t) noexcept {
- return (*this = (*this - t));
- }
- };
-} // namespace NActors
-
-Y_DECLARE_PODTYPE(NActors::TMonotonic);
-
-template<>
-struct THash<NActors::TMonotonic> {
- size_t operator()(const NActors::TMonotonic& key) const {
- return THash<NActors::TMonotonic::TValue>()(key.GetValue());
- }
-};
-
-namespace NActors {
-
- constexpr TDuration operator-(const TMonotonic& l, const TMonotonic& r) {
- return TInstant::FromValue(l.GetValue()) - TInstant::FromValue(r.GetValue());
- }
-
- constexpr TMonotonic operator+(const TMonotonic& l, const TDuration& r) {
- TInstant result = TInstant::FromValue(l.GetValue()) + r;
- return TMonotonic::FromValue(result.GetValue());
- }
-
- constexpr TMonotonic operator-(const TMonotonic& l, const TDuration& r) {
- TInstant result = TInstant::FromValue(l.GetValue()) - r;
- return TMonotonic::FromValue(result.GetValue());
- }
-
-} // namespace NActors
+#pragma once
+
+#include <util/datetime/base.h>
+
+namespace NActors {
+
+ /**
+ * Returns current monotonic time in microseconds
+ */
+ ui64 GetMonotonicMicroSeconds();
+
+ /**
+ * Similar to TInstant, but measuring monotonic time
+ */
+ class TMonotonic : public TTimeBase<TMonotonic> {
+ using TBase = TTimeBase<TMonotonic>;
+
+ private:
+ constexpr explicit TMonotonic(TValue value) noexcept
+ : TBase(value)
+ { }
+
+ public:
+ constexpr TMonotonic() noexcept {
+ }
+
+ static constexpr TMonotonic FromValue(TValue value) noexcept {
+ return TMonotonic(value);
+ }
+
+ static inline TMonotonic Now() {
+ return TMonotonic::MicroSeconds(GetMonotonicMicroSeconds());
+ }
+
+ using TBase::Days;
+ using TBase::Hours;
+ using TBase::MicroSeconds;
+ using TBase::MilliSeconds;
+ using TBase::Minutes;
+ using TBase::Seconds;
+
+ static constexpr TMonotonic Max() noexcept {
+ return TMonotonic(::Max<ui64>());
+ }
+
+ static constexpr TMonotonic Zero() noexcept {
+ return TMonotonic();
+ }
+
+ static constexpr TMonotonic MicroSeconds(ui64 us) noexcept {
+ return TMonotonic(TInstant::MicroSeconds(us).GetValue());
+ }
+
+ static constexpr TMonotonic MilliSeconds(ui64 ms) noexcept {
+ return TMonotonic(TInstant::MilliSeconds(ms).GetValue());
+ }
+
+ static constexpr TMonotonic Seconds(ui64 s) noexcept {
+ return TMonotonic(TInstant::Seconds(s).GetValue());
+ }
+
+ static constexpr TMonotonic Minutes(ui64 m) noexcept {
+ return TMonotonic(TInstant::Minutes(m).GetValue());
+ }
+
+ static constexpr TMonotonic Hours(ui64 h) noexcept {
+ return TMonotonic(TInstant::Hours(h).GetValue());
+ }
+
+ static constexpr TMonotonic Days(ui64 d) noexcept {
+ return TMonotonic(TInstant::Days(d).GetValue());
+ }
+
+ template<class T>
+ inline TMonotonic& operator+=(const T& t) noexcept {
+ return (*this = (*this + t));
+ }
+
+ template<class T>
+ inline TMonotonic& operator-=(const T& t) noexcept {
+ return (*this = (*this - t));
+ }
+ };
+} // namespace NActors
+
+Y_DECLARE_PODTYPE(NActors::TMonotonic);
+
+template<>
+struct THash<NActors::TMonotonic> {
+ size_t operator()(const NActors::TMonotonic& key) const {
+ return THash<NActors::TMonotonic::TValue>()(key.GetValue());
+ }
+};
+
+namespace NActors {
+
+ constexpr TDuration operator-(const TMonotonic& l, const TMonotonic& r) {
+ return TInstant::FromValue(l.GetValue()) - TInstant::FromValue(r.GetValue());
+ }
+
+ constexpr TMonotonic operator+(const TMonotonic& l, const TDuration& r) {
+ TInstant result = TInstant::FromValue(l.GetValue()) + r;
+ return TMonotonic::FromValue(result.GetValue());
+ }
+
+ constexpr TMonotonic operator-(const TMonotonic& l, const TDuration& r) {
+ TInstant result = TInstant::FromValue(l.GetValue()) - r;
+ return TMonotonic::FromValue(result.GetValue());
+ }
+
+} // namespace NActors
diff --git a/library/cpp/actors/core/scheduler_actor.cpp b/library/cpp/actors/core/scheduler_actor.cpp
index c189653e99..febc5e40dd 100644
--- a/library/cpp/actors/core/scheduler_actor.cpp
+++ b/library/cpp/actors/core/scheduler_actor.cpp
@@ -43,7 +43,7 @@ namespace NActors {
TPollerToken::TPtr PollerToken;
ui64 RealTime;
- ui64 MonotonicTime;
+ ui64 MonotonicTime;
ui64 ActiveTick;
typedef TMap<ui64, TAutoPtr<NSchedulerQueue::TQueueType>> TMomentMap; // intrasecond queues
@@ -56,7 +56,7 @@ namespace NActors {
static const ui64 IntrasecondThreshold = 1048576; // ~second
TAutoPtr<TMomentMap> ActiveSec;
volatile ui64* CurrentTimestamp = nullptr;
- volatile ui64* CurrentMonotonic = nullptr;
+ volatile ui64* CurrentMonotonic = nullptr;
TDeque<TAutoPtr<IEventHandle>> EventsToBeSent;
public:
@@ -84,9 +84,9 @@ namespace NActors {
Y_ASSERT(evInitialize.CurrentTimestamp != nullptr);
CurrentTimestamp = evInitialize.CurrentTimestamp;
- Y_ASSERT(evInitialize.CurrentMonotonic != nullptr);
- CurrentMonotonic = evInitialize.CurrentMonotonic;
-
+ Y_ASSERT(evInitialize.CurrentMonotonic != nullptr);
+ CurrentMonotonic = evInitialize.CurrentMonotonic;
+
struct itimerspec new_time;
memset(&new_time, 0, sizeof(new_time));
new_time.it_value.tv_nsec = Cfg.ResolutionMicroseconds * 1000;
@@ -96,10 +96,10 @@ namespace NActors {
const bool success = ctx.Send(PollerActor, new TEvPollerRegister(TimerDescriptor, SelfId(), {}));
Y_VERIFY(success);
- RealTime = RelaxedLoad(CurrentTimestamp);
- MonotonicTime = RelaxedLoad(CurrentMonotonic);
+ RealTime = RelaxedLoad(CurrentTimestamp);
+ MonotonicTime = RelaxedLoad(CurrentMonotonic);
- ActiveTick = AlignUp<ui64>(MonotonicTime, IntrasecondThreshold);
+ ActiveTick = AlignUp<ui64>(MonotonicTime, IntrasecondThreshold);
}
void Handle(TEvPollerRegisterResult::TPtr ev, const TActorContext& ctx) {
@@ -108,10 +108,10 @@ namespace NActors {
}
void UpdateTime() {
- RealTime = TInstant::Now().MicroSeconds();
- MonotonicTime = Max(MonotonicTime, GetMonotonicMicroSeconds());
- AtomicStore(CurrentTimestamp, RealTime);
- AtomicStore(CurrentMonotonic, MonotonicTime);
+ RealTime = TInstant::Now().MicroSeconds();
+ MonotonicTime = Max(MonotonicTime, GetMonotonicMicroSeconds());
+ AtomicStore(CurrentTimestamp, RealTime);
+ AtomicStore(CurrentMonotonic, MonotonicTime);
}
void TryUpdateTime(NHPTimer::STime* lastTimeUpdate) {
diff --git a/library/cpp/actors/core/scheduler_actor.h b/library/cpp/actors/core/scheduler_actor.h
index 4209db0ab6..c2c561b43d 100644
--- a/library/cpp/actors/core/scheduler_actor.h
+++ b/library/cpp/actors/core/scheduler_actor.h
@@ -9,12 +9,12 @@ namespace NActors {
struct TEvSchedulerInitialize : TEventLocal<TEvSchedulerInitialize, TEvents::TSystem::Bootstrap> {
TVector<NSchedulerQueue::TReader*> ScheduleReaders;
volatile ui64* CurrentTimestamp;
- volatile ui64* CurrentMonotonic;
+ volatile ui64* CurrentMonotonic;
- TEvSchedulerInitialize(const TVector<NSchedulerQueue::TReader*>& scheduleReaders, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic)
+ TEvSchedulerInitialize(const TVector<NSchedulerQueue::TReader*>& scheduleReaders, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic)
: ScheduleReaders(scheduleReaders)
, CurrentTimestamp(currentTimestamp)
- , CurrentMonotonic(currentMonotonic)
+ , CurrentMonotonic(currentMonotonic)
{
}
};
diff --git a/library/cpp/actors/core/scheduler_basic.cpp b/library/cpp/actors/core/scheduler_basic.cpp
index b0c80eb6d2..fba200e16b 100644
--- a/library/cpp/actors/core/scheduler_basic.cpp
+++ b/library/cpp/actors/core/scheduler_basic.cpp
@@ -9,35 +9,35 @@
#endif
namespace NActors {
-
- struct TBasicSchedulerThread::TMonCounters {
- NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs;
- NMonitoring::TDynamicCounters::TCounterPtr QueueSize;
- NMonitoring::TDynamicCounters::TCounterPtr EventsSent;
- NMonitoring::TDynamicCounters::TCounterPtr EventsDropped;
- NMonitoring::TDynamicCounters::TCounterPtr EventsAdded;
- NMonitoring::TDynamicCounters::TCounterPtr Iterations;
- NMonitoring::TDynamicCounters::TCounterPtr Sleeps;
- NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
-
- TMonCounters(const NMonitoring::TDynamicCounterPtr& counters)
- : TimeDelayMs(counters->GetCounter("Scheduler/TimeDelayMs", false))
- , QueueSize(counters->GetCounter("Scheduler/QueueSize", false))
- , EventsSent(counters->GetCounter("Scheduler/EventsSent", true))
- , EventsDropped(counters->GetCounter("Scheduler/EventsDropped", true))
- , EventsAdded(counters->GetCounter("Scheduler/EventsAdded", true))
- , Iterations(counters->GetCounter("Scheduler/Iterations", true))
- , Sleeps(counters->GetCounter("Scheduler/Sleeps", true))
- , ElapsedMicrosec(counters->GetCounter("Scheduler/ElapsedMicrosec", true))
- { }
- };
-
+
+ struct TBasicSchedulerThread::TMonCounters {
+ NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs;
+ NMonitoring::TDynamicCounters::TCounterPtr QueueSize;
+ NMonitoring::TDynamicCounters::TCounterPtr EventsSent;
+ NMonitoring::TDynamicCounters::TCounterPtr EventsDropped;
+ NMonitoring::TDynamicCounters::TCounterPtr EventsAdded;
+ NMonitoring::TDynamicCounters::TCounterPtr Iterations;
+ NMonitoring::TDynamicCounters::TCounterPtr Sleeps;
+ NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
+
+ TMonCounters(const NMonitoring::TDynamicCounterPtr& counters)
+ : TimeDelayMs(counters->GetCounter("Scheduler/TimeDelayMs", false))
+ , QueueSize(counters->GetCounter("Scheduler/QueueSize", false))
+ , EventsSent(counters->GetCounter("Scheduler/EventsSent", true))
+ , EventsDropped(counters->GetCounter("Scheduler/EventsDropped", true))
+ , EventsAdded(counters->GetCounter("Scheduler/EventsAdded", true))
+ , Iterations(counters->GetCounter("Scheduler/Iterations", true))
+ , Sleeps(counters->GetCounter("Scheduler/Sleeps", true))
+ , ElapsedMicrosec(counters->GetCounter("Scheduler/ElapsedMicrosec", true))
+ { }
+ };
+
TBasicSchedulerThread::TBasicSchedulerThread(const TSchedulerConfig& config)
: Config(config)
- , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr)
+ , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr)
, ActorSystem(nullptr)
, CurrentTimestamp(nullptr)
- , CurrentMonotonic(nullptr)
+ , CurrentMonotonic(nullptr)
, TotalReaders(0)
, StopFlag(false)
, ScheduleMap(3600)
@@ -55,45 +55,45 @@ namespace NActors {
#endif
::SetCurrentThreadName("Scheduler");
- ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic);
- ui64 throttledMonotonic = currentMonotonic;
+ ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic);
+ ui64 throttledMonotonic = currentMonotonic;
- ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold);
+ ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold);
TAutoPtr<TMomentMap> activeSec;
NHPTimer::STime hpprev = GetCycleCountFast();
- ui64 nextTimestamp = TInstant::Now().MicroSeconds();
- ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
-
+ ui64 nextTimestamp = TInstant::Now().MicroSeconds();
+ ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
+
while (!AtomicLoad(&StopFlag)) {
{
- const ui64 delta = nextMonotonic - throttledMonotonic;
- const ui64 elapsedDelta = nextMonotonic - currentMonotonic;
- const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1));
-
- throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic;
-
- if (MonCounters) {
- *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000;
- }
+ const ui64 delta = nextMonotonic - throttledMonotonic;
+ const ui64 elapsedDelta = nextMonotonic - currentMonotonic;
+ const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1));
+
+ throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic;
+
+ if (MonCounters) {
+ *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000;
+ }
+ }
+ AtomicStore(CurrentTimestamp, nextTimestamp);
+ AtomicStore(CurrentMonotonic, nextMonotonic);
+ currentMonotonic = nextMonotonic;
+
+ if (MonCounters) {
+ ++*MonCounters->Iterations;
}
- AtomicStore(CurrentTimestamp, nextTimestamp);
- AtomicStore(CurrentMonotonic, nextMonotonic);
- currentMonotonic = nextMonotonic;
-
- if (MonCounters) {
- ++*MonCounters->Iterations;
- }
-
+
bool somethingDone = false;
// first step - send everything triggered on schedule
- ui64 eventsSent = 0;
- ui64 eventsDropped = 0;
+ ui64 eventsSent = 0;
+ ui64 eventsDropped = 0;
for (;;) {
while (!!activeSec && !activeSec->empty()) {
TMomentMap::iterator it = activeSec->begin();
- if (it->first <= throttledMonotonic) {
+ if (it->first <= throttledMonotonic) {
if (NSchedulerQueue::TQueueType* q = it->second.Get()) {
while (NSchedulerQueue::TEntry* x = q->Reader.Pop()) {
somethingDone = true;
@@ -102,16 +102,16 @@ namespace NActors {
ISchedulerCookie* cookie = x->Cookie;
// TODO: lazy send with backoff queue to not hang over contended mailboxes
if (cookie) {
- if (cookie->Detach()) {
+ if (cookie->Detach()) {
ActorSystem->Send(ev);
- ++eventsSent;
- } else {
+ ++eventsSent;
+ } else {
delete ev;
- ++eventsDropped;
- }
+ ++eventsDropped;
+ }
} else {
ActorSystem->Send(ev);
- ++eventsSent;
+ ++eventsSent;
}
}
}
@@ -120,7 +120,7 @@ namespace NActors {
break;
}
- if (activeTick <= throttledMonotonic) {
+ if (activeTick <= throttledMonotonic) {
Y_VERIFY_DEBUG(!activeSec || activeSec->empty());
activeSec.Destroy();
activeTick += IntrasecondThreshold;
@@ -138,7 +138,7 @@ namespace NActors {
// second step - collect everything from queues
- ui64 eventsAdded = 0;
+ ui64 eventsAdded = 0;
for (ui32 i = 0; i != TotalReaders; ++i) {
while (NSchedulerQueue::TEntry* x = Readers[i]->Pop()) {
somethingDone = true;
@@ -165,57 +165,57 @@ namespace NActors {
queue.Reset(new NSchedulerQueue::TQueueType());
queue->Writer.Push(instant, ev, cookie);
}
-
- ++eventsAdded;
+
+ ++eventsAdded;
}
}
NHPTimer::STime hpnow = GetCycleCountFast();
-
- if (MonCounters) {
- *MonCounters->QueueSize -= eventsSent + eventsDropped;
- *MonCounters->QueueSize += eventsAdded;
- *MonCounters->EventsSent += eventsSent;
- *MonCounters->EventsDropped += eventsDropped;
- *MonCounters->EventsAdded += eventsAdded;
- *MonCounters->ElapsedMicrosec += NHPTimer::GetSeconds(hpnow - hpprev) * 1000000;
- }
-
- hpprev = hpnow;
- nextTimestamp = TInstant::Now().MicroSeconds();
- nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
-
+
+ if (MonCounters) {
+ *MonCounters->QueueSize -= eventsSent + eventsDropped;
+ *MonCounters->QueueSize += eventsAdded;
+ *MonCounters->EventsSent += eventsSent;
+ *MonCounters->EventsDropped += eventsDropped;
+ *MonCounters->EventsAdded += eventsAdded;
+ *MonCounters->ElapsedMicrosec += NHPTimer::GetSeconds(hpnow - hpprev) * 1000000;
+ }
+
+ hpprev = hpnow;
+ nextTimestamp = TInstant::Now().MicroSeconds();
+ nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
+
// ok complete, if nothing left - sleep
if (!somethingDone) {
- const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds);
- if (nextMonotonic >= nextInstant) // already in next time-slice
+ const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds);
+ if (nextMonotonic >= nextInstant) // already in next time-slice
continue;
- const ui64 delta = nextInstant - nextMonotonic;
+ const ui64 delta = nextInstant - nextMonotonic;
if (delta < Config.SpinThreshold) // not so much time left, just spin
continue;
- if (MonCounters) {
- ++*MonCounters->Sleeps;
- }
-
+ if (MonCounters) {
+ ++*MonCounters->Sleeps;
+ }
+
NanoSleep(delta * 1000); // ok, looks like we should sleep a bit.
-
- // Don't count sleep in elapsed microseconds
+
+ // Don't count sleep in elapsed microseconds
hpprev = GetCycleCountFast();
- nextTimestamp = TInstant::Now().MicroSeconds();
- nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
+ nextTimestamp = TInstant::Now().MicroSeconds();
+ nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
}
}
// ok, die!
}
- void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) {
+ void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) {
ActorSystem = actorSystem;
CurrentTimestamp = currentTimestamp;
- CurrentMonotonic = currentMonotonic;
- *CurrentTimestamp = TInstant::Now().MicroSeconds();
- *CurrentMonotonic = GetMonotonicMicroSeconds();
+ CurrentMonotonic = currentMonotonic;
+ *CurrentTimestamp = TInstant::Now().MicroSeconds();
+ *CurrentMonotonic = GetMonotonicMicroSeconds();
}
void TBasicSchedulerThread::PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) {
@@ -225,16 +225,16 @@ namespace NActors {
Copy(readers, readers + scheduleReadersCount, Readers.Get());
}
- void TBasicSchedulerThread::PrepareStart() {
- // Called after actor system is initialized, but before executor threads
- // are started, giving us a chance to update current timestamp with a
- // more recent value, taking initialization time into account. This is
- // safe to do, since scheduler thread is not started yet, so no other
- // threads are updating time concurrently.
- AtomicStore(CurrentTimestamp, TInstant::Now().MicroSeconds());
- AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds()));
- }
-
+ void TBasicSchedulerThread::PrepareStart() {
+ // Called after actor system is initialized, but before executor threads
+ // are started, giving us a chance to update current timestamp with a
+ // more recent value, taking initialization time into account. This is
+ // safe to do, since scheduler thread is not started yet, so no other
+ // threads are updating time concurrently.
+ AtomicStore(CurrentTimestamp, TInstant::Now().MicroSeconds());
+ AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds()));
+ }
+
void TBasicSchedulerThread::Start() {
MainCycle.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TBasicSchedulerThread::CycleFunc, this)));
}
diff --git a/library/cpp/actors/core/scheduler_basic.h b/library/cpp/actors/core/scheduler_basic.h
index 89ef8323f5..2ccde39235 100644
--- a/library/cpp/actors/core/scheduler_basic.h
+++ b/library/cpp/actors/core/scheduler_basic.h
@@ -1,7 +1,7 @@
#pragma once
#include "actorsystem.h"
-#include "monotonic.h"
+#include "monotonic.h"
#include "scheduler_queue.h"
#include <library/cpp/actors/util/queue_chunk.h>
#include <library/cpp/threading/future/legacy_future.h>
@@ -9,17 +9,17 @@
#include <util/generic/map.h>
namespace NActors {
-
+
class TBasicSchedulerThread: public ISchedulerThread {
// TODO: replace with NUMA-local threads and per-thread schedules
const TSchedulerConfig Config;
- struct TMonCounters;
- const THolder<TMonCounters> MonCounters;
-
+ struct TMonCounters;
+ const THolder<TMonCounters> MonCounters;
+
TActorSystem* ActorSystem;
volatile ui64* CurrentTimestamp;
- volatile ui64* CurrentMonotonic;
+ volatile ui64* CurrentMonotonic;
ui32 TotalReaders;
TArrayHolder<NSchedulerQueue::TReader*> Readers;
@@ -44,7 +44,7 @@ namespace NActors {
void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) override;
void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) override;
- void PrepareStart() override;
+ void PrepareStart() override;
void Start() override;
void PrepareStop() override;
void Stop() override;
@@ -55,10 +55,10 @@ namespace NActors {
virtual ~TMockSchedulerThread() override {
}
- void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) override {
+ void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) override {
Y_UNUSED(actorSystem);
- *currentTimestamp = TInstant::Now().MicroSeconds();
- *currentMonotonic = GetMonotonicMicroSeconds();
+ *currentTimestamp = TInstant::Now().MicroSeconds();
+ *currentMonotonic = GetMonotonicMicroSeconds();
}
void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) override {
diff --git a/library/cpp/actors/core/scheduler_queue.h b/library/cpp/actors/core/scheduler_queue.h
index 51b597be1f..3b8fac28f0 100644
--- a/library/cpp/actors/core/scheduler_queue.h
+++ b/library/cpp/actors/core/scheduler_queue.h
@@ -76,10 +76,10 @@ namespace NActors {
}
void Push(ui64 instantMicrosends, IEventHandle* ev, ISchedulerCookie* cookie) {
- if (Y_UNLIKELY(instantMicrosends == 0)) {
- // Protect against Pop() getting stuck forever
- instantMicrosends = 1;
- }
+ if (Y_UNLIKELY(instantMicrosends == 0)) {
+ // Protect against Pop() getting stuck forever
+ instantMicrosends = 1;
+ }
if (WritePosition != TChunk::EntriesCount) {
volatile TEntry& entry = WriteTo->Entries[WritePosition];
entry.Cookie = cookie;
diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make
index 25fc0ce902..880a9d00db 100644
--- a/library/cpp/actors/core/ya.make
+++ b/library/cpp/actors/core/ya.make
@@ -81,7 +81,7 @@ SRCS(
memory_tracker.h
mon.h
mon_stats.h
- monotonic.cpp
+ monotonic.cpp
monotonic.h
worker_context.cpp
worker_context.h