diff options
author | agri <agri@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
commit | d3530b2692e400bd4d29bd4f07cafaee139164e7 (patch) | |
tree | b7ae636a74490e649a2ed0fdd5361f1bec83b9f9 /library/cpp/actors | |
parent | 0f4c5d1e8c0672bf0a1f2f2d8acac5ba24772435 (diff) | |
download | ydb-d3530b2692e400bd4d29bd4f07cafaee139164e7.tar.gz |
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors')
43 files changed, 1048 insertions, 1048 deletions
diff --git a/library/cpp/actors/core/actor_bootstrapped.h b/library/cpp/actors/core/actor_bootstrapped.h index a37887c939..e15bb86ce6 100644 --- a/library/cpp/actors/core/actor_bootstrapped.h +++ b/library/cpp/actors/core/actor_bootstrapped.h @@ -28,8 +28,8 @@ namespace NActors { } else { static_assert(dependent_false<TDerived>::value, "No correct Bootstrap() signature"); } - } - + } + TActorBootstrapped() : TActor<TDerived>(&TDerived::StateBootstrap) {} diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 40499d7586..58d360edcc 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -129,7 +129,7 @@ namespace NActors { virtual void SetRealTimeMode() const {} }; - + // could be proxy to in-pool schedulers (for NUMA-aware executors) class ISchedulerThread : TNonCopyable { public: @@ -352,7 +352,7 @@ namespace NActors { NLog::TSettings* LoggerSettings() const { return LoggerSettings0.Get(); } - + void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const; void DeferPreStop(std::function<void()> fn) { @@ -360,8 +360,8 @@ namespace NActors { } /* This is the base for memory profiling tags. - System sets memory profiling tag for debug version of lfalloc. - The tag is set as "base_tag + actor_activity_type". */ + System sets memory profiling tag for debug version of lfalloc. + The tag is set as "base_tag + actor_activity_type". */ static ui32 MemProfActivityBase; }; } diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp index 33f8ce2aaf..1c05ffc3fe 100644 --- a/library/cpp/actors/core/event.cpp +++ b/library/cpp/actors/core/event.cpp @@ -1,7 +1,7 @@ #include "event.h" -#include "event_pb.h" - -namespace NActors { +#include "event_pb.h" + +namespace NActors { const TScopeId TScopeId::LocallyGenerated{ Max<ui64>(), Max<ui64>() @@ -22,8 +22,8 @@ namespace NActors { return chainBuf; } return new TEventSerializedData; - } - + } + TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() { if (Buffer) return Buffer; @@ -34,5 +34,5 @@ namespace NActors { return Buffer; } return new TEventSerializedData; - } -} + } +} diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 6ff02aaf94..081549071d 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -3,7 +3,7 @@ #include "defs.h" #include "actorid.h" #include "callstack.h" -#include "event_load.h" +#include "event_load.h" #include <library/cpp/actors/wilson/wilson_trace.h> @@ -17,13 +17,13 @@ namespace NActors { public: virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0; }; - + class IEventBase : TNonCopyable, public ISerializerToStream { public: // actual typing is performed by IEventHandle - + virtual ~IEventBase() { } @@ -87,7 +87,7 @@ namespace NActors { Buffer.Reset(); return x; } - + enum EFlags { FlagTrackDelivery = 1 << 0, FlagForwardOnNondelivery = 1 << 1, @@ -236,7 +236,7 @@ namespace NActors { , RewriteType(Type) { } - + TIntrusivePtr<TEventSerializedData> GetChainBuffer(); TIntrusivePtr<TEventSerializedData> ReleaseChainBuffer(); @@ -248,15 +248,15 @@ namespace NActors { } else { return 0; } - } + } bool HasBuffer() const { return bool(Buffer); - } + } bool HasEvent() const { return bool(Event); - } + } IEventBase* GetBase() { if (!Event) { @@ -326,7 +326,7 @@ namespace NActors { } \ bool IsSerializable() const override { \ return false; \ - } + } #define DEFINE_SIMPLE_NONLOCAL_EVENT(eventType, header) \ TString ToStringHeader() const override { \ @@ -340,5 +340,5 @@ namespace NActors { } \ bool IsSerializable() const override { \ return true; \ - } + } } diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h index 0dab1dd374..da2adc28ea 100644 --- a/library/cpp/actors/core/event_load.h +++ b/library/cpp/actors/core/event_load.h @@ -1,24 +1,24 @@ -#pragma once +#pragma once #include <util/stream/walk.h> -#include <util/system/types.h> +#include <util/system/types.h> #include <util/generic/string.h> #include <library/cpp/actors/util/rope.h> #include <library/cpp/actors/wilson/wilson_trace.h> - -namespace NActors { + +namespace NActors { class IEventHandle; - + struct TConstIoVec { const void* Data; size_t Size; }; - + struct TIoVec { void* Data; size_t Size; }; - + class TEventSerializedData : public TThrRefBase { @@ -70,7 +70,7 @@ namespace NActors { } return result; } - + TRope EraseBack(size_t count) { Y_VERIFY(count <= Rope.GetSize()); TRope::TIterator iter = Rope.End(); @@ -81,25 +81,25 @@ namespace NActors { void Append(TRope&& from) { Rope.Insert(Rope.End(), std::move(from)); } - + void Append(TString buffer) { if (buffer) { Rope.Insert(Rope.End(), TRope(std::move(buffer))); } } }; -} - +} + class TChainBufWalk : public IWalkInput { TIntrusivePtr<NActors::TEventSerializedData> Buffer; TRope::TConstIterator Iter; - + public: TChainBufWalk(TIntrusivePtr<NActors::TEventSerializedData> buffer) : Buffer(std::move(buffer)) , Iter(Buffer->GetBeginIter()) {} - + private: size_t DoUnboundedNext(const void **ptr) override { const size_t size = Iter.ContiguousSize(); @@ -108,5 +108,5 @@ private: Iter.AdvanceToNextContiguousBlock(); } return size; - } + } }; diff --git a/library/cpp/actors/core/event_local.h b/library/cpp/actors/core/event_local.h index 2845aa94dd..2a4ff9fa55 100644 --- a/library/cpp/actors/core/event_local.h +++ b/library/cpp/actors/core/event_local.h @@ -2,7 +2,7 @@ #include "event.h" #include "scheduler_cookie.h" -#include "event_load.h" +#include "event_load.h" #include <util/system/type_name.h> namespace NActors { diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp index 018ff9ac34..bae0a0a64b 100644 --- a/library/cpp/actors/core/event_pb.cpp +++ b/library/cpp/actors/core/event_pb.cpp @@ -1,6 +1,6 @@ -#include "event_pb.h" - -namespace NActors { +#include "event_pb.h" + +namespace NActors { bool TRopeStream::Next(const void** data, int* size) { *data = Iter.ContiguousData(); *size = Iter.ContiguousSize(); @@ -13,13 +13,13 @@ namespace NActors { TotalByteCount += *size; return *size != 0; } - + void TRopeStream::BackUp(int count) { Y_VERIFY(count <= TotalByteCount); Iter -= count; TotalByteCount -= count; } - + bool TRopeStream::Skip(int count) { if (static_cast<size_t>(TotalByteCount + count) > Size) { count = Size - TotalByteCount; @@ -27,20 +27,20 @@ namespace NActors { Iter += count; TotalByteCount += count; return static_cast<size_t>(TotalByteCount) != Size; - } - + } + TCoroutineChunkSerializer::TCoroutineChunkSerializer() : TotalSerializedDataSize(0) , Stack(64 * 1024) , SelfClosure{this, TArrayRef(Stack.Begin(), Stack.End())} , InnerContext(SelfClosure) {} - + TCoroutineChunkSerializer::~TCoroutineChunkSerializer() { CancelFlag = true; Resume(); Y_VERIFY(Finished); - } + } bool TCoroutineChunkSerializer::AllowsAliasing() const { return true; @@ -85,10 +85,10 @@ namespace NActors { } else { InnerContext.SwitchTo(BufFeedContext); } - } + } return true; - } - + } + bool TCoroutineChunkSerializer::Next(void** data, int* size) { if (CancelFlag || AbortFlag) { return false; @@ -122,15 +122,15 @@ namespace NActors { BufferPtr -= count; SizeRemain += count; TotalSerializedDataSize -= count; - } - + } + void TCoroutineChunkSerializer::Resume() { TContMachineContext feedContext; BufFeedContext = &feedContext; feedContext.SwitchTo(&InnerContext); BufFeedContext = nullptr; - } - + } + bool TCoroutineChunkSerializer::WriteRope(const TRope *rope) { for (auto iter = rope->Begin(); iter.Valid(); iter.AdvanceToNextContiguousBlock()) { if (!WriteAliasedRaw(iter.ContiguousData(), iter.ContiguousSize())) { @@ -156,14 +156,14 @@ namespace NActors { return {Chunks, Chunks + NumChunks}; } - + void TCoroutineChunkSerializer::SetSerializingEvent(const IEventBase *event) { Y_VERIFY(Event == nullptr); Event = event; TotalSerializedDataSize = 0; AbortFlag = false; } - + void TCoroutineChunkSerializer::Abort() { Y_VERIFY(Event); AbortFlag = true; @@ -181,8 +181,8 @@ namespace NActors { } Finished = true; InnerContext.SwitchTo(BufFeedContext); - } - + } + bool TAllocChunkSerializer::Next(void** pdata, int* psize) { if (Backup) { // we have some data in backup rope -- move the first chunk from the backup rope to the buffer and return @@ -200,12 +200,12 @@ namespace NActors { Buffers->Append(TRope(std::move(item))); } return true; - } - + } + void TAllocChunkSerializer::BackUp(int count) { Backup.Insert(Backup.Begin(), Buffers->EraseBack(count)); } - + bool TAllocChunkSerializer::WriteAliasedRaw(const void*, int) { Y_VERIFY(false); return false; diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index d7546b901a..1c69d7e9bf 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -1,15 +1,15 @@ #pragma once #include "event.h" -#include "event_load.h" - +#include "event_load.h" + #include <google/protobuf/io/zero_copy_stream.h> #include <google/protobuf/arena.h> #include <library/cpp/actors/protos/actors.pb.h> -#include <util/generic/deque.h> -#include <util/system/context.h> -#include <util/system/filemap.h> -#include <array> +#include <util/generic/deque.h> +#include <util/system/context.h> +#include <util/system/filemap.h> +#include <array> namespace NActors { @@ -29,11 +29,11 @@ namespace NActors { int64_t ByteCount() const override { return TotalByteCount; } - + private: int64_t TotalByteCount = 0; }; - + class TChunkSerializer : public NProtoBuf::io::ZeroCopyOutputStream { public: TChunkSerializer() = default; @@ -42,7 +42,7 @@ namespace NActors { virtual bool WriteRope(const TRope *rope) = 0; virtual bool WriteString(const TString *s) = 0; }; - + class TAllocChunkSerializer final : public TChunkSerializer { public: bool Next(void** data, int* size) override; @@ -51,7 +51,7 @@ namespace NActors { return Buffers->GetSize(); } bool WriteAliasedRaw(const void* data, int size) override; - + // WARNING: these methods require owner to retain ownership and immutability of passed objects bool WriteRope(const TRope *rope) override; bool WriteString(const TString *s) override; @@ -62,19 +62,19 @@ namespace NActors { } return std::move(Buffers); } - + protected: TIntrusivePtr<TEventSerializedData> Buffers = new TEventSerializedData; TRope Backup; }; - + class TCoroutineChunkSerializer final : public TChunkSerializer, protected ITrampoLine { public: using TChunk = std::pair<const char*, size_t>; TCoroutineChunkSerializer(); ~TCoroutineChunkSerializer(); - + void SetSerializingEvent(const IEventBase *event); void Abort(); std::pair<TChunk*, TChunk*> FeedBuf(void* data, size_t size); @@ -87,7 +87,7 @@ namespace NActors { const IEventBase *GetCurrentEvent() const { return Event; } - + bool Next(void** data, int* size) override; void BackUp(int count) override; int64_t ByteCount() const override { @@ -95,7 +95,7 @@ namespace NActors { } bool WriteAliasedRaw(const void* data, int size) override; bool AllowsAliasing() const override; - + bool WriteRope(const TRope *rope) override; bool WriteString(const TString *s) override; @@ -103,7 +103,7 @@ namespace NActors { void DoRun() override; void Resume(); bool Produce(const void *data, size_t size); - + i64 TotalSerializedDataSize; TMappedAllocation Stack; TContClosure SelfClosure; @@ -120,7 +120,7 @@ namespace NActors { bool SerializationSuccess; bool Finished = false; }; - + #ifdef ACTORLIB_HUGE_PB_SIZE static const size_t EventMaxByteSize = 140 << 20; // (140MB) #else @@ -137,9 +137,9 @@ namespace NActors { public: using ProtoRecordType = TRecord; - + TEventPBBase() = default; - + explicit TEventPBBase(const TRecord& rec) { Record = rec; @@ -153,7 +153,7 @@ namespace NActors { TString ToStringHeader() const override { return Record.GetTypeName(); } - + TString ToString() const override { return Record.ShortDebugString(); } @@ -274,7 +274,7 @@ namespace NActors { ev->CachedByteSize = input->GetSize(); return ev.Release(); } - + size_t GetCachedByteSize() const { if (CachedByteSize == 0) { CachedByteSize = CalculateSerializedSize(); diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h index 702cf50fad..88103e888c 100644 --- a/library/cpp/actors/core/events.h +++ b/library/cpp/actors/core/events.h @@ -1,11 +1,11 @@ #pragma once #include "event.h" -#include "event_pb.h" +#include "event_pb.h" #include <library/cpp/actors/protos/actors.pb.h> #include <util/system/unaligned_mem.h> - + namespace NActors { struct TEvents { enum EEventSpace { @@ -213,7 +213,7 @@ namespace NActors { using TEvPoisonPill = TEvPoison; // Legacy name, deprecated using TEvActorDied = TEvGone; - }; + }; } template <> diff --git a/library/cpp/actors/core/executelater.h b/library/cpp/actors/core/executelater.h index e7a13c1005..53da592373 100644 --- a/library/cpp/actors/core/executelater.h +++ b/library/cpp/actors/core/executelater.h @@ -1,10 +1,10 @@ -#pragma once - -#include "actor_bootstrapped.h" - -#include <utility> - -namespace NActors { +#pragma once + +#include "actor_bootstrapped.h" + +#include <utility> + +namespace NActors { template <typename TCallback> class TExecuteLater: public TActorBootstrapped<TExecuteLater<TCallback>> { public: @@ -13,10 +13,10 @@ namespace NActors { } TExecuteLater( - TCallback&& callback, - IActor::EActivityType activityType, - ui32 channel = 0, - ui64 cookie = 0, + TCallback&& callback, + IActor::EActivityType activityType, + ui32 channel = 0, + ui64 cookie = 0, const TActorId& reportCompletionTo = TActorId(), const TActorId& reportExceptionTo = TActorId()) noexcept : Callback(std::move(callback)) @@ -27,16 +27,16 @@ namespace NActors { { this->SetActivityType(activityType); } - + void Bootstrap(const TActorContext& ctx) noexcept { try { { /* RAII, Callback should be destroyed right before sending - TEvCallbackCompletion */ - + TEvCallbackCompletion */ + auto local = std::move(Callback); using T = decltype(local); - + if constexpr (std::is_invocable_v<T, const TActorContext&>) { local(ctx); } else { @@ -56,11 +56,11 @@ namespace NActors { new TEvents::TEvCallbackException(ctx.SelfID, msg), Channel, Cookie); } - } - + } + this->Die(ctx); - } - + } + private: TCallback Callback; const ui32 Channel; @@ -68,13 +68,13 @@ namespace NActors { const TActorId ReportCompletionTo; const TActorId ReportExceptionTo; }; - + template <typename T> IActor* CreateExecuteLaterActor( - T&& func, - IActor::EActivityType activityType, - ui32 channel = 0, - ui64 cookie = 0, + T&& func, + IActor::EActivityType activityType, + ui32 channel = 0, + ui64 cookie = 0, const TActorId& reportCompletionTo = TActorId(), const TActorId& reportExceptionTo = TActorId()) noexcept { return new TExecuteLater<T>(std::forward<T>(func), @@ -84,4 +84,4 @@ namespace NActors { reportCompletionTo, reportExceptionTo); } -} +} diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 4dce16939a..3123e9b1a6 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -4,23 +4,23 @@ #include <library/cpp/actors/util/affinity.h> #include <library/cpp/actors/util/datetime.h> -#ifdef _linux_ +#ifdef _linux_ #include <pthread.h> -#endif - +#endif + namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); constexpr TDuration TBasicExecutorPool::DEFAULT_TIME_PER_MAILBOX; TBasicExecutorPool::TBasicExecutorPool( - ui32 poolId, - ui32 threads, - ui64 spinThreshold, + ui32 poolId, + ui32 threads, + ui64 spinThreshold, const TString& poolName, TAffinity* affinity, - TDuration timePerMailbox, - ui32 eventsPerMailbox, + TDuration timePerMailbox, + ui32 eventsPerMailbox, int realtimePriority, ui32 maxActivityType) : TExecutorPoolBase(poolId, threads, affinity, maxActivityType) @@ -330,10 +330,10 @@ namespace NActors { if (pthread_setschedparam(threadSelf, SCHED_FIFO, ¶m)) { Y_FAIL("Cannot set realtime priority"); } - } -#else + } +#else Y_UNUSED(RealtimePriority); -#endif +#endif } ui32 TBasicExecutorPool::GetThreadCount() const { diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index 023190f7fe..65ceed2669 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -62,7 +62,7 @@ namespace NActors { TAtomic ThreadUtilization; TAtomic MaxUtilizationCounter; TAtomic MaxUtilizationAccumulator; - + TAtomic ThreadCount; TMutex ChangeThreadsLock; @@ -81,7 +81,7 @@ namespace NActors { ui32 maxActivityType = 1); explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg); ~TBasicExecutorPool(); - + ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override; void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp index dac6245635..e5968609e7 100644 --- a/library/cpp/actors/core/executor_pool_united.cpp +++ b/library/cpp/actors/core/executor_pool_united.cpp @@ -14,7 +14,7 @@ #include <util/system/datetime.h> #include <util/system/hp_timer.h> - + #include <algorithm> namespace NActors { @@ -1315,7 +1315,7 @@ namespace NActors { if (Y_UNLIKELY(result == CpuStopped) || TryAcquireToken(result)) { break; // token acquired (or stop) } - } + } wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timeTracker.Elapsed()); return result; diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h index a090ba2466..01be95b778 100644 --- a/library/cpp/actors/core/executor_pool_united.h +++ b/library/cpp/actors/core/executor_pool_united.h @@ -63,7 +63,7 @@ namespace NActors { // Sets executor for specified pool void SetupPool(TPoolId pool, IExecutorPool* executorPool, TMailboxTable* mailboxTable); - + // Add activation of newly scheduled mailbox and wake cpu to execute it if required void PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter); @@ -72,7 +72,7 @@ namespace NActors { // Try to wake idle cpu waiting for tokens on specified pool void TryWake(TPoolId pool); - + // Get activation from pool; requires pool's token void BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter); diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 446b651efd..ac97689f31 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -303,7 +303,7 @@ namespace NActors { ExecutorPool->SetRealTimeMode(); TAffinityGuard affinity(ExecutorPool->Affinity()); - + NHPTimer::STime hpnow = GetCycleCountFast(); NHPTimer::STime hpprev = hpnow; ui64 execCount = 0; diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h index 9d3c573f0d..66b97bd351 100644 --- a/library/cpp/actors/core/executor_thread.h +++ b/library/cpp/actors/core/executor_thread.h @@ -45,7 +45,7 @@ namespace NActors { void UnregisterActor(TMailboxHeader* mailbox, ui64 localActorId); void DropUnregistered(); const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; } - + void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index 5f63b5af58..bfac7d30e4 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -195,7 +195,7 @@ namespace NActors { , Metrics(std::make_unique<TLoggerMetrics>(metrics)) { } - + TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, std::shared_ptr<TLogBackend> logBackend, std::shared_ptr<NMonitoring::TMetricRegistry> metrics) @@ -260,8 +260,8 @@ namespace NActors { break; default: break; - } - + } + } void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) { diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index c11a7cf3c1..514ff51c14 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -42,7 +42,7 @@ actorCtxOrSystem, priority, component, __VA_ARGS__); \ } \ } while (0) /**/ - + #define LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, priority, component, sampleBy, stream) \ LOG_LOG_SAMPLED_BY(actorCtxOrSystem, priority, component, sampleBy, "%s", [&]() { \ TStringBuilder logStringBuilder; \ @@ -304,7 +304,7 @@ namespace NActors { ///////////////////////////////////////////////////////////////////// // Logging adaptors for memory log and logging into filesystem ///////////////////////////////////////////////////////////////////// - + namespace NDetail { inline void Y_PRINTF_FORMAT(2, 3) PrintfV(TString& dst, const char* format, ...) { va_list params; @@ -318,7 +318,7 @@ namespace NActors { } } // namespace NDetail - template <typename TCtx> + template <typename TCtx> inline void DeliverLogMessage(TCtx& ctx, NLog::EPriority mPriority, NLog::EComponent mComponent, TString &&str) { const NLog::TSettings *mSettings = ctx.LoggerSettings(); @@ -327,14 +327,14 @@ namespace NActors { } template <typename TCtx, typename... TArgs> - inline void MemLogAdapter( + inline void MemLogAdapter( TCtx& actorCtxOrSystem, NLog::EPriority mPriority, NLog::EComponent mComponent, const char* format, TArgs&&... params) { TString Formatted; - - + + if constexpr (sizeof... (params) > 0) { NDetail::PrintfV(Formatted, format, std::forward<TArgs>(params)...); } else { @@ -343,9 +343,9 @@ namespace NActors { MemLogWrite(Formatted.data(), Formatted.size(), true); DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, std::move(Formatted)); - } - - template <typename TCtx> + } + + template <typename TCtx> Y_WRAPPER inline void MemLogAdapter( TCtx& actorCtxOrSystem, NLog::EPriority mPriority, @@ -355,7 +355,7 @@ namespace NActors { MemLogWrite(str.data(), str.size(), true); DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, TString(str)); } - + template <typename TCtx> Y_WRAPPER inline void MemLogAdapter( TCtx& actorCtxOrSystem, @@ -365,5 +365,5 @@ namespace NActors { MemLogWrite(str.data(), str.size(), true); DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, std::move(str)); - } + } } diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp index d84b4f9e46..ac598eff86 100644 --- a/library/cpp/actors/core/mailbox.cpp +++ b/library/cpp/actors/core/mailbox.cpp @@ -214,49 +214,49 @@ namespace NActors { return true; case TMailboxType::HTSwap: { THTSwapMailbox* const mailbox = THTSwapMailbox::Get(lineHint, x); -#if (!defined(_tsan_enabled_)) +#if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); -#endif +#endif mailbox->Queue.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); executorPool->ScheduleActivation(hint); } - } + } return true; case TMailboxType::ReadAsFilled: { if (lineHint > TReadAsFilledMailbox::MaxMailboxesInLine()) return false; - + TReadAsFilledMailbox* const mailbox = TReadAsFilledMailbox::Get(lineHint, x); -#if (!defined(_tsan_enabled_)) +#if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); -#endif +#endif mailbox->Queue.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); executorPool->ScheduleActivation(hint); } - } + } return true; case TMailboxType::TinyReadAsFilled: { if (lineHint > TTinyReadAsFilledMailbox::MaxMailboxesInLine()) return false; - + TTinyReadAsFilledMailbox* const mailbox = TTinyReadAsFilledMailbox::Get(lineHint, x); -#if (!defined(_tsan_enabled_)) +#if (!defined(_tsan_enabled_)) Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType); -#endif +#endif mailbox->Queue.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); executorPool->ScheduleActivation(hint); } - } + } return true; default: Y_FAIL("unknown mailbox type"); - } + } } return false; diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h index 0bd9c4d314..8a2c0d0608 100644 --- a/library/cpp/actors/core/mailbox.h +++ b/library/cpp/actors/core/mailbox.h @@ -10,7 +10,7 @@ #include <library/cpp/threading/queue/mpsc_read_as_filled.h> #include <util/generic/hash.h> #include <util/system/hp_timer.h> -#include <util/generic/ptr.h> +#include <util/generic/ptr.h> // TODO: clean all broken arcadia atomic stuff and replace with intrinsics namespace NActors { @@ -389,52 +389,52 @@ namespace NActors { constexpr static ui32 AlignedSize() { return ((sizeof(TRevolvingMailbox) + 63) / 64) * 64; } - + std::pair<ui32, ui32> CountRevolvingMailboxEvents(ui64 localActorId, ui32 maxTraverse); bool CleanupEvents(); }; - + static_assert(sizeof(TRevolvingMailbox) == 128, "expect sizeof(TRevolvingMailbox) == 128"); - + struct THTSwapMailbox: public TMailboxHeader { using TQueueType = NThreading::THTSwapQueue<IEventHandle*>; - + TQueueType Queue; NHPTimer::STime ScheduleMoment; char Padding_[16]; - + THTSwapMailbox() : TMailboxHeader(TMailboxType::HTSwap) , ScheduleMoment(0) { } - + ~THTSwapMailbox() { CleanupEvents(); } - + IEventHandle* Pop() { return Queue.Pop(); } - + IEventHandle* Head() { return Queue.Peek(); } - + static THTSwapMailbox* Get(ui32 hint, void* line) { return (THTSwapMailbox*)((ui8*)line + 64 + (hint - 1) * 64); } - + constexpr static ui64 MaxMailboxesInLine() { return (LineSize - 64) / AlignedSize(); } - + static const TMailboxType::EType MailboxType = TMailboxType::HTSwap; - + constexpr static ui32 AlignedSize() { return ((sizeof(THTSwapMailbox) + 63) / 64) * 64; } - + bool CleanupEvents() { const bool done = (Queue.Peek() == nullptr); while (IEventHandle* ev = Queue.Pop()) @@ -442,50 +442,50 @@ namespace NActors { return done; } }; - + static_assert(sizeof(THTSwapMailbox) == 64, "expect sizeof(THTSwapMailbox) == 64"); - + struct TReadAsFilledMailbox: public TMailboxHeader { using TQueueType = NThreading::TReadAsFilledQueue<IEventHandle>; - + TQueueType Queue; NHPTimer::STime ScheduleMoment; char Padding_[8]; - + TReadAsFilledMailbox() : TMailboxHeader(TMailboxType::ReadAsFilled) , ScheduleMoment(0) { } - + ~TReadAsFilledMailbox() { CleanupEvents(); } - + IEventHandle* Pop() { return Queue.Pop(); } - + IEventHandle* Head() { return Queue.Peek(); } - + static TReadAsFilledMailbox* Get(ui32 hint, void* line) { return (TReadAsFilledMailbox*)((ui8*)line + 64 + (hint - 1) * 192); } - + constexpr static ui64 MaxMailboxesInLine() { return (LineSize - 64) / AlignedSize(); } - + static const TMailboxType::EType MailboxType = TMailboxType::ReadAsFilled; - + constexpr static ui32 AlignedSize() { return ((sizeof(TReadAsFilledMailbox) + 63) / 64) * 64; } - + bool CleanupEvents() { const bool done = (Queue.Peek() == nullptr); while (IEventHandle* ev = Queue.Pop()) @@ -493,52 +493,52 @@ namespace NActors { return done; } }; - + static_assert(sizeof(TReadAsFilledMailbox) == 192, "expect sizeof(TReadAsFilledMailbox) == 192"); - + struct TTinyReadAsFilledMailbox: public TMailboxHeader { using TQueueType = NThreading::TReadAsFilledQueue< IEventHandle, NThreading::TRaFQueueBunchSize<4>>; - + TQueueType Queue; NHPTimer::STime ScheduleMoment; char Padding_[8]; - + TTinyReadAsFilledMailbox() : TMailboxHeader(TMailboxType::TinyReadAsFilled) , ScheduleMoment(0) { } - + ~TTinyReadAsFilledMailbox() { CleanupEvents(); } - + IEventHandle* Pop() { return Queue.Pop(); } - + IEventHandle* Head() { return Queue.Peek(); } - + static TTinyReadAsFilledMailbox* Get(ui32 hint, void* line) { return (TTinyReadAsFilledMailbox*)((ui8*)line + 64 + (hint - 1) * 192); } - + constexpr static ui64 MaxMailboxesInLine() { return (LineSize - 64) / AlignedSize(); } - + static const TMailboxType::EType MailboxType = TMailboxType::TinyReadAsFilled; - + constexpr static ui32 AlignedSize() { return ((sizeof(TTinyReadAsFilledMailbox) + 63) / 64) * 64; } - + bool CleanupEvents() { const bool done = (Queue.Peek() == nullptr); while (IEventHandle* ev = Queue.Pop()) @@ -546,8 +546,8 @@ namespace NActors { return done; } }; - + static_assert(sizeof(TTinyReadAsFilledMailbox) == 192, "expect sizeof(TTinyReadAsFilledMailbox) == 192"); - }; + }; } diff --git a/library/cpp/actors/core/mon.h b/library/cpp/actors/core/mon.h index c450f2338e..3ebf6a0bed 100644 --- a/library/cpp/actors/core/mon.h +++ b/library/cpp/actors/core/mon.h @@ -123,7 +123,7 @@ namespace NActors { return true; } - static IEventBase* Load(TEventSerializedData* bufs) { + static IEventBase* Load(TEventSerializedData* bufs) { return new TEvRemoteHttpInfo(bufs->GetString()); } @@ -160,7 +160,7 @@ namespace NActors { return true; } - static IEventBase* Load(TEventSerializedData* bufs) { + static IEventBase* Load(TEventSerializedData* bufs) { return new TEvRemoteHttpInfoRes(bufs->GetString()); } }; @@ -192,7 +192,7 @@ namespace NActors { return true; } - static IEventBase* Load(TEventSerializedData* bufs) { + static IEventBase* Load(TEventSerializedData* bufs) { return new TEvRemoteJsonInfoRes(bufs->GetString()); } }; diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index d55552af0c..f1d66664b6 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -13,17 +13,17 @@ namespace NActors { inline void Add(ui64 val, ui64 inc = 1) { size_t ind = 0; -#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7 +#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7 asm volatile("" :: : "memory"); -#endif +#endif if (val > 1) { ind = GetValueBitCount(val - 1); } -#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7 +#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7 asm volatile("" :: : "memory"); -#endif +#endif RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc); RelaxedStore(&Buckets[ind], RelaxedLoad(&Buckets[ind]) + inc); } diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make index 880a9d00db..22155dbeec 100644 --- a/library/cpp/actors/core/ya.make +++ b/library/cpp/actors/core/ya.make @@ -32,8 +32,8 @@ SRCS( ask.h balancer.h balancer.cpp - buffer.cpp - buffer.h + buffer.cpp + buffer.h callstack.cpp callstack.h config.h @@ -45,7 +45,7 @@ SRCS( event.h event_load.h event_local.h - event_pb.cpp + event_pb.cpp event_pb.h events.h events_undelivered.cpp diff --git a/library/cpp/actors/dnscachelib/dnscache.cpp b/library/cpp/actors/dnscachelib/dnscache.cpp index 649339ddb2..580956c92e 100644 --- a/library/cpp/actors/dnscachelib/dnscache.cpp +++ b/library/cpp/actors/dnscachelib/dnscache.cpp @@ -155,19 +155,19 @@ void TDnsCache::GetStats(ui64& a_cache_hits, ui64& a_cache_misses, } bool TDnsCache::THost::IsStale(int family, const TDnsCache* ctx) const noexcept { - time_t resolved = family == AF_INET ? ResolvedV4 : ResolvedV6; - time_t notfound = family == AF_INET ? NotFoundV4 : NotFoundV6; - - if (TTimeKeeper::GetTime() - resolved < ctx->EntryLifetime) - return false; - - if (TTimeKeeper::GetTime() - notfound < ctx->NegativeLifetime) - return false; - - return true; -} - -const TDnsCache::THost& + time_t resolved = family == AF_INET ? ResolvedV4 : ResolvedV6; + time_t notfound = family == AF_INET ? NotFoundV4 : NotFoundV6; + + if (TTimeKeeper::GetTime() - resolved < ctx->EntryLifetime) + return false; + + if (TTimeKeeper::GetTime() - notfound < ctx->NegativeLifetime) + return false; + + return true; +} + +const TDnsCache::THost& TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) { if (!ValidateHName(hostname)) { LWPROBE(ResolveNullHost, hostname, family); @@ -182,7 +182,7 @@ TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) { TGuard<TMutex> lock(CacheMtx); p = HostCache.find(hostname); if (p != HostCache.end()) { - if (!p->second.IsStale(family, this)) { + if (!p->second.IsStale(family, this)) { /* Recently resolved, just return cached value */ ACacheHits += 1; THost& host = p->second; @@ -199,9 +199,9 @@ TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) { ACacheMisses += 1; } - if (cacheOnly) - return NullHost; - + if (cacheOnly) + return NullHost; + TAtomic& inprogress = (family == AF_INET ? p->second.InProgressV4 : p->second.InProgressV6); { @@ -219,7 +219,7 @@ TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) { ctx->Hostname = hostname; ctx->Family = family; - AtomicSet(inprogress, 1); + AtomicSet(inprogress, 1); ares_gethostbyname(chan, hostname.c_str(), family, &TDnsCache::GHBNCallback, ctx); } @@ -269,7 +269,7 @@ const TDnsCache::TAddr& TDnsCache::ResolveAddr(const in6_addr& addr, int family) ctx->Owner = this; ctx->Addr = addr; - AtomicSet(p->second.InProgress, 1); + AtomicSet(p->second.InProgress, 1); ares_gethostbyaddr(chan, &addr, family == AF_INET ? sizeof(in_addr) : sizeof(in6_addr), family, &TDnsCache::GHBACallback, ctx); @@ -284,7 +284,7 @@ const TDnsCache::TAddr& TDnsCache::ResolveAddr(const in6_addr& addr, int family) void TDnsCache::WaitTask(TAtomic& flag) { const TInstant start = TInstant(TTimeKeeper::GetTimeval()); - while (AtomicGet(flag)) { + while (AtomicGet(flag)) { ares_channel chan = static_cast<ares_channel>(Channel); struct pollfd pfd[ARES_GETSOCK_MAXNUM]; @@ -380,7 +380,7 @@ void TDnsCache::GHBNCallback(void* arg, int status, int, struct hostent* info) { */ p->second.ResolvedV4 = TTimeKeeper::GetTime(); p->second.ResolvedV4 = 0; - AtomicSet(p->second.InProgressV4, 0); + AtomicSet(p->second.InProgressV4, 0); } else if (info->h_addrtype == AF_INET6) { p->second.AddrsV6.clear(); for (int i = 0; info->h_addr_list[i] != nullptr; i++) { @@ -395,7 +395,7 @@ void TDnsCache::GHBNCallback(void* arg, int status, int, struct hostent* info) { notfound = TTimeKeeper::GetTime(); resolved = 0; } - AtomicSet(inprogress, 0); + AtomicSet(inprogress, 0); } void TDnsCache::GHBACallback(void* arg, int status, int, struct hostent* info) { @@ -413,7 +413,7 @@ void TDnsCache::GHBACallback(void* arg, int status, int, struct hostent* info) { p->second.NotFound = TTimeKeeper::GetTime(); p->second.Resolved = 0; } - AtomicSet(p->second.InProgress, 0); + AtomicSet(p->second.InProgress, 0); } TString TDnsCache::THost::AddrsV4ToString() const { @@ -441,5 +441,5 @@ TString TDnsCache::THost::AddrsV6ToString() const { } return ss.Str(); } - -TDnsCache::TAresLibInit TDnsCache::InitAresLib; + +TDnsCache::TAresLibInit TDnsCache::InitAresLib; diff --git a/library/cpp/actors/dnscachelib/dnscache.h b/library/cpp/actors/dnscachelib/dnscache.h index 3313a251a1..586957b9a0 100644 --- a/library/cpp/actors/dnscachelib/dnscache.h +++ b/library/cpp/actors/dnscachelib/dnscache.h @@ -1,6 +1,6 @@ #pragma once -#include <contrib/libs/c-ares/ares.h> +#include <contrib/libs/c-ares/ares.h> #include <util/generic/map.h> #include <util/generic/vector.h> #include <util/network/address.h> @@ -28,9 +28,9 @@ public: /* use with AF_INET, AF_INET6 or AF_UNSPEC */ NAddr::IRemoteAddrPtr GetAddr(const TString& host, - int family, - TIpPort port = 0, - bool cacheOnly = false); + int family, + TIpPort port = 0, + bool cacheOnly = false); void GetAllAddresses(const TString& host, TVector<NAddr::IRemoteAddrPtr>&); @@ -68,8 +68,8 @@ private: TString AddrsV4ToString() const; TString AddrsV6ToString() const; - - bool IsStale(int family, const TDnsCache* ctx) const noexcept; + + bool IsStale(int family, const TDnsCache* ctx) const noexcept; }; typedef TMap<TString, THost> THostCache; @@ -99,9 +99,9 @@ private: typedef TMap<in6_addr, TAddr, TAddrCmp> TAddrCache; const THost& Resolve(const TString&, int family, bool cacheOnly = false); - + const TAddr& ResolveAddr(const in6_addr&, int family); - + void WaitTask(TAtomic&); static void GHBNCallback(void* arg, int status, int timeouts, @@ -128,21 +128,21 @@ private: TMutex AresMtx; void* Channel; - - struct TAresLibInit { - TAresLibInit() { + + struct TAresLibInit { + TAresLibInit() { #ifdef _win_ - const auto res = ares_library_init(ARES_LIB_INIT_ALL); - Y_VERIFY(res == 0); + const auto res = ares_library_init(ARES_LIB_INIT_ALL); + Y_VERIFY(res == 0); #endif - } - - ~TAresLibInit() { + } + + ~TAresLibInit() { #ifdef _win_ - ares_library_cleanup(); + ares_library_cleanup(); #endif - } - }; - - static TAresLibInit InitAresLib; + } + }; + + static TAresLibInit InitAresLib; }; diff --git a/library/cpp/actors/memory_log/memlog.cpp b/library/cpp/actors/memory_log/memlog.cpp index 8e6b46727d..f20162db70 100644 --- a/library/cpp/actors/memory_log/memlog.cpp +++ b/library/cpp/actors/memory_log/memlog.cpp @@ -1,28 +1,28 @@ -#include "memlog.h" - +#include "memlog.h" + #include <library/cpp/actors/util/datetime.h> -#include <util/system/info.h> -#include <util/system/atomic.h> -#include <util/system/align.h> - -#include <contrib/libs/linuxvdso/interface.h> - -#if (defined(_i386_) || defined(_x86_64_)) && defined(_linux_) -#define HAVE_VDSO_GETCPU 1 -#include <contrib/libs/linuxvdso/interface.h> -static int (*FastGetCpu)(unsigned* cpu, unsigned* node, void* unused); -#endif - -#if defined(_unix_) +#include <util/system/info.h> +#include <util/system/atomic.h> +#include <util/system/align.h> + +#include <contrib/libs/linuxvdso/interface.h> + +#if (defined(_i386_) || defined(_x86_64_)) && defined(_linux_) +#define HAVE_VDSO_GETCPU 1 +#include <contrib/libs/linuxvdso/interface.h> +static int (*FastGetCpu)(unsigned* cpu, unsigned* node, void* unused); +#endif + +#if defined(_unix_) #include <sched.h> -#elif defined(_win_) +#elif defined(_win_) #include <WinBase.h> -#else +#else #error NO IMPLEMENTATION FOR THE PLATFORM -#endif - -const char TMemoryLog::DEFAULT_LAST_MARK[16] = { +#endif + +const char TMemoryLog::DEFAULT_LAST_MARK[16] = { 'c', 'b', '7', @@ -39,9 +39,9 @@ const char TMemoryLog::DEFAULT_LAST_MARK[16] = { '4', '5', '\n', -}; - -const char TMemoryLog::CLEAR_MARK[16] = { +}; + +const char TMemoryLog::CLEAR_MARK[16] = { ' ', ' ', ' ', @@ -58,146 +58,146 @@ const char TMemoryLog::CLEAR_MARK[16] = { ' ', ' ', '\n', -}; - -unsigned TMemoryLog::GetSelfCpu() noexcept { -#if defined(_unix_) +}; + +unsigned TMemoryLog::GetSelfCpu() noexcept { +#if defined(_unix_) #if HAVE_VDSO_GETCPU - unsigned cpu; - if (Y_LIKELY(FastGetCpu != nullptr)) { - auto result = FastGetCpu(&cpu, nullptr, nullptr); - Y_VERIFY(result == 0); + unsigned cpu; + if (Y_LIKELY(FastGetCpu != nullptr)) { + auto result = FastGetCpu(&cpu, nullptr, nullptr); + Y_VERIFY(result == 0); return cpu; - } else { - return 0; - } - + } else { + return 0; + } + #elif defined(_x86_64_) || defined(_i386_) - + #define CPUID(func, eax, ebx, ecx, edx) \ __asm__ __volatile__( \ "cpuid" \ : "=a"(eax), "=b"(ebx), "=c"(ecx), "=d"(edx) \ : "a"(func)); - - int a = 0, b = 0, c = 0, d = 0; - CPUID(0x1, a, b, c, d); - int acpiID = (b >> 24); - return acpiID; - + + int a = 0, b = 0, c = 0, d = 0; + CPUID(0x1, a, b, c, d); + int acpiID = (b >> 24); + return acpiID; + #elif defined(__CNUC__) - return sched_getcpu(); + return sched_getcpu(); #else - return 0; + return 0; #endif - -#elif defined(_win_) - return GetCurrentProcessorNumber(); -#else - return 0; -#endif -} - -TMemoryLog* TMemoryLog::MemLogBuffer = nullptr; + +#elif defined(_win_) + return GetCurrentProcessorNumber(); +#else + return 0; +#endif +} + +TMemoryLog* TMemoryLog::MemLogBuffer = nullptr; Y_POD_THREAD(TThread::TId) TMemoryLog::LogThreadId; -char* TMemoryLog::LastMarkIsHere = nullptr; - -std::atomic<bool> TMemoryLog::PrintLastMark(true); - +char* TMemoryLog::LastMarkIsHere = nullptr; + +std::atomic<bool> TMemoryLog::PrintLastMark(true); + TMemoryLog::TMemoryLog(size_t totalSize, size_t grainSize) : GrainSize(grainSize) , FreeGrains(DEFAULT_TOTAL_SIZE / DEFAULT_GRAIN_SIZE * 2) , Buf(totalSize) -{ - Y_VERIFY(DEFAULT_TOTAL_SIZE % DEFAULT_GRAIN_SIZE == 0); - NumberOfGrains = DEFAULT_TOTAL_SIZE / DEFAULT_GRAIN_SIZE; - - for (size_t i = 0; i < NumberOfGrains; ++i) { - new (GetGrain(i)) TGrain; - } - - NumberOfCpus = NSystemInfo::NumberOfCpus(); - Y_VERIFY(NumberOfGrains > NumberOfCpus); - ActiveGrains.Reset(new TGrain*[NumberOfCpus]); - for (size_t i = 0; i < NumberOfCpus; ++i) { - ActiveGrains[i] = GetGrain(i); - } - - for (size_t i = NumberOfCpus; i < NumberOfGrains; ++i) { - FreeGrains.StubbornPush(GetGrain(i)); - } - -#if HAVE_VDSO_GETCPU - auto vdsoFunc = (decltype(FastGetCpu)) - NVdso::Function("__vdso_getcpu", "LINUX_2.6"); - AtomicSet(FastGetCpu, vdsoFunc); -#endif -} - -void* TMemoryLog::GetWriteBuffer(size_t amount) noexcept { - // alignment required by NoCacheMemcpy - amount = AlignUp<size_t>(amount, MemcpyAlignment); - - for (ui16 tries = MAX_GET_BUFFER_TRIES; tries-- > 0;) { - auto myCpu = GetSelfCpu(); - - TGrain* grain = AtomicGet(ActiveGrains[myCpu]); - - if (grain != nullptr) { - auto mine = AtomicGetAndAdd(grain->WritePointer, amount); - if (mine + amount <= GrainSize - sizeof(TGrain)) { - return &grain->Data[mine]; - } - - if (!AtomicCas(&ActiveGrains[myCpu], 0, grain)) { - continue; - } - - FreeGrains.StubbornPush(grain); - } - - grain = (TGrain*)FreeGrains.Pop(); - - if (grain == nullptr) { - return nullptr; - } - - grain->WritePointer = 0; - - if (!AtomicCas(&ActiveGrains[myCpu], grain, 0)) { - FreeGrains.StubbornPush(grain); - continue; - } - } - - return nullptr; -} - -void ClearAlignedTail(char* tail) noexcept { - auto aligned = AlignUp(tail, TMemoryLog::MemcpyAlignment); - if (aligned > tail) { - memset(tail, 0, aligned - tail); - } -} - -#if defined(_x86_64_) || defined(_i386_) -#include <xmmintrin.h> -// the main motivation is not poluting CPU cache -NO_SANITIZE_THREAD -void NoCacheMemcpy(char* dst, const char* src, size_t size) noexcept { - while (size >= sizeof(__m128) * 2) { - __m128 a = _mm_load_ps((float*)(src + 0 * sizeof(__m128))); - __m128 b = _mm_load_ps((float*)(src + 1 * sizeof(__m128))); - _mm_stream_ps((float*)(dst + 0 * sizeof(__m128)), a); - _mm_stream_ps((float*)(dst + 1 * sizeof(__m128)), b); - - size -= sizeof(__m128) * 2; - src += sizeof(__m128) * 2; - dst += sizeof(__m128) * 2; - } - memcpy(dst, src, size); -} +{ + Y_VERIFY(DEFAULT_TOTAL_SIZE % DEFAULT_GRAIN_SIZE == 0); + NumberOfGrains = DEFAULT_TOTAL_SIZE / DEFAULT_GRAIN_SIZE; + + for (size_t i = 0; i < NumberOfGrains; ++i) { + new (GetGrain(i)) TGrain; + } + + NumberOfCpus = NSystemInfo::NumberOfCpus(); + Y_VERIFY(NumberOfGrains > NumberOfCpus); + ActiveGrains.Reset(new TGrain*[NumberOfCpus]); + for (size_t i = 0; i < NumberOfCpus; ++i) { + ActiveGrains[i] = GetGrain(i); + } + + for (size_t i = NumberOfCpus; i < NumberOfGrains; ++i) { + FreeGrains.StubbornPush(GetGrain(i)); + } + +#if HAVE_VDSO_GETCPU + auto vdsoFunc = (decltype(FastGetCpu)) + NVdso::Function("__vdso_getcpu", "LINUX_2.6"); + AtomicSet(FastGetCpu, vdsoFunc); +#endif +} + +void* TMemoryLog::GetWriteBuffer(size_t amount) noexcept { + // alignment required by NoCacheMemcpy + amount = AlignUp<size_t>(amount, MemcpyAlignment); + + for (ui16 tries = MAX_GET_BUFFER_TRIES; tries-- > 0;) { + auto myCpu = GetSelfCpu(); + + TGrain* grain = AtomicGet(ActiveGrains[myCpu]); + + if (grain != nullptr) { + auto mine = AtomicGetAndAdd(grain->WritePointer, amount); + if (mine + amount <= GrainSize - sizeof(TGrain)) { + return &grain->Data[mine]; + } + + if (!AtomicCas(&ActiveGrains[myCpu], 0, grain)) { + continue; + } + + FreeGrains.StubbornPush(grain); + } + + grain = (TGrain*)FreeGrains.Pop(); + + if (grain == nullptr) { + return nullptr; + } + + grain->WritePointer = 0; + + if (!AtomicCas(&ActiveGrains[myCpu], grain, 0)) { + FreeGrains.StubbornPush(grain); + continue; + } + } + + return nullptr; +} + +void ClearAlignedTail(char* tail) noexcept { + auto aligned = AlignUp(tail, TMemoryLog::MemcpyAlignment); + if (aligned > tail) { + memset(tail, 0, aligned - tail); + } +} + +#if defined(_x86_64_) || defined(_i386_) +#include <xmmintrin.h> +// the main motivation is not poluting CPU cache +NO_SANITIZE_THREAD +void NoCacheMemcpy(char* dst, const char* src, size_t size) noexcept { + while (size >= sizeof(__m128) * 2) { + __m128 a = _mm_load_ps((float*)(src + 0 * sizeof(__m128))); + __m128 b = _mm_load_ps((float*)(src + 1 * sizeof(__m128))); + _mm_stream_ps((float*)(dst + 0 * sizeof(__m128)), a); + _mm_stream_ps((float*)(dst + 1 * sizeof(__m128)), b); + + size -= sizeof(__m128) * 2; + src += sizeof(__m128) * 2; + dst += sizeof(__m128) * 2; + } + memcpy(dst, src, size); +} NO_SANITIZE_THREAD void NoWCacheMemcpy(char* dst, const char* src, size_t size) noexcept { @@ -224,144 +224,144 @@ void NoWCacheMemcpy(char* dst, const char* src, size_t size) noexcept { } } -#endif - -NO_SANITIZE_THREAD -char* BareMemLogWrite(const char* begin, size_t msgSize, bool isLast) noexcept { - bool lastMark = - isLast && TMemoryLog::PrintLastMark.load(std::memory_order_acquire); - size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize; - - char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount); - if (buffer == nullptr) { - return nullptr; - } - -#if defined(_x86_64_) || defined(_i386_) - if (AlignDown(begin, TMemoryLog::MemcpyAlignment) == begin) { - NoCacheMemcpy(buffer, begin, msgSize); +#endif + +NO_SANITIZE_THREAD +char* BareMemLogWrite(const char* begin, size_t msgSize, bool isLast) noexcept { + bool lastMark = + isLast && TMemoryLog::PrintLastMark.load(std::memory_order_acquire); + size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize; + + char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount); + if (buffer == nullptr) { + return nullptr; + } + +#if defined(_x86_64_) || defined(_i386_) + if (AlignDown(begin, TMemoryLog::MemcpyAlignment) == begin) { + NoCacheMemcpy(buffer, begin, msgSize); } else { NoWCacheMemcpy(buffer, begin, msgSize); } #else memcpy(buffer, begin, msgSize); #endif - - if (lastMark) { - TMemoryLog::ChangeLastMark(buffer + msgSize); - } - - ClearAlignedTail(buffer + amount); - return buffer; -} - -NO_SANITIZE_THREAD -bool MemLogWrite(const char* begin, size_t msgSize, bool addLF) noexcept { - bool lastMark = TMemoryLog::PrintLastMark.load(std::memory_order_acquire); - size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize; - - // Let's construct prolog with timestamp and thread id - auto threadId = TMemoryLog::GetTheadId(); - - // alignment required by NoCacheMemcpy - // check for format for snprintf - constexpr size_t prologSize = 48; + + if (lastMark) { + TMemoryLog::ChangeLastMark(buffer + msgSize); + } + + ClearAlignedTail(buffer + amount); + return buffer; +} + +NO_SANITIZE_THREAD +bool MemLogWrite(const char* begin, size_t msgSize, bool addLF) noexcept { + bool lastMark = TMemoryLog::PrintLastMark.load(std::memory_order_acquire); + size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize; + + // Let's construct prolog with timestamp and thread id + auto threadId = TMemoryLog::GetTheadId(); + + // alignment required by NoCacheMemcpy + // check for format for snprintf + constexpr size_t prologSize = 48; alignas(TMemoryLog::MemcpyAlignment) char prolog[prologSize + 1]; Y_VERIFY(AlignDown(&prolog, TMemoryLog::MemcpyAlignment) == &prolog); - - int snprintfResult = snprintf(prolog, prologSize + 1, + + int snprintfResult = snprintf(prolog, prologSize + 1, "TS %020" PRIu64 " TI %020" PRIu64 " ", GetCycleCountFast(), threadId); - - if (snprintfResult < 0) { - return false; - } - Y_VERIFY(snprintfResult == prologSize); - - amount += prologSize; - if (addLF) { - ++amount; // add 1 byte for \n at the end of the message - } - - char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount); - if (buffer == nullptr) { - return false; - } - -#if defined(_x86_64_) || defined(_i386_) + + if (snprintfResult < 0) { + return false; + } + Y_VERIFY(snprintfResult == prologSize); + + amount += prologSize; + if (addLF) { + ++amount; // add 1 byte for \n at the end of the message + } + + char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount); + if (buffer == nullptr) { + return false; + } + +#if defined(_x86_64_) || defined(_i386_) // warning: copy prolog first to avoid corruption of the message // by prolog tail NoCacheMemcpy(buffer, prolog, prologSize); if (AlignDown(begin + prologSize, TMemoryLog::MemcpyAlignment) == begin + prologSize) { NoCacheMemcpy(buffer + prologSize, begin, msgSize); - } else { + } else { NoWCacheMemcpy(buffer + prologSize, begin, msgSize); } #else memcpy(buffer, prolog, prologSize); memcpy(buffer + prologSize, begin, msgSize); #endif - - if (addLF) { - buffer[prologSize + msgSize] = '\n'; - } - - if (lastMark) { - TMemoryLog::ChangeLastMark(buffer + prologSize + msgSize + (int)addLF); - } - - ClearAlignedTail(buffer + amount); - return true; -} - -NO_SANITIZE_THREAD -void TMemoryLog::ChangeLastMark(char* buffer) noexcept { - memcpy(buffer, DEFAULT_LAST_MARK, LAST_MARK_SIZE); - auto oldMark = AtomicSwap(&LastMarkIsHere, buffer); - if (Y_LIKELY(oldMark != nullptr)) { - memcpy(oldMark, CLEAR_MARK, LAST_MARK_SIZE); - } - if (AtomicGet(LastMarkIsHere) != buffer) { - memcpy(buffer, CLEAR_MARK, LAST_MARK_SIZE); - AtomicBarrier(); - } -} - -bool MemLogVPrintF(const char* format, va_list params) noexcept { - auto logger = TMemoryLog::GetMemoryLogger(); - if (logger == nullptr) { - return false; - } - - auto threadId = TMemoryLog::GetTheadId(); - - // alignment required by NoCacheMemcpy + + if (addLF) { + buffer[prologSize + msgSize] = '\n'; + } + + if (lastMark) { + TMemoryLog::ChangeLastMark(buffer + prologSize + msgSize + (int)addLF); + } + + ClearAlignedTail(buffer + amount); + return true; +} + +NO_SANITIZE_THREAD +void TMemoryLog::ChangeLastMark(char* buffer) noexcept { + memcpy(buffer, DEFAULT_LAST_MARK, LAST_MARK_SIZE); + auto oldMark = AtomicSwap(&LastMarkIsHere, buffer); + if (Y_LIKELY(oldMark != nullptr)) { + memcpy(oldMark, CLEAR_MARK, LAST_MARK_SIZE); + } + if (AtomicGet(LastMarkIsHere) != buffer) { + memcpy(buffer, CLEAR_MARK, LAST_MARK_SIZE); + AtomicBarrier(); + } +} + +bool MemLogVPrintF(const char* format, va_list params) noexcept { + auto logger = TMemoryLog::GetMemoryLogger(); + if (logger == nullptr) { + return false; + } + + auto threadId = TMemoryLog::GetTheadId(); + + // alignment required by NoCacheMemcpy alignas(TMemoryLog::MemcpyAlignment) char buf[TMemoryLog::MAX_MESSAGE_SIZE]; Y_VERIFY(AlignDown(&buf, TMemoryLog::MemcpyAlignment) == &buf); - + int prologSize = snprintf(buf, TMemoryLog::MAX_MESSAGE_SIZE - 2, "TS %020" PRIu64 " TI %020" PRIu64 " ", GetCycleCountFast(), threadId); - - if (Y_UNLIKELY(prologSize < 0)) { - return false; - } - Y_VERIFY((ui32)prologSize <= TMemoryLog::MAX_MESSAGE_SIZE); - - int add = vsnprintf( + + if (Y_UNLIKELY(prologSize < 0)) { + return false; + } + Y_VERIFY((ui32)prologSize <= TMemoryLog::MAX_MESSAGE_SIZE); + + int add = vsnprintf( &buf[prologSize], - TMemoryLog::MAX_MESSAGE_SIZE - prologSize - 2, - format, params); - - if (Y_UNLIKELY(add < 0)) { - return false; - } - Y_VERIFY(add >= 0); - auto totalSize = prologSize + add; - + TMemoryLog::MAX_MESSAGE_SIZE - prologSize - 2, + format, params); + + if (Y_UNLIKELY(add < 0)) { + return false; + } + Y_VERIFY(add >= 0); + auto totalSize = prologSize + add; + buf[totalSize++] = '\n'; - Y_VERIFY((ui32)totalSize <= TMemoryLog::MAX_MESSAGE_SIZE); - + Y_VERIFY((ui32)totalSize <= TMemoryLog::MAX_MESSAGE_SIZE); + return BareMemLogWrite(buf, totalSize) != nullptr; -} +} diff --git a/library/cpp/actors/memory_log/memlog.h b/library/cpp/actors/memory_log/memlog.h index 2aa27272a6..fe66efc4fb 100644 --- a/library/cpp/actors/memory_log/memlog.h +++ b/library/cpp/actors/memory_log/memlog.h @@ -1,211 +1,211 @@ -#pragma once - +#pragma once + #include <library/cpp/threading/queue/mpmc_unordered_ring.h> #include <util/generic/string.h> -#include <util/string/printf.h> -#include <util/system/datetime.h> -#include <util/system/thread.h> -#include <util/system/types.h> -#include <util/system/atomic.h> -#include <util/system/align.h> -#include <util/system/tls.h> - -#include <atomic> -#include <cstdio> - -#ifdef _win_ -#include <util/system/winint.h> -#endif - -#ifndef NO_SANITIZE_THREAD +#include <util/string/printf.h> +#include <util/system/datetime.h> +#include <util/system/thread.h> +#include <util/system/types.h> +#include <util/system/atomic.h> +#include <util/system/align.h> +#include <util/system/tls.h> + +#include <atomic> +#include <cstdio> + +#ifdef _win_ +#include <util/system/winint.h> +#endif + +#ifndef NO_SANITIZE_THREAD #define NO_SANITIZE_THREAD #if defined(__has_feature) #if __has_feature(thread_sanitizer) #undef NO_SANITIZE_THREAD #define NO_SANITIZE_THREAD __attribute__((no_sanitize_thread)) +#endif #endif #endif -#endif - -class TMemoryLog { -public: - static constexpr size_t DEFAULT_TOTAL_SIZE = 10 * 1024 * 1024; - static constexpr size_t DEFAULT_GRAIN_SIZE = 1024 * 64; - static constexpr size_t MAX_MESSAGE_SIZE = 1024; - static constexpr ui16 MAX_GET_BUFFER_TRIES = 4; - static constexpr ui16 MemcpyAlignment = 16; - - // search for cb7B68a8A561645 - static const char DEFAULT_LAST_MARK[16]; - static const char CLEAR_MARK[16]; - - static constexpr size_t LAST_MARK_SIZE = sizeof(DEFAULT_LAST_MARK); - - inline static TMemoryLog* GetMemoryLogger() noexcept { - return AtomicGet(MemLogBuffer); - } - + +class TMemoryLog { +public: + static constexpr size_t DEFAULT_TOTAL_SIZE = 10 * 1024 * 1024; + static constexpr size_t DEFAULT_GRAIN_SIZE = 1024 * 64; + static constexpr size_t MAX_MESSAGE_SIZE = 1024; + static constexpr ui16 MAX_GET_BUFFER_TRIES = 4; + static constexpr ui16 MemcpyAlignment = 16; + + // search for cb7B68a8A561645 + static const char DEFAULT_LAST_MARK[16]; + static const char CLEAR_MARK[16]; + + static constexpr size_t LAST_MARK_SIZE = sizeof(DEFAULT_LAST_MARK); + + inline static TMemoryLog* GetMemoryLogger() noexcept { + return AtomicGet(MemLogBuffer); + } + void* GetWriteBuffer(size_t amount) noexcept; - - inline static void* GetWriteBufferStatic(size_t amount) noexcept { - auto logger = GetMemoryLogger(); - if (logger == nullptr) { - return nullptr; - } - return logger->GetWriteBuffer(amount); - } - - size_t GetGlobalBufferSize() const noexcept { - return Buf.GetSize(); - } - - inline static void CreateMemoryLogBuffer( + + inline static void* GetWriteBufferStatic(size_t amount) noexcept { + auto logger = GetMemoryLogger(); + if (logger == nullptr) { + return nullptr; + } + return logger->GetWriteBuffer(amount); + } + + size_t GetGlobalBufferSize() const noexcept { + return Buf.GetSize(); + } + + inline static void CreateMemoryLogBuffer( size_t totalSize = DEFAULT_TOTAL_SIZE, size_t grainSize = DEFAULT_GRAIN_SIZE) Y_COLD { - if (AtomicGet(MemLogBuffer) != nullptr) { - return; - } - - AtomicSet(MemLogBuffer, new TMemoryLog(totalSize, grainSize)); - } - - static std::atomic<bool> PrintLastMark; - - // buffer must be at least 16 bytes + if (AtomicGet(MemLogBuffer) != nullptr) { + return; + } + + AtomicSet(MemLogBuffer, new TMemoryLog(totalSize, grainSize)); + } + + static std::atomic<bool> PrintLastMark; + + // buffer must be at least 16 bytes static void ChangeLastMark(char* buffer) noexcept; - - inline static TThread::TId GetTheadId() noexcept { - if (LogThreadId == 0) { - LogThreadId = TThread::CurrentThreadId(); - } - return LogThreadId; - } - -private: + + inline static TThread::TId GetTheadId() noexcept { + if (LogThreadId == 0) { + LogThreadId = TThread::CurrentThreadId(); + } + return LogThreadId; + } + +private: TMemoryLog(size_t totalSize, size_t grainSize) Y_COLD; - - struct TGrain { - TAtomic WritePointer = 0; - char Padding[MemcpyAlignment - sizeof(TAtomic)]; - char Data[]; - }; - - size_t NumberOfCpus; - size_t GrainSize; - size_t NumberOfGrains; - TArrayPtr<TGrain*> ActiveGrains; - NThreading::TMPMCUnorderedRing FreeGrains; - - TGrain* GetGrain(size_t grainIndex) const noexcept { - return (TGrain*)((char*)GetGlobalBuffer() + GrainSize * grainIndex); - } - - class TMMapArea { - public: - TMMapArea(size_t amount) Y_COLD { - MMap(amount); - } - - TMMapArea(const TMMapArea&) = delete; - TMMapArea& operator=(const TMMapArea& copy) = delete; - - TMMapArea(TMMapArea&& move) Y_COLD { - BufPtr = move.BufPtr; - Size = move.Size; - - move.BufPtr = nullptr; - move.Size = 0; - } - - TMMapArea& operator=(TMMapArea&& move) Y_COLD { - BufPtr = move.BufPtr; - Size = move.Size; - - move.BufPtr = nullptr; - move.Size = 0; - return *this; - } - - void Reset(size_t amount) Y_COLD { - MUnmap(); - MMap(amount); - } - - ~TMMapArea() noexcept Y_COLD { - MUnmap(); - } - - size_t GetSize() const noexcept { - return Size; - } - - void* GetPtr() const noexcept { - return BufPtr; - } - - private: - void* BufPtr; - size_t Size; -#ifdef _win_ - HANDLE Mapping; -#endif - - void MMap(size_t amount); - void MUnmap(); - }; - - TMMapArea Buf; - - void* GetGlobalBuffer() const noexcept { - return Buf.GetPtr(); - } - - static unsigned GetSelfCpu() noexcept; - - static TMemoryLog* MemLogBuffer; - static Y_POD_THREAD(TThread::TId) LogThreadId; - static char* LastMarkIsHere; -}; - -// it's no use of sanitizing this function -NO_SANITIZE_THREAD + + struct TGrain { + TAtomic WritePointer = 0; + char Padding[MemcpyAlignment - sizeof(TAtomic)]; + char Data[]; + }; + + size_t NumberOfCpus; + size_t GrainSize; + size_t NumberOfGrains; + TArrayPtr<TGrain*> ActiveGrains; + NThreading::TMPMCUnorderedRing FreeGrains; + + TGrain* GetGrain(size_t grainIndex) const noexcept { + return (TGrain*)((char*)GetGlobalBuffer() + GrainSize * grainIndex); + } + + class TMMapArea { + public: + TMMapArea(size_t amount) Y_COLD { + MMap(amount); + } + + TMMapArea(const TMMapArea&) = delete; + TMMapArea& operator=(const TMMapArea& copy) = delete; + + TMMapArea(TMMapArea&& move) Y_COLD { + BufPtr = move.BufPtr; + Size = move.Size; + + move.BufPtr = nullptr; + move.Size = 0; + } + + TMMapArea& operator=(TMMapArea&& move) Y_COLD { + BufPtr = move.BufPtr; + Size = move.Size; + + move.BufPtr = nullptr; + move.Size = 0; + return *this; + } + + void Reset(size_t amount) Y_COLD { + MUnmap(); + MMap(amount); + } + + ~TMMapArea() noexcept Y_COLD { + MUnmap(); + } + + size_t GetSize() const noexcept { + return Size; + } + + void* GetPtr() const noexcept { + return BufPtr; + } + + private: + void* BufPtr; + size_t Size; +#ifdef _win_ + HANDLE Mapping; +#endif + + void MMap(size_t amount); + void MUnmap(); + }; + + TMMapArea Buf; + + void* GetGlobalBuffer() const noexcept { + return Buf.GetPtr(); + } + + static unsigned GetSelfCpu() noexcept; + + static TMemoryLog* MemLogBuffer; + static Y_POD_THREAD(TThread::TId) LogThreadId; + static char* LastMarkIsHere; +}; + +// it's no use of sanitizing this function +NO_SANITIZE_THREAD char* BareMemLogWrite( - const char* begin, size_t msgSize, bool isLast = true) noexcept; - -// it's no use of sanitizing this function -NO_SANITIZE_THREAD + const char* begin, size_t msgSize, bool isLast = true) noexcept; + +// it's no use of sanitizing this function +NO_SANITIZE_THREAD bool MemLogWrite( - const char* begin, size_t msgSize, bool addLF = false) noexcept; - -Y_WRAPPER inline bool MemLogWrite(const char* begin, const char* end) noexcept { - if (end <= begin) { - return false; - } - - size_t msgSize = end - begin; - return MemLogWrite(begin, msgSize); -} - -template <typename TObj> -bool MemLogWriteStruct(const TObj* obj) noexcept { - auto begin = (const char*)(const void*)obj; - return MemLogWrite(begin, begin + sizeof(TObj)); -} - + const char* begin, size_t msgSize, bool addLF = false) noexcept; + +Y_WRAPPER inline bool MemLogWrite(const char* begin, const char* end) noexcept { + if (end <= begin) { + return false; + } + + size_t msgSize = end - begin; + return MemLogWrite(begin, msgSize); +} + +template <typename TObj> +bool MemLogWriteStruct(const TObj* obj) noexcept { + auto begin = (const char*)(const void*)obj; + return MemLogWrite(begin, begin + sizeof(TObj)); +} + Y_PRINTF_FORMAT(1, 0) -bool MemLogVPrintF(const char* format, va_list params) noexcept; - +bool MemLogVPrintF(const char* format, va_list params) noexcept; + Y_PRINTF_FORMAT(1, 2) Y_WRAPPER -inline bool MemLogPrintF(const char* format, ...) noexcept { - va_list params; - va_start(params, format); - auto result = MemLogVPrintF(format, params); - va_end(params); - return result; -} - -Y_WRAPPER inline bool MemLogWriteNullTerm(const char* str) noexcept { - return MemLogWrite(str, strlen(str)); -} +inline bool MemLogPrintF(const char* format, ...) noexcept { + va_list params; + va_start(params, format); + auto result = MemLogVPrintF(format, params); + va_end(params); + return result; +} + +Y_WRAPPER inline bool MemLogWriteNullTerm(const char* str) noexcept { + return MemLogWrite(str, strlen(str)); +} diff --git a/library/cpp/actors/memory_log/mmap.cpp b/library/cpp/actors/memory_log/mmap.cpp index 201998d343..b72feb1112 100644 --- a/library/cpp/actors/memory_log/mmap.cpp +++ b/library/cpp/actors/memory_log/mmap.cpp @@ -1,63 +1,63 @@ -#include "memlog.h" - +#include "memlog.h" + #if defined(_unix_) #include <sys/mman.h> #elif defined(_win_) #include <util/system/winint.h> -#else +#else #error NO IMPLEMENTATION FOR THE PLATFORM -#endif - -void TMemoryLog::TMMapArea::MMap(size_t amount) { - Y_VERIFY(amount > 0); - +#endif + +void TMemoryLog::TMMapArea::MMap(size_t amount) { + Y_VERIFY(amount > 0); + +#if defined(_unix_) + constexpr int mmapProt = PROT_READ | PROT_WRITE; +#if defined(_linux_) + constexpr int mmapFlags = MAP_PRIVATE | MAP_ANON | MAP_POPULATE; +#else + constexpr int mmapFlags = MAP_PRIVATE | MAP_ANON; +#endif + + BufPtr = ::mmap(nullptr, amount, mmapProt, mmapFlags, -1, 0); + if (BufPtr == MAP_FAILED) { + throw std::bad_alloc(); + } + +#elif defined(_win_) + Mapping = ::CreateFileMapping( + (HANDLE)-1, nullptr, PAGE_READWRITE, 0, amount, nullptr); + if (Mapping == NULL) { + throw std::bad_alloc(); + } + BufPtr = ::MapViewOfFile(Mapping, FILE_MAP_WRITE, 0, 0, amount); + if (BufPtr == NULL) { + throw std::bad_alloc(); + } +#endif + + Size = amount; +} + +void TMemoryLog::TMMapArea::MUnmap() { + if (BufPtr == nullptr) { + return; + } + #if defined(_unix_) - constexpr int mmapProt = PROT_READ | PROT_WRITE; -#if defined(_linux_) - constexpr int mmapFlags = MAP_PRIVATE | MAP_ANON | MAP_POPULATE; -#else - constexpr int mmapFlags = MAP_PRIVATE | MAP_ANON; -#endif - - BufPtr = ::mmap(nullptr, amount, mmapProt, mmapFlags, -1, 0); - if (BufPtr == MAP_FAILED) { - throw std::bad_alloc(); - } - -#elif defined(_win_) - Mapping = ::CreateFileMapping( - (HANDLE)-1, nullptr, PAGE_READWRITE, 0, amount, nullptr); - if (Mapping == NULL) { - throw std::bad_alloc(); - } - BufPtr = ::MapViewOfFile(Mapping, FILE_MAP_WRITE, 0, 0, amount); - if (BufPtr == NULL) { - throw std::bad_alloc(); - } -#endif - - Size = amount; -} - -void TMemoryLog::TMMapArea::MUnmap() { - if (BufPtr == nullptr) { - return; - } - -#if defined(_unix_) - int result = ::munmap(BufPtr, Size); - Y_VERIFY(result == 0); - -#elif defined(_win_) - BOOL result = ::UnmapViewOfFile(BufPtr); - Y_VERIFY(result != 0); - - result = ::CloseHandle(Mapping); - Y_VERIFY(result != 0); - - Mapping = 0; -#endif - - BufPtr = nullptr; - Size = 0; -} + int result = ::munmap(BufPtr, Size); + Y_VERIFY(result == 0); + +#elif defined(_win_) + BOOL result = ::UnmapViewOfFile(BufPtr); + Y_VERIFY(result != 0); + + result = ::CloseHandle(Mapping); + Y_VERIFY(result != 0); + + Mapping = 0; +#endif + + BufPtr = nullptr; + Size = 0; +} diff --git a/library/cpp/actors/memory_log/ya.make b/library/cpp/actors/memory_log/ya.make index d89d5db4d7..441b51b3c7 100644 --- a/library/cpp/actors/memory_log/ya.make +++ b/library/cpp/actors/memory_log/ya.make @@ -1,19 +1,19 @@ -LIBRARY() - +LIBRARY() + OWNER( agri g:kikimr ) - -SRCS( - memlog.cpp - memlog.h - mmap.cpp -) - -PEERDIR( + +SRCS( + memlog.cpp + memlog.h + mmap.cpp +) + +PEERDIR( library/cpp/threading/queue - contrib/libs/linuxvdso -) - -END() + contrib/libs/linuxvdso +) + +END() diff --git a/library/cpp/actors/prof/tag.cpp b/library/cpp/actors/prof/tag.cpp index 9ccf03e1a9..46b53d804f 100644 --- a/library/cpp/actors/prof/tag.cpp +++ b/library/cpp/actors/prof/tag.cpp @@ -1,6 +1,6 @@ -#include "tag.h" +#include "tag.h" #include "tcmalloc.h" - + #include <library/cpp/charset/ci_string.h> #include <library/cpp/containers/atomizer/atomizer.h> #include <library/cpp/malloc/api/malloc.h> @@ -13,9 +13,9 @@ #include <util/generic/singleton.h> #include <util/generic/string.h> #include <util/generic/vector.h> -#include <util/system/mutex.h> - -namespace NProfiling { +#include <util/system/mutex.h> + +namespace NProfiling { class TStringAtoms { private: TMutex Mutex; @@ -59,19 +59,19 @@ namespace NProfiling { } } }; - + ui32 MakeTag(const char* s) { return TStringAtoms::Instance().MakeTag(s); } - + ui32 MakeTags(const TVector<const char*>& ss) { return TStringAtoms::Instance().MakeTags(ss); } - + const char* GetTag(ui32 tag) { return TStringAtoms::Instance().GetTag(tag); - } - + } + size_t GetTagsCount() { return TStringAtoms::Instance().GetTagsCount(); } diff --git a/library/cpp/actors/prof/tag.h b/library/cpp/actors/prof/tag.h index 357e264a22..ec4bed5b08 100644 --- a/library/cpp/actors/prof/tag.h +++ b/library/cpp/actors/prof/tag.h @@ -1,22 +1,22 @@ -#pragma once - +#pragma once + #include <util/generic/fwd.h> - -/* - Common registry for tagging memory profiler. - Register a new tag with MakeTag using a unique string. + +/* + Common registry for tagging memory profiler. + Register a new tag with MakeTag using a unique string. Use registered tags with SetThreadAllocTag function in allocator API. -*/ - -namespace NProfiling { +*/ + +namespace NProfiling { ui32 MakeTag(const char* s); - + // Make only unique tags. Y_VERIFY inside. ui32 MakeTags(const TVector<const char*>& ss); - + const char* GetTag(ui32 tag); size_t GetTagsCount(); - + using TSetThreadAllocTag = ui32(ui32 tag); extern TSetThreadAllocTag* SetThreadAllocTag; @@ -31,32 +31,32 @@ namespace NProfiling { ui32 newTag = MakeTag(tagName); RestoreTag = SetThreadAllocTag(newTag); } - + TMemoryTagScope(TMemoryTagScope&& move) : RestoreTag(move.RestoreTag) , Released(move.Released) { move.Released = true; } - + TMemoryTagScope& operator=(TMemoryTagScope&& move) { RestoreTag = move.RestoreTag; Released = move.Released; move.Released = true; return *this; } - + static void Reset(ui32 tag) { SetThreadAllocTag(tag); - } - + } + void Release() { if (!Released) { SetThreadAllocTag(RestoreTag); Released = true; } } - + ~TMemoryTagScope() { if (!Released) { SetThreadAllocTag(RestoreTag); diff --git a/library/cpp/actors/prof/ut/tag_ut.cpp b/library/cpp/actors/prof/ut/tag_ut.cpp index accf3921ab..43c56ecddc 100644 --- a/library/cpp/actors/prof/ut/tag_ut.cpp +++ b/library/cpp/actors/prof/ut/tag_ut.cpp @@ -1,68 +1,68 @@ -#include "tag.h" - +#include "tag.h" + #include <library/cpp/testing/unittest/registar.h> + +using namespace NProfiling; + +class TAtomTagsTest: public TTestBase { +private: + UNIT_TEST_SUITE(TAtomTagsTest); + UNIT_TEST(Test_MakeTag); + UNIT_TEST(Test_Make2Tags); + UNIT_TEST(Test_MakeTagTwice); + + UNIT_TEST(Test_MakeAndGetTag); + + UNIT_TEST(Test_MakeVector); + UNIT_TEST_SUITE_END(); -using namespace NProfiling; - -class TAtomTagsTest: public TTestBase { -private: - UNIT_TEST_SUITE(TAtomTagsTest); - UNIT_TEST(Test_MakeTag); - UNIT_TEST(Test_Make2Tags); - UNIT_TEST(Test_MakeTagTwice); - - UNIT_TEST(Test_MakeAndGetTag); - - UNIT_TEST(Test_MakeVector); - UNIT_TEST_SUITE_END(); - -public: - void Test_MakeTag(); - void Test_Make2Tags(); - void Test_MakeTagTwice(); - void Test_MakeAndGetTag(); - void Test_MakeVector(); -}; - -UNIT_TEST_SUITE_REGISTRATION(TAtomTagsTest); - -void TAtomTagsTest::Test_MakeTag() { - ui32 tag = MakeTag("a tag"); - UNIT_ASSERT(tag != 0); -} - -void TAtomTagsTest::Test_Make2Tags() { - ui32 tag1 = MakeTag("a tag 1"); - ui32 tag2 = MakeTag("a tag 2"); - UNIT_ASSERT(tag1 != 0); - UNIT_ASSERT(tag2 != 0); - UNIT_ASSERT(tag1 != tag2); -} - -void TAtomTagsTest::Test_MakeTagTwice() { - ui32 tag1 = MakeTag("a tag twice"); - ui32 tag2 = MakeTag("a tag twice"); - UNIT_ASSERT(tag1 != 0); - UNIT_ASSERT(tag1 == tag2); -} - -void TAtomTagsTest::Test_MakeAndGetTag() { - const char* makeStr = "tag to get"; - ui32 tag = MakeTag(makeStr); - const char* tagStr = GetTag(tag); - UNIT_ASSERT_STRINGS_EQUAL(makeStr, tagStr); -} - -void TAtomTagsTest::Test_MakeVector() { +public: + void Test_MakeTag(); + void Test_Make2Tags(); + void Test_MakeTagTwice(); + void Test_MakeAndGetTag(); + void Test_MakeVector(); +}; + +UNIT_TEST_SUITE_REGISTRATION(TAtomTagsTest); + +void TAtomTagsTest::Test_MakeTag() { + ui32 tag = MakeTag("a tag"); + UNIT_ASSERT(tag != 0); +} + +void TAtomTagsTest::Test_Make2Tags() { + ui32 tag1 = MakeTag("a tag 1"); + ui32 tag2 = MakeTag("a tag 2"); + UNIT_ASSERT(tag1 != 0); + UNIT_ASSERT(tag2 != 0); + UNIT_ASSERT(tag1 != tag2); +} + +void TAtomTagsTest::Test_MakeTagTwice() { + ui32 tag1 = MakeTag("a tag twice"); + ui32 tag2 = MakeTag("a tag twice"); + UNIT_ASSERT(tag1 != 0); + UNIT_ASSERT(tag1 == tag2); +} + +void TAtomTagsTest::Test_MakeAndGetTag() { + const char* makeStr = "tag to get"; + ui32 tag = MakeTag(makeStr); + const char* tagStr = GetTag(tag); + UNIT_ASSERT_STRINGS_EQUAL(makeStr, tagStr); +} + +void TAtomTagsTest::Test_MakeVector() { TVector<const char*> strs = { - "vector tag 0", - "vector tag 1", - "vector tag 3", + "vector tag 0", + "vector tag 1", + "vector tag 3", "vector tag 4"}; - ui32 baseTag = MakeTags(strs); - UNIT_ASSERT(baseTag != 0); - for (ui32 i = 0; i < strs.size(); ++i) { - const char* str = GetTag(baseTag + i); - UNIT_ASSERT_STRINGS_EQUAL(str, strs[i]); - } -} + ui32 baseTag = MakeTags(strs); + UNIT_ASSERT(baseTag != 0); + for (ui32 i = 0; i < strs.size(); ++i) { + const char* str = GetTag(baseTag + i); + UNIT_ASSERT_STRINGS_EQUAL(str, strs[i]); + } +} diff --git a/library/cpp/actors/prof/ut/ya.make b/library/cpp/actors/prof/ut/ya.make index 47c58a8fb7..d177fbdd22 100644 --- a/library/cpp/actors/prof/ut/ya.make +++ b/library/cpp/actors/prof/ut/ya.make @@ -1,12 +1,12 @@ UNITTEST_FOR(library/cpp/actors/prof) - + OWNER( agri g:kikimr ) - -SRCS( - tag_ut.cpp -) - -END() + +SRCS( + tag_ut.cpp +) + +END() diff --git a/library/cpp/actors/prof/ya.make b/library/cpp/actors/prof/ya.make index b5e2497563..cdd3e57d1f 100644 --- a/library/cpp/actors/prof/ya.make +++ b/library/cpp/actors/prof/ya.make @@ -1,19 +1,19 @@ -LIBRARY() - +LIBRARY() + OWNER( agri g:kikimr ) - -SRCS( - tag.cpp -) - -PEERDIR( + +SRCS( + tag.cpp +) + +PEERDIR( library/cpp/charset library/cpp/containers/atomizer -) - +) + IF (PROFILE_MEMORY_ALLOCATIONS) CFLAGS(-DPROFILE_MEMORY_ALLOCATIONS) PEERDIR( @@ -30,4 +30,4 @@ ELSE() SRCS(tcmalloc_null.cpp) ENDIF() -END() +END() diff --git a/library/cpp/actors/protos/actors.proto b/library/cpp/actors/protos/actors.proto index 5fbd6d44ee..5e40cbf6c2 100644 --- a/library/cpp/actors/protos/actors.proto +++ b/library/cpp/actors/protos/actors.proto @@ -6,8 +6,8 @@ message TActorId { required fixed64 RawX1 = 1; required fixed64 RawX2 = 2; } - -message TCallbackException { + +message TCallbackException { required TActorId ActorId = 1; - required string ExceptionMessage = 2; -} + required string ExceptionMessage = 2; +} diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto index 2e3b0d0d15..30a5c1bb74 100644 --- a/library/cpp/actors/protos/interconnect.proto +++ b/library/cpp/actors/protos/interconnect.proto @@ -14,7 +14,7 @@ message TEvNodeInfo { optional string Address = 2; optional uint32 Port = 3; } - + extend google.protobuf.FieldOptions { optional string PrintName = 50376; } @@ -43,19 +43,19 @@ message TScopeId { optional fixed64 X2 = 2; } -message THandshakeRequest { - required uint64 Protocol = 1; - - required uint64 ProgramPID = 2; - required uint64 ProgramStartTime = 3; - required uint64 Serial = 4; - - required uint32 ReceiverNodeId = 5; +message THandshakeRequest { + required uint64 Protocol = 1; + + required uint64 ProgramPID = 2; + required uint64 ProgramStartTime = 3; + required uint64 Serial = 4; + + required uint32 ReceiverNodeId = 5; required string SenderActorId = 6; - - optional string SenderHostName = 7; - optional string ReceiverHostName = 8; - optional string UUID = 9; + + optional string SenderHostName = 7; + optional string ReceiverHostName = 8; + optional string UUID = 9; optional TClusterUUIDs ClusterUUIDs = 13; optional bytes Ballast = 10; @@ -72,15 +72,15 @@ message THandshakeRequest { optional bool RequestModernFrame = 18; optional bool RequestAuthOnly = 19; -} - -message THandshakeSuccess { - required uint64 Protocol = 1; - - required uint64 ProgramPID = 2; - required uint64 ProgramStartTime = 3; - required uint64 Serial = 4; - +} + +message THandshakeSuccess { + required uint64 Protocol = 1; + + required uint64 ProgramPID = 2; + required uint64 ProgramStartTime = 3; + required uint64 Serial = 4; + required string SenderActorId = 5; optional string VersionTag = 6; @@ -94,13 +94,13 @@ message THandshakeSuccess { optional bool UseModernFrame = 11; optional bool AuthOnly = 12; -} - -message THandshakeReply { - optional THandshakeSuccess Success = 1; - optional string ErrorExplaination = 2; +} + +message THandshakeReply { + optional THandshakeSuccess Success = 1; + optional string ErrorExplaination = 2; optional bool CookieCheckResult = 3; -} +} message TEvLoadMessage { message THop { diff --git a/library/cpp/actors/protos/services_common.proto b/library/cpp/actors/protos/services_common.proto index afa0ec0073..99347ad37e 100644 --- a/library/cpp/actors/protos/services_common.proto +++ b/library/cpp/actors/protos/services_common.proto @@ -7,8 +7,8 @@ enum EServiceCommon { GLOBAL = 0; INTERCONNECT = 1; - TEST = 2; - PROTOCOLS = 3; + TEST = 2; + PROTOCOLS = 3; INTERCONNECT_SPEED_TEST = 4; INTERCONNECT_STATUS = 5; INTERCONNECT_NETWORK = 6; diff --git a/library/cpp/actors/protos/unittests.proto b/library/cpp/actors/protos/unittests.proto index a856b0942a..68b662b9b3 100644 --- a/library/cpp/actors/protos/unittests.proto +++ b/library/cpp/actors/protos/unittests.proto @@ -1,17 +1,17 @@ option cc_enable_arenas = true; -message TSimple { - required string Str1 = 1; - optional string Str2 = 2; - optional uint64 Number1 = 3; -} - -message TBigMessage { - repeated TSimple Simples = 1; - repeated string ManyStr = 2; - optional string OneMoreStr = 3; - optional uint64 YANumber = 4; -} +message TSimple { + required string Str1 = 1; + optional string Str2 = 2; + optional uint64 Number1 = 3; +} + +message TBigMessage { + repeated TSimple Simples = 1; + repeated string ManyStr = 2; + optional string OneMoreStr = 3; + optional uint64 YANumber = 4; +} message TMessageWithPayload { optional string Meta = 1; diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 6fa25b9965..0459f76386 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -74,7 +74,7 @@ namespace NActors { ActorSystem->Stop(); ActorSystem.Destroy(); - Poller.Reset(); + Poller.Reset(); } TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() { @@ -909,17 +909,17 @@ namespace NActors { case TMailboxType::Revolving: UnlockFromExecution((TMailboxTable::TRevolvingMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); break; - case TMailboxType::HTSwap: + case TMailboxType::HTSwap: UnlockFromExecution((TMailboxTable::THTSwapMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); - break; - case TMailboxType::ReadAsFilled: + break; + case TMailboxType::ReadAsFilled: UnlockFromExecution((TMailboxTable::TReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); - break; - case TMailboxType::TinyReadAsFilled: + break; + case TMailboxType::TinyReadAsFilled: UnlockFromExecution((TMailboxTable::TTinyReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); - break; + break; default: - Y_FAIL("Unsupported mailbox type"); + Y_FAIL("Unsupported mailbox type"); } return actorId; @@ -1645,13 +1645,13 @@ namespace NActors { setup->LocalServices = node->LocalServices; setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount); const TActorId nameserviceId = GetNameserviceActorId(); - - TIntrusivePtr<TInterconnectProxyCommon> common; - common.Reset(new TInterconnectProxyCommon); - common->NameserviceId = nameserviceId; - common->MonCounters = interconnectCounters; + + TIntrusivePtr<TInterconnectProxyCommon> common; + common.Reset(new TInterconnectProxyCommon); + common->NameserviceId = nameserviceId; + common->MonCounters = interconnectCounters; common->TechnicalSelfHostName = "::1"; - + if (!UseRealThreads) { common->Settings.DeadPeer = TDuration::Max(); common->Settings.CloseOnIdle = TDuration::Max(); @@ -1668,7 +1668,7 @@ namespace NActors { continue; const ui32 peerNodeId = FirstNodeId + proxyNodeIndex; - + IActor *proxyActor = UseRealInterconnect ? new TInterconnectProxyTCP(peerNodeId, common) : InterconnectMock.CreateProxyMock(setup->NodeId, peerNodeId, common); diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index 26e3b45c98..cca5876645 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -556,7 +556,7 @@ namespace NActors { TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters; TIntrusivePtr<NActors::NLog::TSettings> LogSettings; - TIntrusivePtr<NInterconnect::TPollerThreads> Poller; + TIntrusivePtr<NInterconnect::TPollerThreads> Poller; volatile ui64* ActorSystemTimestamp; volatile ui64* ActorSystemMonotonic; TVector<std::pair<TActorId, TActorSetupCmd> > LocalServices; diff --git a/library/cpp/actors/util/funnel_queue.h b/library/cpp/actors/util/funnel_queue.h index 0e21e2617c..d760252054 100644 --- a/library/cpp/actors/util/funnel_queue.h +++ b/library/cpp/actors/util/funnel_queue.h @@ -91,62 +91,62 @@ protected: delete entry; return next; } - -protected: - struct TEntryIter { - TEntry* ptr; - - ElementType& operator*() { - return ptr->Data; - } - - ElementType* operator->() { - return &ptr->Data; - } - - TEntryIter& operator++() { - ptr = AtomicGet(ptr->Next); - return *this; - } - - bool operator!=(const TEntryIter& other) const { - return ptr != other.ptr; - } - - bool operator==(const TEntryIter& other) const { - return ptr == other.ptr; - } - }; - - struct TConstEntryIter { - const TEntry* ptr; - - const ElementType& operator*() { - return ptr->Data; - } - - const ElementType* operator->() { - return &ptr->Data; - } - - TEntryIter& operator++() { - ptr = AtomicGet(ptr->Next); - return *this; - } - - bool operator!=(const TConstEntryIter& other) const { - return ptr != other.ptr; - } - - bool operator==(const TConstEntryIter& other) const { - return ptr == other.ptr; - } - }; - -public: - using const_iterator = TConstEntryIter; - using iterator = TEntryIter; - + +protected: + struct TEntryIter { + TEntry* ptr; + + ElementType& operator*() { + return ptr->Data; + } + + ElementType* operator->() { + return &ptr->Data; + } + + TEntryIter& operator++() { + ptr = AtomicGet(ptr->Next); + return *this; + } + + bool operator!=(const TEntryIter& other) const { + return ptr != other.ptr; + } + + bool operator==(const TEntryIter& other) const { + return ptr == other.ptr; + } + }; + + struct TConstEntryIter { + const TEntry* ptr; + + const ElementType& operator*() { + return ptr->Data; + } + + const ElementType* operator->() { + return &ptr->Data; + } + + TEntryIter& operator++() { + ptr = AtomicGet(ptr->Next); + return *this; + } + + bool operator!=(const TConstEntryIter& other) const { + return ptr != other.ptr; + } + + bool operator==(const TConstEntryIter& other) const { + return ptr == other.ptr; + } + }; + +public: + using const_iterator = TConstEntryIter; + using iterator = TEntryIter; + iterator begin() { return {AtomicGet(Front)}; } @@ -156,7 +156,7 @@ public: const_iterator begin() const { return {AtomicGet(Front)}; } - + iterator end() { return {nullptr}; } diff --git a/library/cpp/actors/util/recentwnd.h b/library/cpp/actors/util/recentwnd.h index ba1ede6f29..29425301e4 100644 --- a/library/cpp/actors/util/recentwnd.h +++ b/library/cpp/actors/util/recentwnd.h @@ -1,28 +1,28 @@ -#pragma once +#pragma once -#include <util/generic/deque.h> - -template <typename TElem, +#include <util/generic/deque.h> + +template <typename TElem, template <typename, typename...> class TContainer = TDeque> -class TRecentWnd { -public: +class TRecentWnd { +public: TRecentWnd(ui32 wndSize) : MaxWndSize_(wndSize) { } - - void Push(const TElem& elem) { - if (Window_.size() == MaxWndSize_) - Window_.erase(Window_.begin()); - Window_.emplace_back(elem); - } - - void Push(TElem&& elem) { - if (Window_.size() == MaxWndSize_) - Window_.erase(Window_.begin()); - Window_.emplace_back(std::move(elem)); - } - + + void Push(const TElem& elem) { + if (Window_.size() == MaxWndSize_) + Window_.erase(Window_.begin()); + Window_.emplace_back(elem); + } + + void Push(TElem&& elem) { + if (Window_.size() == MaxWndSize_) + Window_.erase(Window_.begin()); + Window_.emplace_back(std::move(elem)); + } + TElem& Last() { return Window_.back(); } @@ -35,33 +35,33 @@ public: ui64 Size() const { return Window_.size(); } - - using const_iterator = typename TContainer<TElem>::const_iterator; - + + using const_iterator = typename TContainer<TElem>::const_iterator; + const_iterator begin() { return Window_.begin(); } const_iterator end() { return Window_.end(); } + + void Reset(ui32 wndSize = 0) { + Window_.clear(); + if (wndSize != 0) { + MaxWndSize_ = wndSize; + } + } + + void ResetWnd(ui32 wndSize) { + Y_VERIFY(wndSize != 0); + MaxWndSize_ = wndSize; + if (Window_.size() > MaxWndSize_) { + Window_.erase(Window_.begin(), + Window_.begin() + Window_.size() - MaxWndSize_); + } + } - void Reset(ui32 wndSize = 0) { - Window_.clear(); - if (wndSize != 0) { - MaxWndSize_ = wndSize; - } - } - - void ResetWnd(ui32 wndSize) { - Y_VERIFY(wndSize != 0); - MaxWndSize_ = wndSize; - if (Window_.size() > MaxWndSize_) { - Window_.erase(Window_.begin(), - Window_.begin() + Window_.size() - MaxWndSize_); - } - } - -private: - TContainer<TElem> Window_; +private: + TContainer<TElem> Window_; ui32 MaxWndSize_; -}; +}; diff --git a/library/cpp/actors/util/thread.h b/library/cpp/actors/util/thread.h index d742c8c585..d90ab745fe 100644 --- a/library/cpp/actors/util/thread.h +++ b/library/cpp/actors/util/thread.h @@ -10,17 +10,17 @@ inline void SetCurrentThreadName(const TString& name, const ui32 maxCharsFromProcessName = 8) { #if defined(_linux_) - // linux limits threadname by 15 + \0 - - TStringBuf procName(GetExecPath()); - procName = procName.RNextTok('/'); - procName = procName.SubStr(0, maxCharsFromProcessName); - + // linux limits threadname by 15 + \0 + + TStringBuf procName(GetExecPath()); + procName = procName.RNextTok('/'); + procName = procName.SubStr(0, maxCharsFromProcessName); + TStringStream linuxName; - linuxName << procName << "." << name; + linuxName << procName << "." << name; TThread::SetCurrentThreadName(linuxName.Str().data()); #else - Y_UNUSED(maxCharsFromProcessName); + Y_UNUSED(maxCharsFromProcessName); TThread::SetCurrentThreadName(name.data()); #endif } |