diff options
author | agri <agri@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
commit | d3530b2692e400bd4d29bd4f07cafaee139164e7 (patch) | |
tree | b7ae636a74490e649a2ed0fdd5361f1bec83b9f9 /library/cpp/actors/core | |
parent | 0f4c5d1e8c0672bf0a1f2f2d8acac5ba24772435 (diff) | |
download | ydb-d3530b2692e400bd4d29bd4f07cafaee139164e7.tar.gz |
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/core')
23 files changed, 205 insertions, 205 deletions
diff --git a/library/cpp/actors/core/actor_bootstrapped.h b/library/cpp/actors/core/actor_bootstrapped.h index a37887c939..e15bb86ce6 100644 --- a/library/cpp/actors/core/actor_bootstrapped.h +++ b/library/cpp/actors/core/actor_bootstrapped.h @@ -28,8 +28,8 @@ namespace NActors { } else { static_assert(dependent_false<TDerived>::value, "No correct Bootstrap() signature"); } - } - + } + TActorBootstrapped() : TActor<TDerived>(&TDerived::StateBootstrap) {} diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 40499d7586..58d360edcc 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -129,7 +129,7 @@ namespace NActors { virtual void SetRealTimeMode() const {} }; - + // could be proxy to in-pool schedulers (for NUMA-aware executors) class ISchedulerThread : TNonCopyable { public: @@ -352,7 +352,7 @@ namespace NActors { NLog::TSettings* LoggerSettings() const { return LoggerSettings0.Get(); } - + void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const; void DeferPreStop(std::function<void()> fn) { @@ -360,8 +360,8 @@ namespace NActors { } /* This is the base for memory profiling tags. - System sets memory profiling tag for debug version of lfalloc. - The tag is set as "base_tag + actor_activity_type". */ + System sets memory profiling tag for debug version of lfalloc. + The tag is set as "base_tag + actor_activity_type". */ static ui32 MemProfActivityBase; }; } diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp index 33f8ce2aaf..1c05ffc3fe 100644 --- a/library/cpp/actors/core/event.cpp +++ b/library/cpp/actors/core/event.cpp @@ -1,7 +1,7 @@ #include "event.h" -#include "event_pb.h" - -namespace NActors { +#include "event_pb.h" + +namespace NActors { const TScopeId TScopeId::LocallyGenerated{ Max<ui64>(), Max<ui64>() @@ -22,8 +22,8 @@ namespace NActors { return chainBuf; } return new TEventSerializedData; - } - + } + TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() { if (Buffer) return Buffer; @@ -34,5 +34,5 @@ namespace NActors { return Buffer; } return new TEventSerializedData; - } -} + } +} diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 6ff02aaf94..081549071d 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -3,7 +3,7 @@ #include "defs.h" #include "actorid.h" #include "callstack.h" -#include "event_load.h" +#include "event_load.h" #include <library/cpp/actors/wilson/wilson_trace.h> @@ -17,13 +17,13 @@ namespace NActors { public: virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0; }; - + class IEventBase : TNonCopyable, public ISerializerToStream { public: // actual typing is performed by IEventHandle - + virtual ~IEventBase() { } @@ -87,7 +87,7 @@ namespace NActors { Buffer.Reset(); return x; } - + enum EFlags { FlagTrackDelivery = 1 << 0, FlagForwardOnNondelivery = 1 << 1, @@ -236,7 +236,7 @@ namespace NActors { , RewriteType(Type) { } - + TIntrusivePtr<TEventSerializedData> GetChainBuffer(); TIntrusivePtr<TEventSerializedData> ReleaseChainBuffer(); @@ -248,15 +248,15 @@ namespace NActors { } else { return 0; } - } + } bool HasBuffer() const { return bool(Buffer); - } + } bool HasEvent() const { return bool(Event); - } + } IEventBase* GetBase() { if (!Event) { @@ -326,7 +326,7 @@ namespace NActors { } \ bool IsSerializable() const override { \ return false; \ - } + } #define DEFINE_SIMPLE_NONLOCAL_EVENT(eventType, header) \ TString ToStringHeader() const override { \ @@ -340,5 +340,5 @@ namespace NActors { } \ bool IsSerializable() const override { \ return true; \ - } + } } diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h index 0dab1dd374..da2adc28ea 100644 --- a/library/cpp/actors/core/event_load.h +++ b/library/cpp/actors/core/event_load.h @@ -1,24 +1,24 @@ -#pragma once +#pragma once #include <util/stream/walk.h> -#include <util/system/types.h> +#include <util/system/types.h> #include <util/generic/string.h> #include <library/cpp/actors/util/rope.h> #include <library/cpp/actors/wilson/wilson_trace.h> - -namespace NActors { + +namespace NActors { class IEventHandle; - + struct TConstIoVec { const void* Data; size_t Size; }; - + struct TIoVec { void* Data; size_t Size; }; - + class TEventSerializedData : public TThrRefBase { @@ -70,7 +70,7 @@ namespace NActors { } return result; } - + TRope EraseBack(size_t count) { Y_VERIFY(count <= Rope.GetSize()); TRope::TIterator iter = Rope.End(); @@ -81,25 +81,25 @@ namespace NActors { void Append(TRope&& from) { Rope.Insert(Rope.End(), std::move(from)); } - + void Append(TString buffer) { if (buffer) { Rope.Insert(Rope.End(), TRope(std::move(buffer))); } } }; -} - +} + class TChainBufWalk : public IWalkInput { TIntrusivePtr<NActors::TEventSerializedData> Buffer; TRope::TConstIterator Iter; - + public: TChainBufWalk(TIntrusivePtr<NActors::TEventSerializedData> buffer) : Buffer(std::move(buffer)) , Iter(Buffer->GetBeginIter()) {} - + private: size_t DoUnboundedNext(const void **ptr) override { const size_t size = Iter.ContiguousSize(); @@ -108,5 +108,5 @@ private: Iter.AdvanceToNextContiguousBlock(); } return size; - } + } }; diff --git a/library/cpp/actors/core/event_local.h b/library/cpp/actors/core/event_local.h index 2845aa94dd..2a4ff9fa55 100644 --- a/library/cpp/actors/core/event_local.h +++ b/library/cpp/actors/core/event_local.h @@ -2,7 +2,7 @@ #include "event.h" #include "scheduler_cookie.h" -#include "event_load.h" +#include "event_load.h" #include <util/system/type_name.h> namespace NActors { diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp index 018ff9ac34..bae0a0a64b 100644 --- a/library/cpp/actors/core/event_pb.cpp +++ b/library/cpp/actors/core/event_pb.cpp @@ -1,6 +1,6 @@ -#include "event_pb.h" - -namespace NActors { +#include "event_pb.h" + +namespace NActors { bool TRopeStream::Next(const void** data, int* size) { *data = Iter.ContiguousData(); *size = Iter.ContiguousSize(); @@ -13,13 +13,13 @@ namespace NActors { TotalByteCount += *size; return *size != 0; } - + void TRopeStream::BackUp(int count) { Y_VERIFY(count <= TotalByteCount); Iter -= count; TotalByteCount -= count; } - + bool TRopeStream::Skip(int count) { if (static_cast<size_t>(TotalByteCount + count) > Size) { count = Size - TotalByteCount; @@ -27,20 +27,20 @@ namespace NActors { Iter += count; TotalByteCount += count; return static_cast<size_t>(TotalByteCount) != Size; - } - + } + TCoroutineChunkSerializer::TCoroutineChunkSerializer() : TotalSerializedDataSize(0) , Stack(64 * 1024) , SelfClosure{this, TArrayRef(Stack.Begin(), Stack.End())} , InnerContext(SelfClosure) {} - + TCoroutineChunkSerializer::~TCoroutineChunkSerializer() { CancelFlag = true; Resume(); Y_VERIFY(Finished); - } + } bool TCoroutineChunkSerializer::AllowsAliasing() const { return true; @@ -85,10 +85,10 @@ namespace NActors { } else { InnerContext.SwitchTo(BufFeedContext); } - } + } return true; - } - + } + bool TCoroutineChunkSerializer::Next(void** data, int* size) { if (CancelFlag || AbortFlag) { return false; @@ -122,15 +122,15 @@ namespace NActors { BufferPtr -= count; SizeRemain += count; TotalSerializedDataSize -= count; - } - + } + void TCoroutineChunkSerializer::Resume() { TContMachineContext feedContext; BufFeedContext = &feedContext; feedContext.SwitchTo(&InnerContext); BufFeedContext = nullptr; - } - + } + bool TCoroutineChunkSerializer::WriteRope(const TRope *rope) { for (auto iter = rope->Begin(); iter.Valid(); iter.AdvanceToNextContiguousBlock()) { if (!WriteAliasedRaw(iter.ContiguousData(), iter.ContiguousSize())) { @@ -156,14 +156,14 @@ namespace NActors { return {Chunks, Chunks + NumChunks}; } - + void TCoroutineChunkSerializer::SetSerializingEvent(const IEventBase *event) { Y_VERIFY(Event == nullptr); Event = event; TotalSerializedDataSize = 0; AbortFlag = false; } - + void TCoroutineChunkSerializer::Abort() { Y_VERIFY(Event); AbortFlag = true; @@ -181,8 +181,8 @@ namespace NActors { } Finished = true; InnerContext.SwitchTo(BufFeedContext); - } - + } + bool TAllocChunkSerializer::Next(void** pdata, int* psize) { if (Backup) { // we have some data in backup rope -- move the first chunk from the backup rope to the buffer and return @@ -200,12 +200,12 @@ namespace NActors { Buffers->Append(TRope(std::move(item))); } return true; - } - + } + void TAllocChunkSerializer::BackUp(int count) { Backup.Insert(Backup.Begin(), Buffers->EraseBack(count)); } - + bool TAllocChunkSerializer::WriteAliasedRaw(const void*, int) { Y_VERIFY(false); return false; diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index d7546b901a..1c69d7e9bf 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -1,15 +1,15 @@ #pragma once #include "event.h" -#include "event_load.h" - +#include "event_load.h" + #include <google/protobuf/io/zero_copy_stream.h> #include <google/protobuf/arena.h> #include <library/cpp/actors/protos/actors.pb.h> -#include <util/generic/deque.h> -#include <util/system/context.h> -#include <util/system/filemap.h> -#include <array> +#include <util/generic/deque.h> +#include <util/system/context.h> +#include <util/system/filemap.h> +#include <array> namespace NActors { @@ -29,11 +29,11 @@ namespace NActors { int64_t ByteCount() const override { return TotalByteCount; } - + private: int64_t TotalByteCount = 0; }; - + class TChunkSerializer : public NProtoBuf::io::ZeroCopyOutputStream { public: TChunkSerializer() = default; @@ -42,7 +42,7 @@ namespace NActors { virtual bool WriteRope(const TRope *rope) = 0; virtual bool WriteString(const TString *s) = 0; }; - + class TAllocChunkSerializer final : public TChunkSerializer { public: bool Next(void** data, int* size) override; @@ -51,7 +51,7 @@ namespace NActors { return Buffers->GetSize(); } bool WriteAliasedRaw(const void* data, int size) override; - + // WARNING: these methods require owner to retain ownership and immutability of passed objects bool WriteRope(const TRope *rope) override; bool WriteString(const TString *s) override; @@ -62,19 +62,19 @@ namespace NActors { } return std::move(Buffers); } - + protected: TIntrusivePtr<TEventSerializedData> Buffers = new TEventSerializedData; TRope Backup; }; - + class TCoroutineChunkSerializer final : public TChunkSerializer, protected ITrampoLine { public: using TChunk = std::pair<const char*, size_t>; TCoroutineChunkSerializer(); ~TCoroutineChunkSerializer(); - + void SetSerializingEvent(const IEventBase *event); void Abort(); std::pair<TChunk*, TChunk*> FeedBuf(void* data, size_t size); @@ -87,7 +87,7 @@ namespace NActors { const IEventBase *GetCurrentEvent() const { return Event; } - + bool Next(void** data, int* size) override; void BackUp(int count) override; int64_t ByteCount() const override { @@ -95,7 +95,7 @@ namespace NActors { } bool WriteAliasedRaw(const void* data, int size) override; bool AllowsAliasing() const override; - + bool WriteRope(const TRope *rope) override; bool WriteString(const TString *s) override; @@ -103,7 +103,7 @@ namespace NActors { void DoRun() override; void Resume(); bool Produce(const void *data, size_t size); - + i64 TotalSerializedDataSize; TMappedAllocation Stack; TContClosure SelfClosure; @@ -120,7 +120,7 @@ namespace NActors { bool SerializationSuccess; bool Finished = false; }; - + #ifdef ACTORLIB_HUGE_PB_SIZE static const size_t EventMaxByteSize = 140 << 20; // (140MB) #else @@ -137,9 +137,9 @@ namespace NActors { public: using ProtoRecordType = TRecord; - + TEventPBBase() = default; - + explicit TEventPBBase(const TRecord& rec) { Record = rec; @@ -153,7 +153,7 @@ namespace NActors { TString ToStringHeader() const override { return Record.GetTypeName(); } - + TString ToString() const override { return Record.ShortDebugString(); } @@ -274,7 +274,7 @@ namespace NActors { ev->CachedByteSize = input->GetSize(); return ev.Release(); } - + size_t GetCachedByteSize() const { if (CachedByteSize == 0) { CachedByteSize = CalculateSerializedSize(); diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h index 702cf50fad..88103e888c 100644 --- a/library/cpp/actors/core/events.h +++ b/library/cpp/actors/core/events.h @@ -1,11 +1,11 @@ #pragma once #include "event.h" -#include "event_pb.h" +#include "event_pb.h" #include <library/cpp/actors/protos/actors.pb.h> #include <util/system/unaligned_mem.h> - + namespace NActors { struct TEvents { enum EEventSpace { @@ -213,7 +213,7 @@ namespace NActors { using TEvPoisonPill = TEvPoison; // Legacy name, deprecated using TEvActorDied = TEvGone; - }; + }; } template <> diff --git a/library/cpp/actors/core/executelater.h b/library/cpp/actors/core/executelater.h index e7a13c1005..53da592373 100644 --- a/library/cpp/actors/core/executelater.h +++ b/library/cpp/actors/core/executelater.h @@ -1,10 +1,10 @@ -#pragma once - -#include "actor_bootstrapped.h" - -#include <utility> - -namespace NActors { +#pragma once + +#include "actor_bootstrapped.h" + +#include <utility> + +namespace NActors { template <typename TCallback> class TExecuteLater: public TActorBootstrapped<TExecuteLater<TCallback>> { public: @@ -13,10 +13,10 @@ namespace NActors { } TExecuteLater( - TCallback&& callback, - IActor::EActivityType activityType, - ui32 channel = 0, - ui64 cookie = 0, + TCallback&& callback, + IActor::EActivityType activityType, + ui32 channel = 0, + ui64 cookie = 0, const TActorId& reportCompletionTo = TActorId(), const TActorId& reportExceptionTo = TActorId()) noexcept : Callback(std::move(callback)) @@ -27,16 +27,16 @@ namespace NActors { { this->SetActivityType(activityType); } - + void Bootstrap(const TActorContext& ctx) noexcept { try { { /* RAII, Callback should be destroyed right before sending - TEvCallbackCompletion */ - + TEvCallbackCompletion */ + auto local = std::move(Callback); using T = decltype(local); - + if constexpr (std::is_invocable_v<T, const TActorContext&>) { local(ctx); } else { @@ -56,11 +56,11 @@ namespace NActors { new TEvents::TEvCallbackException(ctx.SelfID, msg), Channel, Cookie); } - } - + } + this->Die(ctx); - } - + } + private: TCallback Callback; const ui32 Channel; @@ -68,13 +68,13 @@ namespace NActors { const TActorId ReportCompletionTo; const TActorId ReportExceptionTo; }; - + template <typename T> IActor* CreateExecuteLaterActor( - T&& func, - IActor::EActivityType activityType, - ui32 channel = 0, - ui64 cookie = 0, + T&& func, + IActor::EActivityType activityType, + ui32 channel = 0, + ui64 cookie = 0, const TActorId& reportCompletionTo = TActorId(), const TActorId& reportExceptionTo = TActorId()) noexcept { return new TExecuteLater<T>(std::forward<T>(func), @@ -84,4 +84,4 @@ namespace NActors { reportCompletionTo, reportExceptionTo); } -} +} diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 4dce16939a..3123e9b1a6 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -4,23 +4,23 @@ #include <library/cpp/actors/util/affinity.h> #include <library/cpp/actors/util/datetime.h> -#ifdef _linux_ +#ifdef _linux_ #include <pthread.h> -#endif - +#endif + namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); constexpr TDuration TBasicExecutorPool::DEFAULT_TIME_PER_MAILBOX; TBasicExecutorPool::TBasicExecutorPool( - ui32 poolId, - ui32 threads, - ui64 spinThreshold, + ui32 poolId, + ui32 threads, + ui64 spinThreshold, const TString& poolName, TAffinity* affinity, - TDuration timePerMailbox, - ui32 eventsPerMailbox, + TDuration timePerMailbox, + ui32 eventsPerMailbox, int realtimePriority, ui32 maxActivityType) : TExecutorPoolBase(poolId, threads, affinity, maxActivityType) @@ -330,10 +330,10 @@ namespace NActors { if (pthread_setschedparam(threadSelf, SCHED_FIFO, ¶m)) { Y_FAIL("Cannot set realtime priority"); } - } -#else + } +#else Y_UNUSED(RealtimePriority); -#endif +#endif } ui32 TBasicExecutorPool::GetThreadCount() const { diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index 023190f7fe..65ceed2669 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -62,7 +62,7 @@ namespace NActors { TAtomic ThreadUtilization; TAtomic MaxUtilizationCounter; TAtomic MaxUtilizationAccumulator; - + TAtomic ThreadCount; TMutex ChangeThreadsLock; @@ -81,7 +81,7 @@ namespace NActors { ui32 maxActivityType = 1); explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg); ~TBasicExecutorPool(); - + ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override; void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp index dac6245635..e5968609e7 100644 --- a/library/cpp/actors/core/executor_pool_united.cpp +++ b/library/cpp/actors/core/executor_pool_united.cpp @@ -14,7 +14,7 @@ #include <util/system/datetime.h> #include <util/system/hp_timer.h> - + #include <algorithm> namespace NActors { @@ -1315,7 +1315,7 @@ namespace NActors { if (Y_UNLIKELY(result == CpuStopped) || TryAcquireToken(result)) { break; // token acquired (or stop) } - } + } wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timeTracker.Elapsed()); return result; diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h index a090ba2466..01be95b778 100644 --- a/library/cpp/actors/core/executor_pool_united.h +++ b/library/cpp/actors/core/executor_pool_united.h @@ -63,7 +63,7 @@ namespace NActors { // Sets executor for specified pool void SetupPool(TPoolId pool, IExecutorPool* executorPool, TMailboxTable* mailboxTable); - + // Add activation of newly scheduled mailbox and wake cpu to execute it if required void PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter); @@ -72,7 +72,7 @@ namespace NActors { // Try to wake idle cpu waiting for tokens on specified pool void TryWake(TPoolId pool); - + // Get activation from pool; requires pool's token void BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter); diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 446b651efd..ac97689f31 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -303,7 +303,7 @@ namespace NActors { ExecutorPool->SetRealTimeMode(); TAffinityGuard affinity(ExecutorPool->Affinity()); - + NHPTimer::STime hpnow = GetCycleCountFast(); NHPTimer::STime hpprev = hpnow; ui64 execCount = 0; diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h index 9d3c573f0d..66b97bd351 100644 --- a/library/cpp/actors/core/executor_thread.h +++ b/library/cpp/actors/core/executor_thread.h @@ -45,7 +45,7 @@ namespace NActors { void UnregisterActor(TMailboxHeader* mailbox, ui64 localActorId); void DropUnregistered(); const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; } - + void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index 5f63b5af58..bfac7d30e4 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -195,7 +195,7 @@ namespace NActors { , Metrics(std::make_unique<TLoggerMetrics>(metrics)) { } - + TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, std::shared_ptr<TLogBackend> logBackend, std::shared_ptr<NMonitoring::TMetricRegistry> metrics) @@ -260,8 +260,8 @@ namespace NActors { break; default: break; - } - + } + } void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) { diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index c11a7cf3c1..514ff51c14 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -42,7 +42,7 @@ actorCtxOrSystem, priority, component, __VA_ARGS__); \ } \ } while (0) /**/ - + #define LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, priority, component, sampleBy, stream) \ LOG_LOG_SAMPLED_BY(actorCtxOrSystem, priority, component, sampleBy, "%s", [&]() { \ TStringBuilder logStringBuilder; \ @@ -304,7 +304,7 @@ namespace NActors { ///////////////////////////////////////////////////////////////////// // Logging adaptors for memory log and logging into filesystem ///////////////////////////////////////////////////////////////////// - + namespace NDetail { inline void Y_PRINTF_FORMAT(2, 3) PrintfV(TString& dst, const char* format, ...) { va_list params; @@ -318,7 +318,7 @@ namespace NActors { } } // namespace NDetail - template <typename TCtx> + template <typename TCtx> inline void DeliverLogMessage(TCtx& ctx, NLog::EPriority mPriority, NLog::EComponent mComponent, TString &&str) { const NLog::TSettings *mSettings = ctx.LoggerSettings(); @@ -327,14 +327,14 @@ namespace NActors { } template <typename TCtx, typename... TArgs> - inline void MemLogAdapter( + inline void MemLogAdapter( TCtx& actorCtxOrSystem, NLog::EPriority mPriority, NLog::EComponent mComponent, const char* format, TArgs&&... params) { TString Formatted; - - + + if constexpr (sizeof... (params) > 0) { NDetail::PrintfV(Formatted, format, std::forward<TArgs>(params)...); } else { @@ -343,9 +343,9 @@ namespace NActors { MemLogWrite(Formatted.data(), Formatted.size(), true); DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, std::move(Formatted)); - } - - template <typename TCtx> + } + + template <typename TCtx> Y_WRAPPER inline void MemLogAdapter( TCtx& actorCtxOrSystem, NLog::EPriority mPriority, @@ -355,7 +355,7 @@ namespace NActors { MemLogWrite(str.data(), str.size(), true); DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, TString(str)); } - + template <typename TCtx> Y_WRAPPER inline void MemLogAdapter( TCtx& actorCtxOrSystem, @@ -365,5 +365,5 @@ namespace NActors { MemLogWrite(str.data(), str.size(), true); DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, std::move(str)); - } + } } diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp index d84b4f9e46..ac598eff86 100644 --- a/library/cpp/actors/core/mailbox.cpp +++ b/library/cpp/actors/core/mailbox.cpp @@ -214,49 +214,49 @@ namespace NActors { return true; case TMailboxType::HTSwap: { THTSwapMailbox* const mailbox = THTSwapMailbox::Get(lineHint, x); -#if (!defined(_tsan_enabled_)) +#if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); -#endif +#endif mailbox->Queue.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); executorPool->ScheduleActivation(hint); } - } + } return true; case TMailboxType::ReadAsFilled: { if (lineHint > TReadAsFilledMailbox::MaxMailboxesInLine()) return false; - + TReadAsFilledMailbox* const mailbox = TReadAsFilledMailbox::Get(lineHint, x); -#if (!defined(_tsan_enabled_)) +#if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); -#endif +#endif mailbox->Queue.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); executorPool->ScheduleActivation(hint); } - } + } return true; case TMailboxType::TinyReadAsFilled: { if (lineHint > TTinyReadAsFilledMailbox::MaxMailboxesInLine()) return false; - + TTinyReadAsFilledMailbox* const mailbox = TTinyReadAsFilledMailbox::Get(lineHint, x); -#if (!defined(_tsan_enabled_)) +#if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); -#endif +#endif mailbox->Queue.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); executorPool->ScheduleActivation(hint); } - } + } return true; default: Y_FAIL("unknown mailbox type"); - } + } } return false; diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h index 0bd9c4d314..8a2c0d0608 100644 --- a/library/cpp/actors/core/mailbox.h +++ b/library/cpp/actors/core/mailbox.h @@ -10,7 +10,7 @@ #include <library/cpp/threading/queue/mpsc_read_as_filled.h> #include <util/generic/hash.h> #include <util/system/hp_timer.h> -#include <util/generic/ptr.h> +#include <util/generic/ptr.h> // TODO: clean all broken arcadia atomic stuff and replace with intrinsics namespace NActors { @@ -389,52 +389,52 @@ namespace NActors { constexpr static ui32 AlignedSize() { return ((sizeof(TRevolvingMailbox) + 63) / 64) * 64; } - + std::pair<ui32, ui32> CountRevolvingMailboxEvents(ui64 localActorId, ui32 maxTraverse); bool CleanupEvents(); }; - + static_assert(sizeof(TRevolvingMailbox) == 128, "expect sizeof(TRevolvingMailbox) == 128"); - + struct THTSwapMailbox: public TMailboxHeader { using TQueueType = NThreading::THTSwapQueue<IEventHandle*>; - + TQueueType Queue; NHPTimer::STime ScheduleMoment; char Padding_[16]; - + THTSwapMailbox() : TMailboxHeader(TMailboxType::HTSwap) , ScheduleMoment(0) { } - + ~THTSwapMailbox() { CleanupEvents(); } - + IEventHandle* Pop() { return Queue.Pop(); } - + IEventHandle* Head() { return Queue.Peek(); } - + static THTSwapMailbox* Get(ui32 hint, void* line) { return (THTSwapMailbox*)((ui8*)line + 64 + (hint - 1) * 64); } - + constexpr static ui64 MaxMailboxesInLine() { return (LineSize - 64) / AlignedSize(); } - + static const TMailboxType::EType MailboxType = TMailboxType::HTSwap; - + constexpr static ui32 AlignedSize() { return ((sizeof(THTSwapMailbox) + 63) / 64) * 64; } - + bool CleanupEvents() { const bool done = (Queue.Peek() == nullptr); while (IEventHandle* ev = Queue.Pop()) @@ -442,50 +442,50 @@ namespace NActors { return done; } }; - + static_assert(sizeof(THTSwapMailbox) == 64, "expect sizeof(THTSwapMailbox) == 64"); - + struct TReadAsFilledMailbox: public TMailboxHeader { using TQueueType = NThreading::TReadAsFilledQueue<IEventHandle>; - + TQueueType Queue; NHPTimer::STime ScheduleMoment; char Padding_[8]; - + TReadAsFilledMailbox() : TMailboxHeader(TMailboxType::ReadAsFilled) , ScheduleMoment(0) { } - + ~TReadAsFilledMailbox() { CleanupEvents(); } - + IEventHandle* Pop() { return Queue.Pop(); } - + IEventHandle* Head() { return Queue.Peek(); } - + static TReadAsFilledMailbox* Get(ui32 hint, void* line) { return (TReadAsFilledMailbox*)((ui8*)line + 64 + (hint - 1) * 192); } - + constexpr static ui64 MaxMailboxesInLine() { return (LineSize - 64) / AlignedSize(); } - + static const TMailboxType::EType MailboxType = TMailboxType::ReadAsFilled; - + constexpr static ui32 AlignedSize() { return ((sizeof(TReadAsFilledMailbox) + 63) / 64) * 64; } - + bool CleanupEvents() { const bool done = (Queue.Peek() == nullptr); while (IEventHandle* ev = Queue.Pop()) @@ -493,52 +493,52 @@ namespace NActors { return done; } }; - + static_assert(sizeof(TReadAsFilledMailbox) == 192, "expect sizeof(TReadAsFilledMailbox) == 192"); - + struct TTinyReadAsFilledMailbox: public TMailboxHeader { using TQueueType = NThreading::TReadAsFilledQueue< IEventHandle, NThreading::TRaFQueueBunchSize<4>>; - + TQueueType Queue; NHPTimer::STime ScheduleMoment; char Padding_[8]; - + TTinyReadAsFilledMailbox() : TMailboxHeader(TMailboxType::TinyReadAsFilled) , ScheduleMoment(0) { } - + ~TTinyReadAsFilledMailbox() { CleanupEvents(); } - + IEventHandle* Pop() { return Queue.Pop(); } - + IEventHandle* Head() { return Queue.Peek(); } - + static TTinyReadAsFilledMailbox* Get(ui32 hint, void* line) { return (TTinyReadAsFilledMailbox*)((ui8*)line + 64 + (hint - 1) * 192); } - + constexpr static ui64 MaxMailboxesInLine() { return (LineSize - 64) / AlignedSize(); } - + static const TMailboxType::EType MailboxType = TMailboxType::TinyReadAsFilled; - + constexpr static ui32 AlignedSize() { return ((sizeof(TTinyReadAsFilledMailbox) + 63) / 64) * 64; } - + bool CleanupEvents() { const bool done = (Queue.Peek() == nullptr); while (IEventHandle* ev = Queue.Pop()) @@ -546,8 +546,8 @@ namespace NActors { return done; } }; - + static_assert(sizeof(TTinyReadAsFilledMailbox) == 192, "expect sizeof(TTinyReadAsFilledMailbox) == 192"); - }; + }; } diff --git a/library/cpp/actors/core/mon.h b/library/cpp/actors/core/mon.h index c450f2338e..3ebf6a0bed 100644 --- a/library/cpp/actors/core/mon.h +++ b/library/cpp/actors/core/mon.h @@ -123,7 +123,7 @@ namespace NActors { return true; } - static IEventBase* Load(TEventSerializedData* bufs) { + static IEventBase* Load(TEventSerializedData* bufs) { return new TEvRemoteHttpInfo(bufs->GetString()); } @@ -160,7 +160,7 @@ namespace NActors { return true; } - static IEventBase* Load(TEventSerializedData* bufs) { + static IEventBase* Load(TEventSerializedData* bufs) { return new TEvRemoteHttpInfoRes(bufs->GetString()); } }; @@ -192,7 +192,7 @@ namespace NActors { return true; } - static IEventBase* Load(TEventSerializedData* bufs) { + static IEventBase* Load(TEventSerializedData* bufs) { return new TEvRemoteJsonInfoRes(bufs->GetString()); } }; diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index d55552af0c..f1d66664b6 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -13,17 +13,17 @@ namespace NActors { inline void Add(ui64 val, ui64 inc = 1) { size_t ind = 0; -#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7 +#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7 asm volatile("" :: : "memory"); -#endif +#endif if (val > 1) { ind = GetValueBitCount(val - 1); } -#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7 +#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7 asm volatile("" :: : "memory"); -#endif +#endif RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc); RelaxedStore(&Buckets[ind], RelaxedLoad(&Buckets[ind]) + inc); } diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make index 880a9d00db..22155dbeec 100644 --- a/library/cpp/actors/core/ya.make +++ b/library/cpp/actors/core/ya.make @@ -32,8 +32,8 @@ SRCS( ask.h balancer.h balancer.cpp - buffer.cpp - buffer.h + buffer.cpp + buffer.h callstack.cpp callstack.h config.h @@ -45,7 +45,7 @@ SRCS( event.h event_load.h event_local.h - event_pb.cpp + event_pb.cpp event_pb.h events.h events_undelivered.cpp |