aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors
diff options
context:
space:
mode:
authorAlexander Gololobov <davenger@yandex-team.com>2022-02-10 16:47:38 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:38 +0300
commitfccc62e9bfdce9be2fe7e0f23479da3a5512211a (patch)
treec0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/actors
parent39608cdb86363c75ce55b2b9a69841c3b71f22cf (diff)
downloadydb-fccc62e9bfdce9be2fe7e0f23479da3a5512211a.tar.gz
Restoring authorship annotation for Alexander Gololobov <davenger@yandex-team.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors')
-rw-r--r--library/cpp/actors/core/actor.h14
-rw-r--r--library/cpp/actors/core/actorsystem.h6
-rw-r--r--library/cpp/actors/core/defs.h12
-rw-r--r--library/cpp/actors/core/event.h22
-rw-r--r--library/cpp/actors/core/event_pb.h100
-rw-r--r--library/cpp/actors/core/event_pb_payload_ut.cpp128
-rw-r--r--library/cpp/actors/core/executelater.h8
-rw-r--r--library/cpp/actors/core/executor_pool_base.cpp26
-rw-r--r--library/cpp/actors/core/executor_pool_base.h4
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp10
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h4
-rw-r--r--library/cpp/actors/core/executor_pool_io.h2
-rw-r--r--library/cpp/actors/core/executor_pool_united.cpp2
-rw-r--r--library/cpp/actors/core/executor_thread.cpp42
-rw-r--r--library/cpp/actors/core/executor_thread.h6
-rw-r--r--library/cpp/actors/core/log.cpp76
-rw-r--r--library/cpp/actors/core/log.h4
-rw-r--r--library/cpp/actors/core/log_settings.cpp12
-rw-r--r--library/cpp/actors/core/log_settings.h6
-rw-r--r--library/cpp/actors/core/mailbox.cpp6
-rw-r--r--library/cpp/actors/core/mailbox.h2
-rw-r--r--library/cpp/actors/core/mon_stats.h36
-rw-r--r--library/cpp/actors/core/probes.cpp6
-rw-r--r--library/cpp/actors/core/probes.h10
-rw-r--r--library/cpp/actors/core/process_stats.cpp80
-rw-r--r--library/cpp/actors/core/process_stats.h14
-rw-r--r--library/cpp/actors/core/scheduler_actor.cpp2
-rw-r--r--library/cpp/actors/core/ya.make8
-rw-r--r--library/cpp/actors/helpers/selfping_actor.cpp4
-rw-r--r--library/cpp/actors/helpers/selfping_actor_ut.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp8
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h8
-rw-r--r--library/cpp/actors/interconnect/load.cpp12
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp8
-rw-r--r--library/cpp/actors/interconnect/ut/lib/node.h6
-rw-r--r--library/cpp/actors/protos/unittests.proto6
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp34
-rw-r--r--library/cpp/actors/testlib/test_runtime.h8
38 files changed, 373 insertions, 373 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index 7eabe6fbd1..ed29bd14b9 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -218,7 +218,7 @@ namespace NActors {
TActorIdentity SelfActorId;
i64 ElapsedTicks;
ui64 HandledEvents;
-
+
friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
friend class TDecorator;
@@ -235,10 +235,10 @@ namespace NActors {
INTERCONNECT_COMMON = 171,
SELF_PING_ACTOR = 207,
TEST_ACTOR_RUNTIME = 283,
- INTERCONNECT_HANDSHAKE = 284,
- INTERCONNECT_POLLER = 285,
- INTERCONNECT_SESSION_KILLER = 286,
- ACTOR_SYSTEM_SCHEDULER_ACTOR = 312,
+ INTERCONNECT_HANDSHAKE = 284,
+ INTERCONNECT_POLLER = 285,
+ INTERCONNECT_SESSION_KILLER = 286,
+ ACTOR_SYSTEM_SCHEDULER_ACTOR = 312,
ACTOR_FUTURE_CALLBACK = 337,
INTERCONNECT_MONACTOR = 362,
INTERCONNECT_LOAD_ACTOR = 376,
@@ -418,10 +418,10 @@ namespace NActors {
}
protected:
- //* Comment this function to find unmarked activities
+ //* Comment this function to find unmarked activities
static constexpr IActor::EActivityType ActorActivityType() {
return EActorActivity::OTHER;
- } //*/
+ } //*/
// static constexpr char ActorName[] = "UNNAMED";
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 242794ac6f..40499d7586 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -8,7 +8,7 @@
#include "event.h"
#include "log_settings.h"
#include "scheduler_cookie.h"
-#include "mon_stats.h"
+#include "mon_stats.h"
#include <library/cpp/threading/future/future.h>
#include <library/cpp/actors/util/ticket_lock.h>
@@ -58,7 +58,7 @@ namespace NActors {
, DestroyedActors(0)
{
}
-
+
virtual ~IExecutorPool() {
}
@@ -348,7 +348,7 @@ namespace NActors {
T* AppData() const {
return (T*)AppData0;
}
-
+
NLog::TSettings* LoggerSettings() const {
return LoggerSettings0.Get();
}
diff --git a/library/cpp/actors/core/defs.h b/library/cpp/actors/core/defs.h
index 6c50ab677c..980b7d767b 100644
--- a/library/cpp/actors/core/defs.h
+++ b/library/cpp/actors/core/defs.h
@@ -6,12 +6,12 @@
#include <util/generic/hash.h>
#include <util/string/printf.h>
-// Enables collection of
-// event send/receive counts
-// activation time histograms
-// event processing time histograms
-#define ACTORSLIB_COLLECT_EXEC_STATS
-
+// Enables collection of
+// event send/receive counts
+// activation time histograms
+// event processing time histograms
+#define ACTORSLIB_COLLECT_EXEC_STATS
+
namespace NActors {
using TPoolId = ui8;
using TPoolsMask = ui64;
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 9d2b694cb2..6ff02aaf94 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -7,9 +7,9 @@
#include <library/cpp/actors/wilson/wilson_trace.h>
-#include <util/system/hp_timer.h>
+#include <util/system/hp_timer.h>
#include <util/generic/maybe.h>
-
+
namespace NActors {
class TChunkSerializer;
@@ -110,10 +110,10 @@ namespace NActors {
// filled if feeded by interconnect session
const TActorId InterconnectSession;
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
::NHPTimer::STime SendTime;
-#endif
-
+#endif
+
static const size_t ChannelBits = 12;
static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits;
@@ -174,9 +174,9 @@ namespace NActors {
, Sender(sender)
, Cookie(cookie)
, TraceId(std::move(traceId))
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
, SendTime(0)
-#endif
+#endif
, Event(ev)
, RewriteRecipient(Recipient)
, RewriteType(Type)
@@ -199,9 +199,9 @@ namespace NActors {
, Sender(sender)
, Cookie(cookie)
, TraceId(std::move(traceId))
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
, SendTime(0)
-#endif
+#endif
, Buffer(std::move(buffer))
, RewriteRecipient(Recipient)
, RewriteType(Type)
@@ -228,9 +228,9 @@ namespace NActors {
, OriginScopeId(originScopeId)
, TraceId(std::move(traceId))
, InterconnectSession(session)
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
, SendTime(0)
-#endif
+#endif
, Buffer(std::move(buffer))
, RewriteRecipient(Recipient)
, RewriteType(Type)
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index d7ff4a3cbe..d7546b901a 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -127,27 +127,27 @@ namespace NActors {
static const size_t EventMaxByteSize = 67108000;
#endif
- template <typename TEv, typename TRecord /*protobuf record*/, ui32 TEventType, typename TRecHolder>
- class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder {
+ template <typename TEv, typename TRecord /*protobuf record*/, ui32 TEventType, typename TRecHolder>
+ class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder {
// a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies
TVector<TRope> Payload;
public:
- using TRecHolder::Record;
-
- public:
+ using TRecHolder::Record;
+
+ public:
using ProtoRecordType = TRecord;
- TEventPBBase() = default;
+ TEventPBBase() = default;
- explicit TEventPBBase(const TRecord& rec)
+ explicit TEventPBBase(const TRecord& rec)
{
- Record = rec;
+ Record = rec;
}
- explicit TEventPBBase(TRecord&& rec)
+ explicit TEventPBBase(TRecord&& rec)
{
- Record = std::move(rec);
+ Record = std::move(rec);
}
TString ToStringHeader() const override {
@@ -231,7 +231,7 @@ namespace NActors {
}
static IEventBase* Load(TIntrusivePtr<TEventSerializedData> input) {
- THolder<TEventPBBase> ev(new TEv());
+ THolder<TEventPBBase> ev(new TEv());
if (!input->GetSize()) {
Y_PROTOBUF_SUPPRESS_NODISCARD ev->Record.ParseFromString(TString());
} else {
@@ -273,7 +273,7 @@ namespace NActors {
}
ev->CachedByteSize = input->GetSize();
return ev.Release();
- }
+ }
size_t GetCachedByteSize() const {
if (CachedByteSize == 0) {
@@ -369,43 +369,43 @@ namespace NActors {
}
};
- // Protobuf record not using arena
- template <typename TRecord>
- struct TRecordHolder {
- TRecord Record;
- };
-
- // Protobuf arena and a record allocated on it
- template <typename TRecord, size_t InitialBlockSize, size_t MaxBlockSize>
- struct TArenaRecordHolder {
- google::protobuf::Arena PbArena;
- TRecord& Record;
-
- static const google::protobuf::ArenaOptions GetArenaOptions() {
- google::protobuf::ArenaOptions opts;
- opts.initial_block_size = InitialBlockSize;
- opts.max_block_size = MaxBlockSize;
- return opts;
- }
-
- TArenaRecordHolder()
- : PbArena(GetArenaOptions())
- , Record(*google::protobuf::Arena::CreateMessage<TRecord>(&PbArena))
- {}
- };
-
+ // Protobuf record not using arena
+ template <typename TRecord>
+ struct TRecordHolder {
+ TRecord Record;
+ };
+
+ // Protobuf arena and a record allocated on it
+ template <typename TRecord, size_t InitialBlockSize, size_t MaxBlockSize>
+ struct TArenaRecordHolder {
+ google::protobuf::Arena PbArena;
+ TRecord& Record;
+
+ static const google::protobuf::ArenaOptions GetArenaOptions() {
+ google::protobuf::ArenaOptions opts;
+ opts.initial_block_size = InitialBlockSize;
+ opts.max_block_size = MaxBlockSize;
+ return opts;
+ }
+
+ TArenaRecordHolder()
+ : PbArena(GetArenaOptions())
+ , Record(*google::protobuf::Arena::CreateMessage<TRecord>(&PbArena))
+ {}
+ };
+
+ template <typename TEv, typename TRecord, ui32 TEventType>
+ class TEventPB : public TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > {
+ typedef TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > TPbBase;
+ // NOTE: No extra fields allowed: TEventPB must be a "template typedef"
+ public:
+ using TPbBase::TPbBase;
+ };
+
+ template <typename TEv, typename TRecord, ui32 TEventType, size_t InitialBlockSize = 512, size_t MaxBlockSize = 16*1024>
+ using TEventPBWithArena = TEventPBBase<TEv, TRecord, TEventType, TArenaRecordHolder<TRecord, InitialBlockSize, MaxBlockSize> >;
+
template <typename TEv, typename TRecord, ui32 TEventType>
- class TEventPB : public TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > {
- typedef TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > TPbBase;
- // NOTE: No extra fields allowed: TEventPB must be a "template typedef"
- public:
- using TPbBase::TPbBase;
- };
-
- template <typename TEv, typename TRecord, ui32 TEventType, size_t InitialBlockSize = 512, size_t MaxBlockSize = 16*1024>
- using TEventPBWithArena = TEventPBBase<TEv, TRecord, TEventType, TArenaRecordHolder<TRecord, InitialBlockSize, MaxBlockSize> >;
-
- template <typename TEv, typename TRecord, ui32 TEventType>
class TEventShortDebugPB: public TEventPB<TEv, TRecord, TEventType> {
public:
using TBase = TEventPB<TEv, TRecord, TEventType>;
@@ -455,7 +455,7 @@ namespace NActors {
base.Swap(&copy);
PreSerializedData.clear();
}
- return TBase::Record;
+ return TBase::Record;
}
const TRecord& GetRecord() const {
@@ -463,7 +463,7 @@ namespace NActors {
}
TRecord* MutableRecord() {
- GetRecord(); // Make sure PreSerializedData is parsed
+ GetRecord(); // Make sure PreSerializedData is parsed
return &(TBase::Record);
}
diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp
index 4dcc958bb1..eab007bc15 100644
--- a/library/cpp/actors/core/event_pb_payload_ut.cpp
+++ b/library/cpp/actors/core/event_pb_payload_ut.cpp
@@ -8,7 +8,7 @@ using namespace NActors;
enum {
EvMessageWithPayload = EventSpaceBegin(TEvents::ES_PRIVATE),
- EvArenaMessage,
+ EvArenaMessage,
EvArenaMessageBig,
EvMessageWithPayloadPreSerialized
};
@@ -38,81 +38,81 @@ TString MakeString(size_t len) {
Y_UNIT_TEST_SUITE(TEventProtoWithPayload) {
- template <class TEventFrom, class TEventTo>
- void TestSerializeDeserialize(size_t size1, size_t size2) {
- static_assert(TEventFrom::EventType == TEventTo::EventType, "Must be same event type");
+ template <class TEventFrom, class TEventTo>
+ void TestSerializeDeserialize(size_t size1, size_t size2) {
+ static_assert(TEventFrom::EventType == TEventTo::EventType, "Must be same event type");
- TEventFrom msg;
- msg.Record.SetMeta("hello, world!");
- msg.Record.AddPayloadId(msg.AddPayload(MakeStringRope(MakeString(size1))));
- msg.Record.AddPayloadId(msg.AddPayload(MakeStringRope(MakeString(size2))));
- msg.Record.AddSomeData(MakeString((size1 + size2) % 50 + 11));
+ TEventFrom msg;
+ msg.Record.SetMeta("hello, world!");
+ msg.Record.AddPayloadId(msg.AddPayload(MakeStringRope(MakeString(size1))));
+ msg.Record.AddPayloadId(msg.AddPayload(MakeStringRope(MakeString(size2))));
+ msg.Record.AddSomeData(MakeString((size1 + size2) % 50 + 11));
- auto serializer = MakeHolder<TAllocChunkSerializer>();
+ auto serializer = MakeHolder<TAllocChunkSerializer>();
msg.SerializeToArcadiaStream(serializer.Get());
- auto buffers = serializer->Release(msg.IsExtendedFormat());
- UNIT_ASSERT_VALUES_EQUAL(buffers->GetSize(), msg.CalculateSerializedSize());
- TString ser = buffers->GetString();
-
- TString chunkerRes;
- TCoroutineChunkSerializer chunker;
- chunker.SetSerializingEvent(&msg);
- while (!chunker.IsComplete()) {
- char buffer[4096];
+ auto buffers = serializer->Release(msg.IsExtendedFormat());
+ UNIT_ASSERT_VALUES_EQUAL(buffers->GetSize(), msg.CalculateSerializedSize());
+ TString ser = buffers->GetString();
+
+ TString chunkerRes;
+ TCoroutineChunkSerializer chunker;
+ chunker.SetSerializingEvent(&msg);
+ while (!chunker.IsComplete()) {
+ char buffer[4096];
auto range = chunker.FeedBuf(buffer, sizeof(buffer));
for (auto p = range.first; p != range.second; ++p) {
chunkerRes += TString(p->first, p->second);
}
}
- UNIT_ASSERT_VALUES_EQUAL(chunkerRes, ser);
-
+ UNIT_ASSERT_VALUES_EQUAL(chunkerRes, ser);
+
THolder<IEventBase> ev2 = THolder(TEventTo::Load(buffers));
- TEventTo& msg2 = static_cast<TEventTo&>(*ev2);
- UNIT_ASSERT_VALUES_EQUAL(msg2.Record.GetMeta(), msg.Record.GetMeta());
- UNIT_ASSERT_EQUAL(msg2.GetPayload(msg2.Record.GetPayloadId(0)), msg.GetPayload(msg.Record.GetPayloadId(0)));
- UNIT_ASSERT_EQUAL(msg2.GetPayload(msg2.Record.GetPayloadId(1)), msg.GetPayload(msg.Record.GetPayloadId(1)));
+ TEventTo& msg2 = static_cast<TEventTo&>(*ev2);
+ UNIT_ASSERT_VALUES_EQUAL(msg2.Record.GetMeta(), msg.Record.GetMeta());
+ UNIT_ASSERT_EQUAL(msg2.GetPayload(msg2.Record.GetPayloadId(0)), msg.GetPayload(msg.Record.GetPayloadId(0)));
+ UNIT_ASSERT_EQUAL(msg2.GetPayload(msg2.Record.GetPayloadId(1)), msg.GetPayload(msg.Record.GetPayloadId(1)));
+ }
+
+ template <class TEvent>
+ void TestAllSizes(size_t step1 = 100, size_t step2 = 111) {
+ for (size_t size1 = 0; size1 < 10000; size1 += step1) {
+ for (size_t size2 = 0; size2 < 10000; size2 += step2) {
+ TestSerializeDeserialize<TEvent, TEvent>(size1, size2);
+ }
+ }
}
-
- template <class TEvent>
- void TestAllSizes(size_t step1 = 100, size_t step2 = 111) {
- for (size_t size1 = 0; size1 < 10000; size1 += step1) {
- for (size_t size2 = 0; size2 < 10000; size2 += step2) {
- TestSerializeDeserialize<TEvent, TEvent>(size1, size2);
- }
- }
- }
-
+
#if (!defined(_tsan_enabled_))
- Y_UNIT_TEST(SerializeDeserialize) {
- TestAllSizes<TEvMessageWithPayload>();
- }
+ Y_UNIT_TEST(SerializeDeserialize) {
+ TestAllSizes<TEvMessageWithPayload>();
+ }
#endif
-
-
- struct TEvArenaMessage : TEventPBWithArena<TEvArenaMessage, TMessageWithPayload, EvArenaMessage> {
- };
-
- Y_UNIT_TEST(SerializeDeserializeArena) {
- TestAllSizes<TEvArenaMessage>(500, 111);
- }
-
-
- struct TEvArenaMessageBig : TEventPBWithArena<TEvArenaMessageBig, TMessageWithPayload, EvArenaMessageBig, 4000, 32000> {
- };
-
- Y_UNIT_TEST(SerializeDeserializeArenaBig) {
- TestAllSizes<TEvArenaMessageBig>(111, 500);
- }
-
-
- // Compatible with TEvArenaMessage but doesn't use arenas
- struct TEvArenaMessageWithoutArena : TEventPB<TEvArenaMessageWithoutArena, TMessageWithPayload, EvArenaMessage> {
- };
-
- Y_UNIT_TEST(Compatibility) {
- TestSerializeDeserialize<TEvArenaMessage, TEvArenaMessageWithoutArena>(200, 14010);
- TestSerializeDeserialize<TEvArenaMessageWithoutArena, TEvArenaMessage>(2000, 4010);
- }
+
+
+ struct TEvArenaMessage : TEventPBWithArena<TEvArenaMessage, TMessageWithPayload, EvArenaMessage> {
+ };
+
+ Y_UNIT_TEST(SerializeDeserializeArena) {
+ TestAllSizes<TEvArenaMessage>(500, 111);
+ }
+
+
+ struct TEvArenaMessageBig : TEventPBWithArena<TEvArenaMessageBig, TMessageWithPayload, EvArenaMessageBig, 4000, 32000> {
+ };
+
+ Y_UNIT_TEST(SerializeDeserializeArenaBig) {
+ TestAllSizes<TEvArenaMessageBig>(111, 500);
+ }
+
+
+ // Compatible with TEvArenaMessage but doesn't use arenas
+ struct TEvArenaMessageWithoutArena : TEventPB<TEvArenaMessageWithoutArena, TMessageWithPayload, EvArenaMessage> {
+ };
+
+ Y_UNIT_TEST(Compatibility) {
+ TestSerializeDeserialize<TEvArenaMessage, TEvArenaMessageWithoutArena>(200, 14010);
+ TestSerializeDeserialize<TEvArenaMessageWithoutArena, TEvArenaMessage>(2000, 4010);
+ }
Y_UNIT_TEST(PreSerializedCompatibility) {
// ensure TEventPreSerializedPB and TEventPB are interchangable with no compatibility issues
diff --git a/library/cpp/actors/core/executelater.h b/library/cpp/actors/core/executelater.h
index a2e380e466..e7a13c1005 100644
--- a/library/cpp/actors/core/executelater.h
+++ b/library/cpp/actors/core/executelater.h
@@ -8,10 +8,10 @@ namespace NActors {
template <typename TCallback>
class TExecuteLater: public TActorBootstrapped<TExecuteLater<TCallback>> {
public:
- static constexpr IActor::EActivityType ActorActivityType() {
- return IActor::ACTORLIB_COMMON;
- }
-
+ static constexpr IActor::EActivityType ActorActivityType() {
+ return IActor::ACTORLIB_COMMON;
+ }
+
TExecuteLater(
TCallback&& callback,
IActor::EActivityType activityType,
diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp
index e3852eaf8d..c3b9999168 100644
--- a/library/cpp/actors/core/executor_pool_base.cpp
+++ b/library/cpp/actors/core/executor_pool_base.cpp
@@ -16,9 +16,9 @@ namespace NActors {
: IExecutorPool(poolId)
, ActorSystem(nullptr)
, MailboxTable(new TMailboxTable)
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
- , Stats(maxActivityType)
-#endif
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ , Stats(maxActivityType)
+#endif
{}
TExecutorPoolBaseMailboxed::~TExecutorPoolBaseMailboxed() {
@@ -47,9 +47,9 @@ namespace NActors {
bool TExecutorPoolBaseMailboxed::Send(TAutoPtr<IEventHandle>& ev) {
Y_VERIFY_DEBUG(ev->GetRecipientRewrite().PoolID() == PoolId);
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
RelaxedStore(&ev->SendTime, (::NHPTimer::STime)GetCycleCountFast());
-#endif
+#endif
return MailboxTable->SendTo(ev, this);
}
@@ -59,21 +59,21 @@ namespace NActors {
TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) {
NHPTimer::STime hpstart = GetCycleCountFast();
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
ui32 at = actor->GetActivityType();
if (at >= Stats.MaxActivityType())
at = 0;
AtomicIncrement(Stats.ActorsAliveByActivity[at]);
-#endif
+#endif
AtomicIncrement(ActorRegistrations);
-
+
// first step - find good enough mailbox
ui32 hint = 0;
TMailboxHeader* mailbox = nullptr;
if (revolvingWriteCounter == 0)
revolvingWriteCounter = AtomicIncrement(RegisterRevolvingCounter);
-
+
{
ui32 hintBackoff = 0;
@@ -122,7 +122,7 @@ namespace NActors {
default:
Y_FAIL();
}
-
+
NHPTimer::STime elapsed = GetCycleCountFast() - hpstart;
if (elapsed > 1000000) {
LWPROBE(SlowRegisterNew, PoolId, NHPTimer::GetSeconds(elapsed) * 1000.0);
@@ -133,14 +133,14 @@ namespace NActors {
TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) {
NHPTimer::STime hpstart = GetCycleCountFast();
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
ui32 at = actor->GetActivityType();
if (at >= Stats.MaxActivityType())
at = 0;
AtomicIncrement(Stats.ActorsAliveByActivity[at]);
-#endif
+#endif
AtomicIncrement(ActorRegistrations);
-
+
const ui64 localActorId = AllocateID();
mailbox->AttachActor(localActorId, actor);
diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h
index 49129e8a8d..c84ce1af77 100644
--- a/library/cpp/actors/core/executor_pool_base.h
+++ b/library/cpp/actors/core/executor_pool_base.h
@@ -12,11 +12,11 @@ namespace NActors {
protected:
TActorSystem* ActorSystem;
THolder<TMailboxTable> MailboxTable;
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
// Need to have per pool object to collect stats like actor registrations (because
// registrations might be done in threads from other pools)
TExecutorThreadStats Stats;
-#endif
+#endif
TAtomic RegisterRevolvingCounter = 0;
ui64 AllocateID();
public:
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index fdd07ef84f..4dce16939a 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -21,8 +21,8 @@ namespace NActors {
TAffinity* affinity,
TDuration timePerMailbox,
ui32 eventsPerMailbox,
- int realtimePriority,
- ui32 maxActivityType)
+ int realtimePriority,
+ ui32 maxActivityType)
: TExecutorPoolBase(poolId, threads, affinity, maxActivityType)
, SpinThreshold(spinThreshold)
, SpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
@@ -195,7 +195,7 @@ namespace NActors {
return activation;
}
SpinLockPause();
- }
+ }
// stopping, die!
return 0;
@@ -245,8 +245,8 @@ namespace NActors {
for (size_t i = 0; i < PoolThreads; ++i) {
Threads[i].Thread->GetCurrentStats(statsCopy[i + 1]);
}
- }
-
+ }
+
void TBasicExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
TAffinityGuard affinityGuard(Affinity());
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index 2a663ab3ca..023190f7fe 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -77,8 +77,8 @@ namespace NActors {
TAffinity* affinity = nullptr,
TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX,
ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX,
- int realtimePriority = 0,
- ui32 maxActivityType = 1);
+ int realtimePriority = 0,
+ ui32 maxActivityType = 1);
explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg);
~TBasicExecutorPool();
diff --git a/library/cpp/actors/core/executor_pool_io.h b/library/cpp/actors/core/executor_pool_io.h
index b4f1472731..e576d642a1 100644
--- a/library/cpp/actors/core/executor_pool_io.h
+++ b/library/cpp/actors/core/executor_pool_io.h
@@ -26,7 +26,7 @@ namespace NActors {
public:
TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName = "", TAffinity* affinity = nullptr,
- ui32 maxActivityType = 1);
+ ui32 maxActivityType = 1);
explicit TIOExecutorPool(const TIOExecutorPoolConfig& cfg);
~TIOExecutorPool();
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index c8c2b3af54..dac6245635 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -741,7 +741,7 @@ namespace NActors {
#endif
}
}
- }
+ }
void WakeFastWorker() {
#ifdef _linux_
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 437f1d04b9..446b651efd 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -21,9 +21,9 @@
#include <util/system/type_name.h>
#include <util/system/datetime.h>
-
-LWTRACE_USING(ACTORLIB_PROVIDER)
-
+
+LWTRACE_USING(ACTORLIB_PROVIDER)
+
namespace NActors {
constexpr TDuration TExecutorThread::DEFAULT_TIME_PER_MAILBOX;
@@ -98,10 +98,10 @@ namespace NActors {
}
}
- inline TString ActorTypeName(const IActor* actor, ui32 activityType) {
- return actor ? SafeTypeName(actor) : ("activityType_" + ToString(activityType) + " (destroyed)");
- }
-
+ inline TString ActorTypeName(const IActor* actor, ui32 activityType) {
+ return actor ? SafeTypeName(actor) : ("activityType_" + ToString(activityType) + " (destroyed)");
+ }
+
inline void LwTraceSlowDelivery(IEventHandle* ev, const IActor* actor, ui32 poolId, const TActorId& currentRecipient,
double delivMs, double sinceActivationMs, ui32 eventsExecutedBefore) {
const auto baseEv = (ev && ev->HasEvent()) ? ev->GetBase() : nullptr;
@@ -124,13 +124,13 @@ namespace NActors {
eventMs,
baseEv ? SafeTypeName(baseEv) : ToString(evTypeForTracing),
currentRecipient.ToString(),
- ActorTypeName(actor, activityType));
+ ActorTypeName(actor, activityType));
}
template <typename TMailbox>
void TExecutorThread::Execute(TMailbox* mailbox, ui32 hint) {
Y_VERIFY_DEBUG(DyingActors.empty());
-
+
bool reclaimAsFree = false;
NHPTimer::STime hpstart = GetCycleCountFast();
@@ -167,9 +167,9 @@ namespace NActors {
double sinceActivationMs = NHPTimer::GetSeconds(hpprev - hpstart) * 1000.0;
LwTraceSlowDelivery(ev.Get(), actor, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(hpprev - ev->SendTime) * 1000.0, sinceActivationMs, executed);
}
-
+
ui32 evTypeForTracing = ev->Type;
-
+
ui32 activityType = actor->GetActivityType();
if (activityType != prevActivityType) {
prevActivityType = activityType;
@@ -184,10 +184,10 @@ namespace NActors {
DropUnregistered();
actor = nullptr;
}
-
+
if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox
reclaimAsFree = true;
-
+
hpnow = GetCycleCountFast();
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);
if (elapsed > 1000000) {
@@ -197,9 +197,9 @@ namespace NActors {
// The actor might have been destroyed
if (actor)
actor->AddElapsedTicks(elapsed);
-
+
CurrentRecipient = TActorId();
- } else {
+ } else {
TAutoPtr<IEventHandle> nonDelivered = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown);
if (nonDelivered.Get()) {
ActorSystem->Send(nonDelivered);
@@ -207,7 +207,7 @@ namespace NActors {
Ctx.IncrementNonDeliveredEvents();
}
hpnow = GetCycleCountFast();
- }
+ }
hpprev = hpnow;
@@ -241,8 +241,8 @@ namespace NActors {
recipient.ToString(),
SafeTypeName(actor));
break;
- }
-
+ }
+
if (executed + 1 == Ctx.EventsPerMailbox) {
AtomicStore(&mailbox->ScheduleMoment, hpnow);
Ctx.IncrementMailboxPushedOutByEventCount();
@@ -255,8 +255,8 @@ namespace NActors {
Ctx.WorkerId,
recipient.ToString(),
SafeTypeName(actor));
- break;
- }
+ break;
+ }
} else {
if (executed == 0)
Ctx.IncrementEmptyMailboxActivation();
@@ -271,7 +271,7 @@ namespace NActors {
SafeTypeName(actor));
break; // empty queue, leave
}
- }
+ }
NProfiling::TMemoryTagScope::Reset(0);
TlsActivationContext = nullptr;
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index 6d6e527f9e..9d3c573f0d 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -54,15 +54,15 @@ namespace NActors {
#ifdef USE_ACTOR_CALLSTACK
ev->Callstack = TCallstack::GetTlsCallstack();
ev->Callstack.Trace();
-#endif
+#endif
Ctx.IncrementSentEvents();
return ActorSystem->Send(ev);
}
-
+
void GetCurrentStats(TExecutorThreadStats& statsCopy) const {
Ctx.GetCurrentStats(statsCopy);
}
-
+
TThreadId GetThreadId() const; // blocks, must be called after Start()
TWorkerId GetWorkerId() const { return Ctx.WorkerId; }
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index 3edc42012c..5f63b5af58 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -164,8 +164,8 @@ namespace NActors {
std::shared_ptr<NMonitoring::TMetricRegistry> Metrics;
};
- TAtomic TLoggerActor::IsOverflow = 0;
-
+ TAtomic TLoggerActor::IsOverflow = 0;
+
TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
TAutoPtr<TLogBackend> logBackend,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters)
@@ -224,10 +224,10 @@ namespace NActors {
}
void TLoggerActor::Throttle(const NLog::TSettings& settings) {
- if (AtomicGet(IsOverflow))
+ if (AtomicGet(IsOverflow))
Sleep(settings.ThrottleDelay);
- }
-
+ }
+
void TLoggerActor::LogIgnoredCount(TInstant now) {
TString message = Sprintf("Ignored IgnoredCount# %" PRIu64 " log records due to logger overflow!", IgnoredCount);
if (!OutputRecord(now, NActors::NLog::EPrio::Error, Settings->LoggerComponent, message)) {
@@ -250,7 +250,7 @@ namespace NActors {
Metrics->IncActorMsgs();
const auto prio = ev.Level.ToPrio();
-
+
switch (prio) {
case ::NActors::NLog::EPrio::Alert:
Metrics->IncAlertMsgs();
@@ -267,28 +267,28 @@ namespace NActors {
void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) {
i64 delayMillisec = (ctx.Now() - ev->Get()->Stamp).MilliSeconds();
WriteMessageStat(*ev->Get());
- if (Settings->AllowDrop) {
- // Disable throttling if it was enabled previously
- if (AtomicGet(IsOverflow))
- AtomicSet(IsOverflow, 0);
-
- // Check if some records have to be dropped
+ if (Settings->AllowDrop) {
+ // Disable throttling if it was enabled previously
+ if (AtomicGet(IsOverflow))
+ AtomicSet(IsOverflow, 0);
+
+ // Check if some records have to be dropped
if ((PassedCount > 10 && delayMillisec > (i64)Settings->TimeThresholdMs) || IgnoredCount > 0) {
Metrics->IncIgnoredMsgs();
- if (IgnoredCount == 0) {
- ctx.Send(ctx.SelfID, new TLogIgnored());
- }
- ++IgnoredCount;
- PassedCount = 0;
- return;
+ if (IgnoredCount == 0) {
+ ctx.Send(ctx.SelfID, new TLogIgnored());
+ }
+ ++IgnoredCount;
+ PassedCount = 0;
+ return;
}
- PassedCount++;
- } else {
- // Enable of disable throttling depending on the load
- if (delayMillisec > (i64)Settings->TimeThresholdMs && !AtomicGet(IsOverflow))
- AtomicSet(IsOverflow, 1);
- else if (delayMillisec <= (i64)Settings->TimeThresholdMs && AtomicGet(IsOverflow))
- AtomicSet(IsOverflow, 0);
+ PassedCount++;
+ } else {
+ // Enable of disable throttling depending on the load
+ if (delayMillisec > (i64)Settings->TimeThresholdMs && !AtomicGet(IsOverflow))
+ AtomicSet(IsOverflow, 1);
+ else if (delayMillisec <= (i64)Settings->TimeThresholdMs && AtomicGet(IsOverflow))
+ AtomicSet(IsOverflow, 0);
}
const auto prio = ev->Get()->Level.ToPrio();
@@ -376,8 +376,8 @@ namespace NActors {
bool hasPriority = false;
bool hasSamplingPriority = false;
bool hasSamplingRate = false;
- bool hasAllowDrop = false;
- int allowDrop = 0;
+ bool hasAllowDrop = false;
+ int allowDrop = 0;
if (params.Has("c")) {
if (TryFromString(params.Get("c"), component) && (component == NLog::InvalidComponent || Settings->IsValidComponent(component))) {
hasComponent = true;
@@ -402,11 +402,11 @@ namespace NActors {
}
}
}
- if (params.Has("allowdrop")) {
- if (TryFromString(params.Get("allowdrop"), allowDrop)) {
- hasAllowDrop = true;
- }
- }
+ if (params.Has("allowdrop")) {
+ if (TryFromString(params.Get("allowdrop"), allowDrop)) {
+ hasAllowDrop = true;
+ }
+ }
TStringStream str;
if (hasComponent && !hasPriority && !hasSamplingPriority && !hasSamplingRate) {
@@ -485,9 +485,9 @@ namespace NActors {
if (hasComponent && hasSamplingRate) {
Settings->SetSamplingRate(samplingRate, component, explanation);
}
- if (hasAllowDrop) {
- Settings->SetAllowDrop(allowDrop);
- }
+ if (hasAllowDrop) {
+ Settings->SetAllowDrop(allowDrop);
+ }
HTML(str) {
if (!explanation.empty()) {
@@ -559,10 +559,10 @@ namespace NActors {
str << "Drop log entries in case of overflow: "
<< (Settings->AllowDrop ? "Enabled" : "Disabled");
}
- str << "<form method=\"GET\">" << Endl;
+ str << "<form method=\"GET\">" << Endl;
str << "<input type=\"hidden\" name=\"allowdrop\" value=\"" << (Settings->AllowDrop ? "0" : "1") << "\"/>" << Endl;
str << "<input class=\"btn btn-primary\" type=\"submit\" value=\"" << (Settings->AllowDrop ? "Disable" : "Enable") << "\"/>" << Endl;
- str << "</form>" << Endl;
+ str << "</form>" << Endl;
}
}
Metrics->GetOutputHtml(str);
@@ -588,7 +588,7 @@ namespace NActors {
logRecord << time;
}
logRecord
- << Settings->MessagePrefix
+ << Settings->MessagePrefix
<< " :" << Settings->ComponentName(component)
<< " " << PriorityToString(priority)
<< ": " << formatted;
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index 4219194faa..c11a7cf3c1 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -233,13 +233,13 @@ namespace NActors {
void Log(TInstant time, NLog::EPriority priority, NLog::EComponent component, const char* c, ...);
static void Throttle(const NLog::TSettings& settings);
-
+
private:
TIntrusivePtr<NLog::TSettings> Settings;
std::shared_ptr<TLogBackend> LogBackend;
ui64 IgnoredCount = 0;
ui64 PassedCount = 0;
- static TAtomic IsOverflow;
+ static TAtomic IsOverflow;
TDuration WakeupInterval{TDuration::Seconds(5)};
std::unique_ptr<ILoggerMetrics> Metrics;
diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp
index e22760d149..f52f2fc5d2 100644
--- a/library/cpp/actors/core/log_settings.cpp
+++ b/library/cpp/actors/core/log_settings.cpp
@@ -11,7 +11,7 @@ namespace NActors {
: LoggerActorId(loggerActorId)
, LoggerComponent(loggerComponent)
, TimeThresholdMs(timeThresholdMs)
- , AllowDrop(true)
+ , AllowDrop(true)
, ThrottleDelay(TDuration::MilliSeconds(100))
, MinVal(0)
, MaxVal(0)
@@ -33,7 +33,7 @@ namespace NActors {
: LoggerActorId(loggerActorId)
, LoggerComponent(loggerComponent)
, TimeThresholdMs(timeThresholdMs)
- , AllowDrop(true)
+ , AllowDrop(true)
, ThrottleDelay(TDuration::MilliSeconds(100))
, MinVal(0)
, MaxVal(0)
@@ -201,10 +201,10 @@ namespace NActors {
return (MinVal <= component) && (component <= MaxVal) && !ComponentNames[component].empty();
}
- void TSettings::SetAllowDrop(bool val) {
- AllowDrop = val;
- }
-
+ void TSettings::SetAllowDrop(bool val) {
+ AllowDrop = val;
+ }
+
void TSettings::SetThrottleDelay(TDuration value) {
ThrottleDelay = value;
}
diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h
index acab6bb93e..7fe4504edd 100644
--- a/library/cpp/actors/core/log_settings.h
+++ b/library/cpp/actors/core/log_settings.h
@@ -72,7 +72,7 @@ namespace NActors {
TActorId LoggerActorId;
EComponent LoggerComponent;
ui64 TimeThresholdMs;
- bool AllowDrop;
+ bool AllowDrop;
TDuration ThrottleDelay;
TArrayHolder<TAtomic> ComponentInfo;
TVector<TString> ComponentNames;
@@ -92,7 +92,7 @@ namespace NActors {
ELogFormat Format;
TString ShortHostName;
TString ClusterName;
- TString MessagePrefix;
+ TString MessagePrefix;
// The best way to provide minVal, maxVal and func is to have
// protobuf enumeration of components. In this case protoc
@@ -161,7 +161,7 @@ namespace NActors {
static int PowerOf2Mask(int val);
static bool IsValidPriority(EPriority priority);
bool IsValidComponent(EComponent component);
- void SetAllowDrop(bool val);
+ void SetAllowDrop(bool val);
void SetThrottleDelay(TDuration value);
void SetUseLocalTimestamps(bool value);
diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp
index 87337bfa9e..d84b4f9e46 100644
--- a/library/cpp/actors/core/mailbox.cpp
+++ b/library/cpp/actors/core/mailbox.cpp
@@ -180,7 +180,7 @@ namespace NActors {
TSimpleMailbox* const mailbox = TSimpleMailbox::Get(lineHint, x);
#if (!defined(_tsan_enabled_))
Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
-#endif
+#endif
mailbox->Queue.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
@@ -200,11 +200,11 @@ namespace NActors {
"We expect that one line can store more simple mailboxes than revolving mailboxes");
if (lineHint > TRevolvingMailbox::MaxMailboxesInLine())
return false;
-
+
TRevolvingMailbox* const mailbox = TRevolvingMailbox::Get(lineHint, x);
#if (!defined(_tsan_enabled_))
Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
-#endif
+#endif
mailbox->QueueWriter.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h
index 1879a8aea6..0bd9c4d314 100644
--- a/library/cpp/actors/core/mailbox.h
+++ b/library/cpp/actors/core/mailbox.h
@@ -370,7 +370,7 @@ namespace NActors {
TRevolvingMailbox();
~TRevolvingMailbox();
-
+
IEventHandle* Pop() {
return QueueReader.Pop();
}
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 2415537e71..d55552af0c 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -1,16 +1,16 @@
-#pragma once
-
-#include "defs.h"
-#include "actor.h"
+#pragma once
+
+#include "defs.h"
+#include "actor.h"
#include <library/cpp/monlib/metrics/histogram_snapshot.h>
-#include <util/system/hp_timer.h>
-
-namespace NActors {
+#include <util/system/hp_timer.h>
+
+namespace NActors {
struct TLogHistogram : public NMonitoring::IHistogramSnapshot {
TLogHistogram() {
memset(Buckets, 0, sizeof(Buckets));
}
-
+
inline void Add(ui64 val, ui64 inc = 1) {
size_t ind = 0;
#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7
@@ -27,15 +27,15 @@ namespace NActors {
RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc);
RelaxedStore(&Buckets[ind], RelaxedLoad(&Buckets[ind]) + inc);
}
-
+
void Aggregate(const TLogHistogram& other) {
const ui64 inc = RelaxedLoad(&other.TotalSamples);
RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc);
for (size_t i = 0; i < Y_ARRAY_SIZE(Buckets); ++i) {
Buckets[i] += RelaxedLoad(&other.Buckets[i]);
}
- }
-
+ }
+
// IHistogramSnapshot
ui32 Count() const override {
return Y_ARRAY_SIZE(Buckets);
@@ -57,7 +57,7 @@ namespace NActors {
ui64 TotalSamples = 0;
ui64 Buckets[65];
};
-
+
struct TExecutorPoolStats {
ui64 MaxUtilizationTime = 0;
};
@@ -86,7 +86,7 @@ namespace NActors {
ui64 MailboxPushedOutBySoftPreemption = 0;
ui64 MailboxPushedOutByTime = 0;
ui64 MailboxPushedOutByEventCount = 0;
-
+
TExecutorThreadStats(size_t activityVecSize = 1) // must be not empty as 0 used as default
: ElapsedTicksByActivity(activityVecSize)
, ReceivedEventsByActivity(activityVecSize)
@@ -103,7 +103,7 @@ namespace NActors {
for (size_t at = 0; at < otherSize; ++at)
self[at] += RelaxedLoad(&other[at]);
}
-
+
void Aggregate(const TExecutorThreadStats& other) {
SentEvents += RelaxedLoad(&other.SentEvents);
ReceivedEvents += RelaxedLoad(&other.ReceivedEvents);
@@ -115,9 +115,9 @@ namespace NActors {
ParkedTicks += RelaxedLoad(&other.ParkedTicks);
BlockedTicks += RelaxedLoad(&other.BlockedTicks);
MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption);
- MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime);
- MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount);
-
+ MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime);
+ MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount);
+
ActivationTimeHistogram.Aggregate(other.ActivationTimeHistogram);
EventDeliveryTimeHistogram.Aggregate(other.EventDeliveryTimeHistogram);
EventProcessingCountHistogram.Aggregate(other.EventProcessingCountHistogram);
@@ -143,5 +143,5 @@ namespace NActors {
return ActorsAliveByActivity.size();
}
};
-
+
}
diff --git a/library/cpp/actors/core/probes.cpp b/library/cpp/actors/core/probes.cpp
index ae475bb59f..7ace83e102 100644
--- a/library/cpp/actors/core/probes.cpp
+++ b/library/cpp/actors/core/probes.cpp
@@ -1,10 +1,10 @@
-#include "probes.h"
-
+#include "probes.h"
+
#include "actorsystem.h"
#include <util/string/builder.h>
-LWTRACE_DEFINE_PROVIDER(ACTORLIB_PROVIDER);
+LWTRACE_DEFINE_PROVIDER(ACTORLIB_PROVIDER);
namespace NActors {
TVector<NLWTrace::TDashboard> LWTraceDashboards(TActorSystemSetup* setup) {
diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h
index 536200b977..4912d6dd26 100644
--- a/library/cpp/actors/core/probes.h
+++ b/library/cpp/actors/core/probes.h
@@ -1,8 +1,8 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/lwtrace/all.h>
#include <util/generic/vector.h>
-
+
#define LWACTORID(x) (x).RawX1(), (x).RawX2(), (x).NodeId(), (x).PoolID()
#define LWTYPE_ACTORID ui64, ui64, ui32, ui32
#define LWNAME_ACTORID(n) n "Raw1", n "Raw2", n "NodeId", n "PoolId"
@@ -167,8 +167,8 @@
TYPES(ui32, ui64, TString, TString, ui32), \
NAMES("fromPoolId", "toPoolId", "fromPool", "toPool", "cpu")) \
/**/
-
-LWTRACE_DECLARE_PROVIDER(ACTORLIB_PROVIDER)
+
+LWTRACE_DECLARE_PROVIDER(ACTORLIB_PROVIDER)
namespace NActors {
struct TActorSystemSetup;
diff --git a/library/cpp/actors/core/process_stats.cpp b/library/cpp/actors/core/process_stats.cpp
index f3a7341980..0e1dbd0031 100644
--- a/library/cpp/actors/core/process_stats.cpp
+++ b/library/cpp/actors/core/process_stats.cpp
@@ -1,25 +1,25 @@
-#include "actorsystem.h"
-#include "actor_bootstrapped.h"
-#include "hfunc.h"
+#include "actorsystem.h"
+#include "actor_bootstrapped.h"
+#include "hfunc.h"
#include "process_stats.h"
-
+
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/monlib/metrics/metric_registry.h>
-
+
#include <util/datetime/uptime.h>
-#include <util/system/defaults.h>
-#include <util/stream/file.h>
-#include <util/string/vector.h>
-#include <util/string/split.h>
-
-#ifndef _win_
+#include <util/system/defaults.h>
+#include <util/stream/file.h>
+#include <util/string/vector.h>
+#include <util/string/split.h>
+
+#ifndef _win_
#include <sys/user.h>
#include <sys/sysctl.h>
-#endif
-
-namespace NActors {
-#ifdef _linux_
-
+#endif
+
+namespace NActors {
+#ifdef _linux_
+
namespace {
template <typename TVal>
static bool ExtractVal(const TString& str, const TString& name, TVal& res) {
@@ -32,16 +32,16 @@ namespace NActors {
res = atol(str.data() + pos);
return true;
}
-
+
float TicksPerMillisec() {
-#ifdef _SC_CLK_TCK
+#ifdef _SC_CLK_TCK
return sysconf(_SC_CLK_TCK) / 1000.0;
-#else
+#else
return 1.f;
-#endif
- }
+#endif
+ }
}
-
+
bool TProcStat::Fill(pid_t pid) {
try {
TString strPid(ToString(pid));
@@ -57,9 +57,9 @@ namespace NActors {
}
// Convert from kB to bytes
Rss *= 1024;
-
+
float tickPerMillisec = TicksPerMillisec();
-
+
TFileInput procStat("/proc/" + strPid + "/stat");
procStat.ReadLine(str);
if (!str.empty()) {
@@ -114,28 +114,28 @@ namespace NActors {
} catch (...) {
return false;
- }
+ }
return true;
- }
-
+ }
+
long TProcStat::ObtainPageSize() {
long sz = sysconf(_SC_PAGESIZE);
return sz;
}
-#else
-
+#else
+
bool TProcStat::Fill(pid_t pid) {
Y_UNUSED(pid);
return false;
}
-
+
long TProcStat::ObtainPageSize() {
return 0;
}
-#endif
-
+#endif
+
namespace {
// Periodically collects process stats and exposes them as mon counters
template <typename TDerived>
@@ -144,7 +144,7 @@ namespace {
static constexpr IActor::EActivityType ActorActivityType() {
return IActor::ACTORLIB_STATS;
}
-
+
TProcStatCollectingActor(TDuration interval)
: Interval(interval)
{
@@ -154,19 +154,19 @@ namespace {
ctx.Schedule(Interval, new TEvents::TEvWakeup());
Self()->Become(&TDerived::StateWork);
}
-
+
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
CFunc(TEvents::TSystem::Wakeup, Wakeup);
}
}
-
+
private:
void Wakeup(const TActorContext& ctx) {
Self()->UpdateCounters(ProcStat);
ctx.Schedule(Interval, new TEvents::TEvWakeup());
- }
-
+ }
+
TDerived* Self() {
ProcStat.Fill(getpid());
return static_cast<TDerived*>(this);
@@ -176,7 +176,7 @@ namespace {
const TDuration Interval;
TProcStat ProcStat;
};
-
+
// Periodically collects process stats and exposes them as mon counters
class TDynamicCounterCollector: public TProcStatCollectingActor<TDynamicCounterCollector> {
using TBase = TProcStatCollectingActor<TDynamicCounterCollector>;
@@ -295,8 +295,8 @@ namespace {
IActor* CreateProcStatCollector(ui32 intervalSec, NMonitoring::TDynamicCounterPtr counters) {
return new TDynamicCounterCollector(intervalSec, counters);
- }
-
+ }
+
IActor* CreateProcStatCollector(TDuration interval, NMonitoring::TMetricRegistry& registry) {
return new TRegistryCollector(interval, registry);
}
diff --git a/library/cpp/actors/core/process_stats.h b/library/cpp/actors/core/process_stats.h
index 454c04e6f3..66346d0b5a 100644
--- a/library/cpp/actors/core/process_stats.h
+++ b/library/cpp/actors/core/process_stats.h
@@ -1,15 +1,15 @@
-#pragma once
-
-#include "defs.h"
-#include "actor.h"
-
+#pragma once
+
+#include "defs.h"
+#include "actor.h"
+
#include <library/cpp/monlib/dynamic_counters/counters.h>
-
+
namespace NMonitoring {
class TMetricRegistry;
}
-namespace NActors {
+namespace NActors {
struct TProcStat {
ui64 Rss;
ui64 VolCtxSwtch;
diff --git a/library/cpp/actors/core/scheduler_actor.cpp b/library/cpp/actors/core/scheduler_actor.cpp
index 60d972d9aa..febc5e40dd 100644
--- a/library/cpp/actors/core/scheduler_actor.cpp
+++ b/library/cpp/actors/core/scheduler_actor.cpp
@@ -61,7 +61,7 @@ namespace NActors {
public:
static constexpr IActor::EActivityType ActorActivityType() {
- return IActor::ACTOR_SYSTEM_SCHEDULER_ACTOR;
+ return IActor::ACTOR_SYSTEM_SCHEDULER_ACTOR;
}
TSchedulerActor(const TSchedulerConfig& cfg)
diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make
index 5a3a6c7a35..880a9d00db 100644
--- a/library/cpp/actors/core/ya.make
+++ b/library/cpp/actors/core/ya.make
@@ -80,15 +80,15 @@ SRCS(
memory_tracker.cpp
memory_tracker.h
mon.h
- mon_stats.h
+ mon_stats.h
monotonic.cpp
monotonic.h
worker_context.cpp
worker_context.h
probes.cpp
- probes.h
- process_stats.cpp
- process_stats.h
+ probes.h
+ process_stats.cpp
+ process_stats.h
scheduler_actor.cpp
scheduler_actor.h
scheduler_basic.cpp
diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp
index 2fa2ce3a45..f9bfaf8dc0 100644
--- a/library/cpp/actors/helpers/selfping_actor.cpp
+++ b/library/cpp/actors/helpers/selfping_actor.cpp
@@ -72,8 +72,8 @@ private:
public:
static constexpr auto ActorActivityType() {
return SELF_PING_ACTOR;
- }
-
+ }
+
TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter,
const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter)
: SendInterval(sendInterval)
diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp
index 9df99d1949..459635fa24 100644
--- a/library/cpp/actors/helpers/selfping_actor_ut.cpp
+++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp
@@ -27,9 +27,9 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) {
TDuration::MilliSeconds(100), // sendInterval (unused in test)
counter, counter2);
- UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0);
UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0);
-
+
const TActorId actorId = runtime->Register(actor);
Y_UNUSED(actorId);
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index ea09913bdd..9ede998d8e 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -98,10 +98,10 @@ namespace NActors {
TInstant Deadline;
public:
- static constexpr IActor::EActivityType ActorActivityType() {
- return IActor::INTERCONNECT_HANDSHAKE;
- }
-
+ static constexpr IActor::EActivityType ActorActivityType() {
+ return IActor::INTERCONNECT_HANDSHAKE;
+ }
+
THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params)
: TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 846388edfc..7fc00dbcc5 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -518,10 +518,10 @@ namespace NActors {
TInterconnectProxyCommon::TPtr Common;
public:
- static constexpr EActivityType ActorActivityType() {
- return INTERCONNECT_SESSION_KILLER;
- }
-
+ static constexpr EActivityType ActorActivityType() {
+ return INTERCONNECT_SESSION_KILLER;
+ }
+
TInterconnectSessionKiller(TInterconnectProxyCommon::TPtr common)
: Common(common)
{
diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp
index b40a01b3e8..2a8443da71 100644
--- a/library/cpp/actors/interconnect/load.cpp
+++ b/library/cpp/actors/interconnect/load.cpp
@@ -100,10 +100,10 @@ namespace NInterconnect {
}
public:
- static constexpr IActor::EActivityType ActorActivityType() {
+ static constexpr IActor::EActivityType ActorActivityType() {
return IActor::INTERCONNECT_LOAD_RESPONDER;
- }
-
+ }
+
TLoadResponderMasterActor()
{}
@@ -150,10 +150,10 @@ namespace NInterconnect {
std::shared_ptr<std::atomic_uint64_t> Traffic;
public:
- static constexpr IActor::EActivityType ActorActivityType() {
+ static constexpr IActor::EActivityType ActorActivityType() {
return IActor::INTERCONNECT_LOAD_ACTOR;
- }
-
+ }
+
TLoadActor(const TLoadParams& params)
: Params(params)
{}
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index 9bf3bd1d8d..e75cbcaef4 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -246,10 +246,10 @@ namespace NActors {
std::shared_ptr<TPollerThread> PollerThread;
public:
- static constexpr IActor::EActivityType ActorActivityType() {
- return IActor::INTERCONNECT_POLLER;
- }
-
+ static constexpr IActor::EActivityType ActorActivityType() {
+ return IActor::INTERCONNECT_POLLER;
+ }
+
void Bootstrap() {
PollerThread = std::make_shared<TPollerThread>(TlsActivationContext->ExecutorThread.ActorSystem);
Become(&TPollerActor::StateFunc);
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h
index 95666d3f7a..ff30b1445e 100644
--- a/library/cpp/actors/interconnect/ut/lib/node.h
+++ b/library/cpp/actors/interconnect/ut/lib/node.h
@@ -21,10 +21,10 @@ public:
TChannelsConfig channelsSettings = TChannelsConfig(),
ui32 numDynamicNodes = 0, ui32 numThreads = 1) {
TActorSystemSetup setup;
- setup.NodeId = nodeId;
- setup.ExecutorsCount = 1;
+ setup.NodeId = nodeId;
+ setup.ExecutorsCount = 1;
setup.Executors.Reset(new TAutoPtr<IExecutorPool>[setup.ExecutorsCount]);
- for (ui32 i = 0; i < setup.ExecutorsCount; ++i) {
+ for (ui32 i = 0; i < setup.ExecutorsCount; ++i) {
setup.Executors[i].Reset(new TBasicExecutorPool(i, numThreads, 20 /* magic number */));
}
setup.Scheduler.Reset(new TBasicSchedulerThread());
diff --git a/library/cpp/actors/protos/unittests.proto b/library/cpp/actors/protos/unittests.proto
index 64503bc1c1..a856b0942a 100644
--- a/library/cpp/actors/protos/unittests.proto
+++ b/library/cpp/actors/protos/unittests.proto
@@ -1,5 +1,5 @@
-option cc_enable_arenas = true;
-
+option cc_enable_arenas = true;
+
message TSimple {
required string Str1 = 1;
optional string Str2 = 2;
@@ -16,5 +16,5 @@ message TBigMessage {
message TMessageWithPayload {
optional string Meta = 1;
repeated uint32 PayloadId = 2;
- repeated string SomeData = 3;
+ repeated string SomeData = 3;
}
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index fd61e4c720..6fa25b9965 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -19,7 +19,7 @@
#include <util/string/printf.h>
#include <typeinfo>
-bool VERBOSE = false;
+bool VERBOSE = false;
const bool PRINT_EVENT_BODY = false;
namespace {
@@ -86,8 +86,8 @@ namespace NActors {
public:
static constexpr EActivityType ActorActivityType() {
return TEST_ACTOR_RUNTIME;
- }
-
+ }
+
TEdgeActor(TTestActorRuntimeBase* runtime)
: TActor(&TEdgeActor::StateFunc)
, Runtime(runtime)
@@ -722,9 +722,9 @@ namespace NActors {
}
void TTestActorRuntimeBase::SetVerbose(bool verbose) {
- VERBOSE = verbose;
- }
-
+ VERBOSE = verbose;
+ }
+
void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) {
Y_VERIFY(!IsInitialized);
Y_VERIFY(nodeIndex < NodeCount);
@@ -1038,10 +1038,10 @@ namespace NActors {
bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) {
TGuard<TMutex> guard(Mutex);
- return DispatchEventsInternal(options, simDeadline);
- }
-
- // Mutex must be locked by caller!
+ return DispatchEventsInternal(options, simDeadline);
+ }
+
+ // Mutex must be locked by caller!
bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) {
TDispatchContext localContext;
localContext.Options = &options;
@@ -1253,9 +1253,9 @@ namespace NActors {
if (!localContext.FoundNonEmptyMailboxes.empty())
return true;
- if (options.CustomFinalCondition && options.CustomFinalCondition())
- return true;
-
+ if (options.CustomFinalCondition && options.CustomFinalCondition())
+ return true;
+
if (options.FinalEvents.empty()) {
for (auto& mbox : currentMailboxes) {
if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp)))
@@ -1755,8 +1755,8 @@ namespace NActors {
public:
static constexpr EActivityType ActorActivityType() {
return TEST_ACTOR_RUNTIME;
- }
-
+ }
+
TReplyActor(TStrandingActorDecorator* owner)
: TActor(&TReplyActor::StateFunc)
, Owner(owner)
@@ -1771,8 +1771,8 @@ namespace NActors {
static constexpr EActivityType ActorActivityType() {
return TEST_ACTOR_RUNTIME;
- }
-
+ }
+
TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors,
TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime,
TReplyCheckerCreator createReplyChecker)
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h
index 90de87b5ac..26e3b45c98 100644
--- a/library/cpp/actors/testlib/test_runtime.h
+++ b/library/cpp/actors/testlib/test_runtime.h
@@ -93,7 +93,7 @@ namespace NActors {
TVector<TFinalEventCondition> FinalEvents;
TVector<TEventMailboxId> NonEmptyMailboxes;
TVector<TEventMailboxId> OnlyMailboxes;
- std::function<bool()> CustomFinalCondition;
+ std::function<bool()> CustomFinalCondition;
bool Quiet = false;
};
@@ -219,8 +219,8 @@ namespace NActors {
TEventFilter SetEventFilter(TEventFilter filterFunc);
TScheduledEventFilter SetScheduledEventFilter(TScheduledEventFilter filterFunc);
TRegistrationObserver SetRegistrationObserverFunc(TRegistrationObserver observerFunc);
- static bool IsVerbose();
- static void SetVerbose(bool verbose);
+ static bool IsVerbose();
+ static void SetVerbose(bool verbose);
TDuration SetDispatchTimeout(TDuration timeout);
void SetDispatchedEventsLimit(ui64 limit) {
DispatchedEventsLimit = limit;
@@ -499,7 +499,7 @@ namespace NActors {
void ClearMailbox(ui32 nodeId, ui32 hint);
void HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId);
void UpdateFinalEventsStatsForEachContext(IEventHandle& ev);
- bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline);
+ bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline);
private:
ui64 ScheduledCount;