aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core
diff options
context:
space:
mode:
authoragri <agri@yandex-team.ru>2022-02-10 16:48:12 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:12 +0300
commitd3530b2692e400bd4d29bd4f07cafaee139164e7 (patch)
treeb7ae636a74490e649a2ed0fdd5361f1bec83b9f9 /library/cpp/actors/core
parent0f4c5d1e8c0672bf0a1f2f2d8acac5ba24772435 (diff)
downloadydb-d3530b2692e400bd4d29bd4f07cafaee139164e7.tar.gz
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/core')
-rw-r--r--library/cpp/actors/core/actor_bootstrapped.h4
-rw-r--r--library/cpp/actors/core/actorsystem.h8
-rw-r--r--library/cpp/actors/core/event.cpp14
-rw-r--r--library/cpp/actors/core/event.h20
-rw-r--r--library/cpp/actors/core/event_load.h28
-rw-r--r--library/cpp/actors/core/event_local.h2
-rw-r--r--library/cpp/actors/core/event_pb.cpp46
-rw-r--r--library/cpp/actors/core/event_pb.h42
-rw-r--r--library/cpp/actors/core/events.h6
-rw-r--r--library/cpp/actors/core/executelater.h50
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp22
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h4
-rw-r--r--library/cpp/actors/core/executor_pool_united.cpp4
-rw-r--r--library/cpp/actors/core/executor_pool_united.h4
-rw-r--r--library/cpp/actors/core/executor_thread.cpp2
-rw-r--r--library/cpp/actors/core/executor_thread.h2
-rw-r--r--library/cpp/actors/core/log.cpp6
-rw-r--r--library/cpp/actors/core/log.h22
-rw-r--r--library/cpp/actors/core/mailbox.cpp24
-rw-r--r--library/cpp/actors/core/mailbox.h80
-rw-r--r--library/cpp/actors/core/mon.h6
-rw-r--r--library/cpp/actors/core/mon_stats.h8
-rw-r--r--library/cpp/actors/core/ya.make6
23 files changed, 205 insertions, 205 deletions
diff --git a/library/cpp/actors/core/actor_bootstrapped.h b/library/cpp/actors/core/actor_bootstrapped.h
index a37887c939..e15bb86ce6 100644
--- a/library/cpp/actors/core/actor_bootstrapped.h
+++ b/library/cpp/actors/core/actor_bootstrapped.h
@@ -28,8 +28,8 @@ namespace NActors {
} else {
static_assert(dependent_false<TDerived>::value, "No correct Bootstrap() signature");
}
- }
-
+ }
+
TActorBootstrapped()
: TActor<TDerived>(&TDerived::StateBootstrap)
{}
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 40499d7586..58d360edcc 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -129,7 +129,7 @@ namespace NActors {
virtual void SetRealTimeMode() const {}
};
-
+
// could be proxy to in-pool schedulers (for NUMA-aware executors)
class ISchedulerThread : TNonCopyable {
public:
@@ -352,7 +352,7 @@ namespace NActors {
NLog::TSettings* LoggerSettings() const {
return LoggerSettings0.Get();
}
-
+
void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const;
void DeferPreStop(std::function<void()> fn) {
@@ -360,8 +360,8 @@ namespace NActors {
}
/* This is the base for memory profiling tags.
- System sets memory profiling tag for debug version of lfalloc.
- The tag is set as "base_tag + actor_activity_type". */
+ System sets memory profiling tag for debug version of lfalloc.
+ The tag is set as "base_tag + actor_activity_type". */
static ui32 MemProfActivityBase;
};
}
diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp
index 33f8ce2aaf..1c05ffc3fe 100644
--- a/library/cpp/actors/core/event.cpp
+++ b/library/cpp/actors/core/event.cpp
@@ -1,7 +1,7 @@
#include "event.h"
-#include "event_pb.h"
-
-namespace NActors {
+#include "event_pb.h"
+
+namespace NActors {
const TScopeId TScopeId::LocallyGenerated{
Max<ui64>(), Max<ui64>()
@@ -22,8 +22,8 @@ namespace NActors {
return chainBuf;
}
return new TEventSerializedData;
- }
-
+ }
+
TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() {
if (Buffer)
return Buffer;
@@ -34,5 +34,5 @@ namespace NActors {
return Buffer;
}
return new TEventSerializedData;
- }
-}
+ }
+}
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 6ff02aaf94..081549071d 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -3,7 +3,7 @@
#include "defs.h"
#include "actorid.h"
#include "callstack.h"
-#include "event_load.h"
+#include "event_load.h"
#include <library/cpp/actors/wilson/wilson_trace.h>
@@ -17,13 +17,13 @@ namespace NActors {
public:
virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
};
-
+
class IEventBase
: TNonCopyable,
public ISerializerToStream {
public:
// actual typing is performed by IEventHandle
-
+
virtual ~IEventBase() {
}
@@ -87,7 +87,7 @@ namespace NActors {
Buffer.Reset();
return x;
}
-
+
enum EFlags {
FlagTrackDelivery = 1 << 0,
FlagForwardOnNondelivery = 1 << 1,
@@ -236,7 +236,7 @@ namespace NActors {
, RewriteType(Type)
{
}
-
+
TIntrusivePtr<TEventSerializedData> GetChainBuffer();
TIntrusivePtr<TEventSerializedData> ReleaseChainBuffer();
@@ -248,15 +248,15 @@ namespace NActors {
} else {
return 0;
}
- }
+ }
bool HasBuffer() const {
return bool(Buffer);
- }
+ }
bool HasEvent() const {
return bool(Event);
- }
+ }
IEventBase* GetBase() {
if (!Event) {
@@ -326,7 +326,7 @@ namespace NActors {
} \
bool IsSerializable() const override { \
return false; \
- }
+ }
#define DEFINE_SIMPLE_NONLOCAL_EVENT(eventType, header) \
TString ToStringHeader() const override { \
@@ -340,5 +340,5 @@ namespace NActors {
} \
bool IsSerializable() const override { \
return true; \
- }
+ }
}
diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h
index 0dab1dd374..da2adc28ea 100644
--- a/library/cpp/actors/core/event_load.h
+++ b/library/cpp/actors/core/event_load.h
@@ -1,24 +1,24 @@
-#pragma once
+#pragma once
#include <util/stream/walk.h>
-#include <util/system/types.h>
+#include <util/system/types.h>
#include <util/generic/string.h>
#include <library/cpp/actors/util/rope.h>
#include <library/cpp/actors/wilson/wilson_trace.h>
-
-namespace NActors {
+
+namespace NActors {
class IEventHandle;
-
+
struct TConstIoVec {
const void* Data;
size_t Size;
};
-
+
struct TIoVec {
void* Data;
size_t Size;
};
-
+
class TEventSerializedData
: public TThrRefBase
{
@@ -70,7 +70,7 @@ namespace NActors {
}
return result;
}
-
+
TRope EraseBack(size_t count) {
Y_VERIFY(count <= Rope.GetSize());
TRope::TIterator iter = Rope.End();
@@ -81,25 +81,25 @@ namespace NActors {
void Append(TRope&& from) {
Rope.Insert(Rope.End(), std::move(from));
}
-
+
void Append(TString buffer) {
if (buffer) {
Rope.Insert(Rope.End(), TRope(std::move(buffer)));
}
}
};
-}
-
+}
+
class TChainBufWalk : public IWalkInput {
TIntrusivePtr<NActors::TEventSerializedData> Buffer;
TRope::TConstIterator Iter;
-
+
public:
TChainBufWalk(TIntrusivePtr<NActors::TEventSerializedData> buffer)
: Buffer(std::move(buffer))
, Iter(Buffer->GetBeginIter())
{}
-
+
private:
size_t DoUnboundedNext(const void **ptr) override {
const size_t size = Iter.ContiguousSize();
@@ -108,5 +108,5 @@ private:
Iter.AdvanceToNextContiguousBlock();
}
return size;
- }
+ }
};
diff --git a/library/cpp/actors/core/event_local.h b/library/cpp/actors/core/event_local.h
index 2845aa94dd..2a4ff9fa55 100644
--- a/library/cpp/actors/core/event_local.h
+++ b/library/cpp/actors/core/event_local.h
@@ -2,7 +2,7 @@
#include "event.h"
#include "scheduler_cookie.h"
-#include "event_load.h"
+#include "event_load.h"
#include <util/system/type_name.h>
namespace NActors {
diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp
index 018ff9ac34..bae0a0a64b 100644
--- a/library/cpp/actors/core/event_pb.cpp
+++ b/library/cpp/actors/core/event_pb.cpp
@@ -1,6 +1,6 @@
-#include "event_pb.h"
-
-namespace NActors {
+#include "event_pb.h"
+
+namespace NActors {
bool TRopeStream::Next(const void** data, int* size) {
*data = Iter.ContiguousData();
*size = Iter.ContiguousSize();
@@ -13,13 +13,13 @@ namespace NActors {
TotalByteCount += *size;
return *size != 0;
}
-
+
void TRopeStream::BackUp(int count) {
Y_VERIFY(count <= TotalByteCount);
Iter -= count;
TotalByteCount -= count;
}
-
+
bool TRopeStream::Skip(int count) {
if (static_cast<size_t>(TotalByteCount + count) > Size) {
count = Size - TotalByteCount;
@@ -27,20 +27,20 @@ namespace NActors {
Iter += count;
TotalByteCount += count;
return static_cast<size_t>(TotalByteCount) != Size;
- }
-
+ }
+
TCoroutineChunkSerializer::TCoroutineChunkSerializer()
: TotalSerializedDataSize(0)
, Stack(64 * 1024)
, SelfClosure{this, TArrayRef(Stack.Begin(), Stack.End())}
, InnerContext(SelfClosure)
{}
-
+
TCoroutineChunkSerializer::~TCoroutineChunkSerializer() {
CancelFlag = true;
Resume();
Y_VERIFY(Finished);
- }
+ }
bool TCoroutineChunkSerializer::AllowsAliasing() const {
return true;
@@ -85,10 +85,10 @@ namespace NActors {
} else {
InnerContext.SwitchTo(BufFeedContext);
}
- }
+ }
return true;
- }
-
+ }
+
bool TCoroutineChunkSerializer::Next(void** data, int* size) {
if (CancelFlag || AbortFlag) {
return false;
@@ -122,15 +122,15 @@ namespace NActors {
BufferPtr -= count;
SizeRemain += count;
TotalSerializedDataSize -= count;
- }
-
+ }
+
void TCoroutineChunkSerializer::Resume() {
TContMachineContext feedContext;
BufFeedContext = &feedContext;
feedContext.SwitchTo(&InnerContext);
BufFeedContext = nullptr;
- }
-
+ }
+
bool TCoroutineChunkSerializer::WriteRope(const TRope *rope) {
for (auto iter = rope->Begin(); iter.Valid(); iter.AdvanceToNextContiguousBlock()) {
if (!WriteAliasedRaw(iter.ContiguousData(), iter.ContiguousSize())) {
@@ -156,14 +156,14 @@ namespace NActors {
return {Chunks, Chunks + NumChunks};
}
-
+
void TCoroutineChunkSerializer::SetSerializingEvent(const IEventBase *event) {
Y_VERIFY(Event == nullptr);
Event = event;
TotalSerializedDataSize = 0;
AbortFlag = false;
}
-
+
void TCoroutineChunkSerializer::Abort() {
Y_VERIFY(Event);
AbortFlag = true;
@@ -181,8 +181,8 @@ namespace NActors {
}
Finished = true;
InnerContext.SwitchTo(BufFeedContext);
- }
-
+ }
+
bool TAllocChunkSerializer::Next(void** pdata, int* psize) {
if (Backup) {
// we have some data in backup rope -- move the first chunk from the backup rope to the buffer and return
@@ -200,12 +200,12 @@ namespace NActors {
Buffers->Append(TRope(std::move(item)));
}
return true;
- }
-
+ }
+
void TAllocChunkSerializer::BackUp(int count) {
Backup.Insert(Backup.Begin(), Buffers->EraseBack(count));
}
-
+
bool TAllocChunkSerializer::WriteAliasedRaw(const void*, int) {
Y_VERIFY(false);
return false;
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index d7546b901a..1c69d7e9bf 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -1,15 +1,15 @@
#pragma once
#include "event.h"
-#include "event_load.h"
-
+#include "event_load.h"
+
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/arena.h>
#include <library/cpp/actors/protos/actors.pb.h>
-#include <util/generic/deque.h>
-#include <util/system/context.h>
-#include <util/system/filemap.h>
-#include <array>
+#include <util/generic/deque.h>
+#include <util/system/context.h>
+#include <util/system/filemap.h>
+#include <array>
namespace NActors {
@@ -29,11 +29,11 @@ namespace NActors {
int64_t ByteCount() const override {
return TotalByteCount;
}
-
+
private:
int64_t TotalByteCount = 0;
};
-
+
class TChunkSerializer : public NProtoBuf::io::ZeroCopyOutputStream {
public:
TChunkSerializer() = default;
@@ -42,7 +42,7 @@ namespace NActors {
virtual bool WriteRope(const TRope *rope) = 0;
virtual bool WriteString(const TString *s) = 0;
};
-
+
class TAllocChunkSerializer final : public TChunkSerializer {
public:
bool Next(void** data, int* size) override;
@@ -51,7 +51,7 @@ namespace NActors {
return Buffers->GetSize();
}
bool WriteAliasedRaw(const void* data, int size) override;
-
+
// WARNING: these methods require owner to retain ownership and immutability of passed objects
bool WriteRope(const TRope *rope) override;
bool WriteString(const TString *s) override;
@@ -62,19 +62,19 @@ namespace NActors {
}
return std::move(Buffers);
}
-
+
protected:
TIntrusivePtr<TEventSerializedData> Buffers = new TEventSerializedData;
TRope Backup;
};
-
+
class TCoroutineChunkSerializer final : public TChunkSerializer, protected ITrampoLine {
public:
using TChunk = std::pair<const char*, size_t>;
TCoroutineChunkSerializer();
~TCoroutineChunkSerializer();
-
+
void SetSerializingEvent(const IEventBase *event);
void Abort();
std::pair<TChunk*, TChunk*> FeedBuf(void* data, size_t size);
@@ -87,7 +87,7 @@ namespace NActors {
const IEventBase *GetCurrentEvent() const {
return Event;
}
-
+
bool Next(void** data, int* size) override;
void BackUp(int count) override;
int64_t ByteCount() const override {
@@ -95,7 +95,7 @@ namespace NActors {
}
bool WriteAliasedRaw(const void* data, int size) override;
bool AllowsAliasing() const override;
-
+
bool WriteRope(const TRope *rope) override;
bool WriteString(const TString *s) override;
@@ -103,7 +103,7 @@ namespace NActors {
void DoRun() override;
void Resume();
bool Produce(const void *data, size_t size);
-
+
i64 TotalSerializedDataSize;
TMappedAllocation Stack;
TContClosure SelfClosure;
@@ -120,7 +120,7 @@ namespace NActors {
bool SerializationSuccess;
bool Finished = false;
};
-
+
#ifdef ACTORLIB_HUGE_PB_SIZE
static const size_t EventMaxByteSize = 140 << 20; // (140MB)
#else
@@ -137,9 +137,9 @@ namespace NActors {
public:
using ProtoRecordType = TRecord;
-
+
TEventPBBase() = default;
-
+
explicit TEventPBBase(const TRecord& rec)
{
Record = rec;
@@ -153,7 +153,7 @@ namespace NActors {
TString ToStringHeader() const override {
return Record.GetTypeName();
}
-
+
TString ToString() const override {
return Record.ShortDebugString();
}
@@ -274,7 +274,7 @@ namespace NActors {
ev->CachedByteSize = input->GetSize();
return ev.Release();
}
-
+
size_t GetCachedByteSize() const {
if (CachedByteSize == 0) {
CachedByteSize = CalculateSerializedSize();
diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h
index 702cf50fad..88103e888c 100644
--- a/library/cpp/actors/core/events.h
+++ b/library/cpp/actors/core/events.h
@@ -1,11 +1,11 @@
#pragma once
#include "event.h"
-#include "event_pb.h"
+#include "event_pb.h"
#include <library/cpp/actors/protos/actors.pb.h>
#include <util/system/unaligned_mem.h>
-
+
namespace NActors {
struct TEvents {
enum EEventSpace {
@@ -213,7 +213,7 @@ namespace NActors {
using TEvPoisonPill = TEvPoison; // Legacy name, deprecated
using TEvActorDied = TEvGone;
- };
+ };
}
template <>
diff --git a/library/cpp/actors/core/executelater.h b/library/cpp/actors/core/executelater.h
index e7a13c1005..53da592373 100644
--- a/library/cpp/actors/core/executelater.h
+++ b/library/cpp/actors/core/executelater.h
@@ -1,10 +1,10 @@
-#pragma once
-
-#include "actor_bootstrapped.h"
-
-#include <utility>
-
-namespace NActors {
+#pragma once
+
+#include "actor_bootstrapped.h"
+
+#include <utility>
+
+namespace NActors {
template <typename TCallback>
class TExecuteLater: public TActorBootstrapped<TExecuteLater<TCallback>> {
public:
@@ -13,10 +13,10 @@ namespace NActors {
}
TExecuteLater(
- TCallback&& callback,
- IActor::EActivityType activityType,
- ui32 channel = 0,
- ui64 cookie = 0,
+ TCallback&& callback,
+ IActor::EActivityType activityType,
+ ui32 channel = 0,
+ ui64 cookie = 0,
const TActorId& reportCompletionTo = TActorId(),
const TActorId& reportExceptionTo = TActorId()) noexcept
: Callback(std::move(callback))
@@ -27,16 +27,16 @@ namespace NActors {
{
this->SetActivityType(activityType);
}
-
+
void Bootstrap(const TActorContext& ctx) noexcept {
try {
{
/* RAII, Callback should be destroyed right before sending
- TEvCallbackCompletion */
-
+ TEvCallbackCompletion */
+
auto local = std::move(Callback);
using T = decltype(local);
-
+
if constexpr (std::is_invocable_v<T, const TActorContext&>) {
local(ctx);
} else {
@@ -56,11 +56,11 @@ namespace NActors {
new TEvents::TEvCallbackException(ctx.SelfID, msg),
Channel, Cookie);
}
- }
-
+ }
+
this->Die(ctx);
- }
-
+ }
+
private:
TCallback Callback;
const ui32 Channel;
@@ -68,13 +68,13 @@ namespace NActors {
const TActorId ReportCompletionTo;
const TActorId ReportExceptionTo;
};
-
+
template <typename T>
IActor* CreateExecuteLaterActor(
- T&& func,
- IActor::EActivityType activityType,
- ui32 channel = 0,
- ui64 cookie = 0,
+ T&& func,
+ IActor::EActivityType activityType,
+ ui32 channel = 0,
+ ui64 cookie = 0,
const TActorId& reportCompletionTo = TActorId(),
const TActorId& reportExceptionTo = TActorId()) noexcept {
return new TExecuteLater<T>(std::forward<T>(func),
@@ -84,4 +84,4 @@ namespace NActors {
reportCompletionTo,
reportExceptionTo);
}
-}
+}
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 4dce16939a..3123e9b1a6 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -4,23 +4,23 @@
#include <library/cpp/actors/util/affinity.h>
#include <library/cpp/actors/util/datetime.h>
-#ifdef _linux_
+#ifdef _linux_
#include <pthread.h>
-#endif
-
+#endif
+
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
constexpr TDuration TBasicExecutorPool::DEFAULT_TIME_PER_MAILBOX;
TBasicExecutorPool::TBasicExecutorPool(
- ui32 poolId,
- ui32 threads,
- ui64 spinThreshold,
+ ui32 poolId,
+ ui32 threads,
+ ui64 spinThreshold,
const TString& poolName,
TAffinity* affinity,
- TDuration timePerMailbox,
- ui32 eventsPerMailbox,
+ TDuration timePerMailbox,
+ ui32 eventsPerMailbox,
int realtimePriority,
ui32 maxActivityType)
: TExecutorPoolBase(poolId, threads, affinity, maxActivityType)
@@ -330,10 +330,10 @@ namespace NActors {
if (pthread_setschedparam(threadSelf, SCHED_FIFO, &param)) {
Y_FAIL("Cannot set realtime priority");
}
- }
-#else
+ }
+#else
Y_UNUSED(RealtimePriority);
-#endif
+#endif
}
ui32 TBasicExecutorPool::GetThreadCount() const {
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index 023190f7fe..65ceed2669 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -62,7 +62,7 @@ namespace NActors {
TAtomic ThreadUtilization;
TAtomic MaxUtilizationCounter;
TAtomic MaxUtilizationAccumulator;
-
+
TAtomic ThreadCount;
TMutex ChangeThreadsLock;
@@ -81,7 +81,7 @@ namespace NActors {
ui32 maxActivityType = 1);
explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg);
~TBasicExecutorPool();
-
+
ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override;
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index dac6245635..e5968609e7 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -14,7 +14,7 @@
#include <util/system/datetime.h>
#include <util/system/hp_timer.h>
-
+
#include <algorithm>
namespace NActors {
@@ -1315,7 +1315,7 @@ namespace NActors {
if (Y_UNLIKELY(result == CpuStopped) || TryAcquireToken(result)) {
break; // token acquired (or stop)
}
- }
+ }
wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timeTracker.Elapsed());
return result;
diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h
index a090ba2466..01be95b778 100644
--- a/library/cpp/actors/core/executor_pool_united.h
+++ b/library/cpp/actors/core/executor_pool_united.h
@@ -63,7 +63,7 @@ namespace NActors {
// Sets executor for specified pool
void SetupPool(TPoolId pool, IExecutorPool* executorPool, TMailboxTable* mailboxTable);
-
+
// Add activation of newly scheduled mailbox and wake cpu to execute it if required
void PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter);
@@ -72,7 +72,7 @@ namespace NActors {
// Try to wake idle cpu waiting for tokens on specified pool
void TryWake(TPoolId pool);
-
+
// Get activation from pool; requires pool's token
void BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter);
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 446b651efd..ac97689f31 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -303,7 +303,7 @@ namespace NActors {
ExecutorPool->SetRealTimeMode();
TAffinityGuard affinity(ExecutorPool->Affinity());
-
+
NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = hpnow;
ui64 execCount = 0;
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index 9d3c573f0d..66b97bd351 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -45,7 +45,7 @@ namespace NActors {
void UnregisterActor(TMailboxHeader* mailbox, ui64 localActorId);
void DropUnregistered();
const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; }
-
+
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index 5f63b5af58..bfac7d30e4 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -195,7 +195,7 @@ namespace NActors {
, Metrics(std::make_unique<TLoggerMetrics>(metrics))
{
}
-
+
TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
std::shared_ptr<TLogBackend> logBackend,
std::shared_ptr<NMonitoring::TMetricRegistry> metrics)
@@ -260,8 +260,8 @@ namespace NActors {
break;
default:
break;
- }
-
+ }
+
}
void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) {
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index c11a7cf3c1..514ff51c14 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -42,7 +42,7 @@
actorCtxOrSystem, priority, component, __VA_ARGS__); \
} \
} while (0) /**/
-
+
#define LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, priority, component, sampleBy, stream) \
LOG_LOG_SAMPLED_BY(actorCtxOrSystem, priority, component, sampleBy, "%s", [&]() { \
TStringBuilder logStringBuilder; \
@@ -304,7 +304,7 @@ namespace NActors {
/////////////////////////////////////////////////////////////////////
// Logging adaptors for memory log and logging into filesystem
/////////////////////////////////////////////////////////////////////
-
+
namespace NDetail {
inline void Y_PRINTF_FORMAT(2, 3) PrintfV(TString& dst, const char* format, ...) {
va_list params;
@@ -318,7 +318,7 @@ namespace NActors {
}
} // namespace NDetail
- template <typename TCtx>
+ template <typename TCtx>
inline void DeliverLogMessage(TCtx& ctx, NLog::EPriority mPriority, NLog::EComponent mComponent, TString &&str)
{
const NLog::TSettings *mSettings = ctx.LoggerSettings();
@@ -327,14 +327,14 @@ namespace NActors {
}
template <typename TCtx, typename... TArgs>
- inline void MemLogAdapter(
+ inline void MemLogAdapter(
TCtx& actorCtxOrSystem,
NLog::EPriority mPriority,
NLog::EComponent mComponent,
const char* format, TArgs&&... params) {
TString Formatted;
-
-
+
+
if constexpr (sizeof... (params) > 0) {
NDetail::PrintfV(Formatted, format, std::forward<TArgs>(params)...);
} else {
@@ -343,9 +343,9 @@ namespace NActors {
MemLogWrite(Formatted.data(), Formatted.size(), true);
DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, std::move(Formatted));
- }
-
- template <typename TCtx>
+ }
+
+ template <typename TCtx>
Y_WRAPPER inline void MemLogAdapter(
TCtx& actorCtxOrSystem,
NLog::EPriority mPriority,
@@ -355,7 +355,7 @@ namespace NActors {
MemLogWrite(str.data(), str.size(), true);
DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, TString(str));
}
-
+
template <typename TCtx>
Y_WRAPPER inline void MemLogAdapter(
TCtx& actorCtxOrSystem,
@@ -365,5 +365,5 @@ namespace NActors {
MemLogWrite(str.data(), str.size(), true);
DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, std::move(str));
- }
+ }
}
diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp
index d84b4f9e46..ac598eff86 100644
--- a/library/cpp/actors/core/mailbox.cpp
+++ b/library/cpp/actors/core/mailbox.cpp
@@ -214,49 +214,49 @@ namespace NActors {
return true;
case TMailboxType::HTSwap: {
THTSwapMailbox* const mailbox = THTSwapMailbox::Get(lineHint, x);
-#if (!defined(_tsan_enabled_))
+#if (!defined(_tsan_enabled_))
Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
-#endif
+#endif
mailbox->Queue.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
executorPool->ScheduleActivation(hint);
}
- }
+ }
return true;
case TMailboxType::ReadAsFilled: {
if (lineHint > TReadAsFilledMailbox::MaxMailboxesInLine())
return false;
-
+
TReadAsFilledMailbox* const mailbox = TReadAsFilledMailbox::Get(lineHint, x);
-#if (!defined(_tsan_enabled_))
+#if (!defined(_tsan_enabled_))
Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
-#endif
+#endif
mailbox->Queue.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
executorPool->ScheduleActivation(hint);
}
- }
+ }
return true;
case TMailboxType::TinyReadAsFilled: {
if (lineHint > TTinyReadAsFilledMailbox::MaxMailboxesInLine())
return false;
-
+
TTinyReadAsFilledMailbox* const mailbox = TTinyReadAsFilledMailbox::Get(lineHint, x);
-#if (!defined(_tsan_enabled_))
+#if (!defined(_tsan_enabled_))
Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
-#endif
+#endif
mailbox->Queue.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
executorPool->ScheduleActivation(hint);
}
- }
+ }
return true;
default:
Y_FAIL("unknown mailbox type");
- }
+ }
}
return false;
diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h
index 0bd9c4d314..8a2c0d0608 100644
--- a/library/cpp/actors/core/mailbox.h
+++ b/library/cpp/actors/core/mailbox.h
@@ -10,7 +10,7 @@
#include <library/cpp/threading/queue/mpsc_read_as_filled.h>
#include <util/generic/hash.h>
#include <util/system/hp_timer.h>
-#include <util/generic/ptr.h>
+#include <util/generic/ptr.h>
// TODO: clean all broken arcadia atomic stuff and replace with intrinsics
namespace NActors {
@@ -389,52 +389,52 @@ namespace NActors {
constexpr static ui32 AlignedSize() {
return ((sizeof(TRevolvingMailbox) + 63) / 64) * 64;
}
-
+
std::pair<ui32, ui32> CountRevolvingMailboxEvents(ui64 localActorId, ui32 maxTraverse);
bool CleanupEvents();
};
-
+
static_assert(sizeof(TRevolvingMailbox) == 128, "expect sizeof(TRevolvingMailbox) == 128");
-
+
struct THTSwapMailbox: public TMailboxHeader {
using TQueueType = NThreading::THTSwapQueue<IEventHandle*>;
-
+
TQueueType Queue;
NHPTimer::STime ScheduleMoment;
char Padding_[16];
-
+
THTSwapMailbox()
: TMailboxHeader(TMailboxType::HTSwap)
, ScheduleMoment(0)
{
}
-
+
~THTSwapMailbox() {
CleanupEvents();
}
-
+
IEventHandle* Pop() {
return Queue.Pop();
}
-
+
IEventHandle* Head() {
return Queue.Peek();
}
-
+
static THTSwapMailbox* Get(ui32 hint, void* line) {
return (THTSwapMailbox*)((ui8*)line + 64 + (hint - 1) * 64);
}
-
+
constexpr static ui64 MaxMailboxesInLine() {
return (LineSize - 64) / AlignedSize();
}
-
+
static const TMailboxType::EType MailboxType = TMailboxType::HTSwap;
-
+
constexpr static ui32 AlignedSize() {
return ((sizeof(THTSwapMailbox) + 63) / 64) * 64;
}
-
+
bool CleanupEvents() {
const bool done = (Queue.Peek() == nullptr);
while (IEventHandle* ev = Queue.Pop())
@@ -442,50 +442,50 @@ namespace NActors {
return done;
}
};
-
+
static_assert(sizeof(THTSwapMailbox) == 64,
"expect sizeof(THTSwapMailbox) == 64");
-
+
struct TReadAsFilledMailbox: public TMailboxHeader {
using TQueueType = NThreading::TReadAsFilledQueue<IEventHandle>;
-
+
TQueueType Queue;
NHPTimer::STime ScheduleMoment;
char Padding_[8];
-
+
TReadAsFilledMailbox()
: TMailboxHeader(TMailboxType::ReadAsFilled)
, ScheduleMoment(0)
{
}
-
+
~TReadAsFilledMailbox() {
CleanupEvents();
}
-
+
IEventHandle* Pop() {
return Queue.Pop();
}
-
+
IEventHandle* Head() {
return Queue.Peek();
}
-
+
static TReadAsFilledMailbox* Get(ui32 hint, void* line) {
return (TReadAsFilledMailbox*)((ui8*)line + 64 + (hint - 1) * 192);
}
-
+
constexpr static ui64 MaxMailboxesInLine() {
return (LineSize - 64) / AlignedSize();
}
-
+
static const TMailboxType::EType MailboxType =
TMailboxType::ReadAsFilled;
-
+
constexpr static ui32 AlignedSize() {
return ((sizeof(TReadAsFilledMailbox) + 63) / 64) * 64;
}
-
+
bool CleanupEvents() {
const bool done = (Queue.Peek() == nullptr);
while (IEventHandle* ev = Queue.Pop())
@@ -493,52 +493,52 @@ namespace NActors {
return done;
}
};
-
+
static_assert(sizeof(TReadAsFilledMailbox) == 192,
"expect sizeof(TReadAsFilledMailbox) == 192");
-
+
struct TTinyReadAsFilledMailbox: public TMailboxHeader {
using TQueueType = NThreading::TReadAsFilledQueue<
IEventHandle,
NThreading::TRaFQueueBunchSize<4>>;
-
+
TQueueType Queue;
NHPTimer::STime ScheduleMoment;
char Padding_[8];
-
+
TTinyReadAsFilledMailbox()
: TMailboxHeader(TMailboxType::TinyReadAsFilled)
, ScheduleMoment(0)
{
}
-
+
~TTinyReadAsFilledMailbox() {
CleanupEvents();
}
-
+
IEventHandle* Pop() {
return Queue.Pop();
}
-
+
IEventHandle* Head() {
return Queue.Peek();
}
-
+
static TTinyReadAsFilledMailbox* Get(ui32 hint, void* line) {
return (TTinyReadAsFilledMailbox*)((ui8*)line + 64 + (hint - 1) * 192);
}
-
+
constexpr static ui64 MaxMailboxesInLine() {
return (LineSize - 64) / AlignedSize();
}
-
+
static const TMailboxType::EType MailboxType =
TMailboxType::TinyReadAsFilled;
-
+
constexpr static ui32 AlignedSize() {
return ((sizeof(TTinyReadAsFilledMailbox) + 63) / 64) * 64;
}
-
+
bool CleanupEvents() {
const bool done = (Queue.Peek() == nullptr);
while (IEventHandle* ev = Queue.Pop())
@@ -546,8 +546,8 @@ namespace NActors {
return done;
}
};
-
+
static_assert(sizeof(TTinyReadAsFilledMailbox) == 192,
"expect sizeof(TTinyReadAsFilledMailbox) == 192");
- };
+ };
}
diff --git a/library/cpp/actors/core/mon.h b/library/cpp/actors/core/mon.h
index c450f2338e..3ebf6a0bed 100644
--- a/library/cpp/actors/core/mon.h
+++ b/library/cpp/actors/core/mon.h
@@ -123,7 +123,7 @@ namespace NActors {
return true;
}
- static IEventBase* Load(TEventSerializedData* bufs) {
+ static IEventBase* Load(TEventSerializedData* bufs) {
return new TEvRemoteHttpInfo(bufs->GetString());
}
@@ -160,7 +160,7 @@ namespace NActors {
return true;
}
- static IEventBase* Load(TEventSerializedData* bufs) {
+ static IEventBase* Load(TEventSerializedData* bufs) {
return new TEvRemoteHttpInfoRes(bufs->GetString());
}
};
@@ -192,7 +192,7 @@ namespace NActors {
return true;
}
- static IEventBase* Load(TEventSerializedData* bufs) {
+ static IEventBase* Load(TEventSerializedData* bufs) {
return new TEvRemoteJsonInfoRes(bufs->GetString());
}
};
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index d55552af0c..f1d66664b6 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -13,17 +13,17 @@ namespace NActors {
inline void Add(ui64 val, ui64 inc = 1) {
size_t ind = 0;
-#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7
+#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7
asm volatile("" ::
: "memory");
-#endif
+#endif
if (val > 1) {
ind = GetValueBitCount(val - 1);
}
-#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7
+#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7
asm volatile("" ::
: "memory");
-#endif
+#endif
RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc);
RelaxedStore(&Buckets[ind], RelaxedLoad(&Buckets[ind]) + inc);
}
diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make
index 880a9d00db..22155dbeec 100644
--- a/library/cpp/actors/core/ya.make
+++ b/library/cpp/actors/core/ya.make
@@ -32,8 +32,8 @@ SRCS(
ask.h
balancer.h
balancer.cpp
- buffer.cpp
- buffer.h
+ buffer.cpp
+ buffer.h
callstack.cpp
callstack.h
config.h
@@ -45,7 +45,7 @@ SRCS(
event.h
event_load.h
event_local.h
- event_pb.cpp
+ event_pb.cpp
event_pb.h
events.h
events_undelivered.cpp