diff options
author | single <single@yandex-team.ru> | 2022-02-10 16:50:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:30 +0300 |
commit | f7835298a8840c8e5d98715bf23efa9c7e03b9c4 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library | |
parent | 8ae96df130bbede609c3504aa9af1bc6ff5361b3 (diff) | |
download | ydb-f7835298a8840c8e5d98715bf23efa9c7e03b9c4.tar.gz |
Restoring authorship annotation for <single@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
36 files changed, 649 insertions, 649 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp index 243a3e1a86..6f9ba6a42b 100644 --- a/library/cpp/actors/core/actor.cpp +++ b/library/cpp/actors/core/actor.cpp @@ -16,15 +16,15 @@ namespace NActors { } void IActor::Registered(TActorSystem* sys, const TActorId& owner) { - // fallback to legacy method, do not use it anymore - if (auto eh = AfterRegister(SelfId(), owner)) - sys->Send(eh); - } - - void IActor::Describe(IOutputStream &out) const noexcept { - SelfActorId.Out(out); - } - + // fallback to legacy method, do not use it anymore + if (auto eh = AfterRegister(SelfId(), owner)) + sys->Send(eh); + } + + void IActor::Describe(IOutputStream &out) const noexcept { + SelfActorId.Out(out); + } + bool IActor::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept { return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId)); } @@ -123,7 +123,7 @@ namespace NActors { TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie); } - void IActor::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { + void IActor::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const noexcept { TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandle(SelfActorId, TActorId(), ev), cookie); } diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 7a414bad1c..ed29bd14b9 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -6,7 +6,7 @@ #include <library/cpp/actors/util/local_process_key.h> namespace NActors { - class TActorSystem; + class TActorSystem; class TMailboxTable; struct TMailboxHeader; @@ -169,11 +169,11 @@ namespace NActors { void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; }; - class IActor; - - class IActorOps : TNonCopyable { + class IActor; + + class IActorOps : TNonCopyable { public: - virtual void Describe(IOutputStream&) const noexcept = 0; + virtual void Describe(IOutputStream&) const noexcept = 0; virtual bool Send(const TActorId& recipient, IEventBase*, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0; /** @@ -205,12 +205,12 @@ namespace NActors { virtual TActorId Register(IActor*, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept = 0; virtual TActorId RegisterWithSameMailbox(IActor*) const noexcept = 0; - }; - + }; + class TDecorator; - class IActor : protected IActorOps { - public: + class IActor : protected IActorOps { + public: typedef void (IActor::*TReceiveFunc)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx); private: @@ -324,7 +324,7 @@ namespace NActors { } virtual void Registered(TActorSystem* sys, const TActorId& owner); - + virtual TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) { Y_UNUSED(self); Y_UNUSED(parentId); @@ -349,7 +349,7 @@ namespace NActors { } protected: - void Describe(IOutputStream&) const noexcept override; + void Describe(IOutputStream&) const noexcept override; bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final; template <typename TEvent> bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{ @@ -363,7 +363,7 @@ namespace NActors { void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; - void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; + void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; // register new actor in ActorSystem on new fresh mailbox. TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept final; diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 745bbd28f4..6ff02aaf94 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -59,11 +59,11 @@ namespace NActors { public: template <typename TEv> inline TEv* CastAsLocal() const noexcept { - auto fits = GetTypeRewrite() == TEv::EventType; - - return fits ? static_cast<TEv*>(Event.Get()) : nullptr; - } - + auto fits = GetTypeRewrite() == TEv::EventType; + + return fits ? static_cast<TEv*>(Event.Get()) : nullptr; + } + template <typename TEventType> TEventType* Get() { if (Type != TEventType::EventType) diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h index 984cb51523..702cf50fad 100644 --- a/library/cpp/actors/core/events.h +++ b/library/cpp/actors/core/events.h @@ -85,23 +85,23 @@ namespace NActors { Unsubscribe, // generic unsubscribe from something Delivered, // event delivered Undelivered, // event undelivered - Poison, // request actor to shutdown + Poison, // request actor to shutdown Completed, // generic async job result event - PoisonTaken, // generic Poison taken (reply to PoisonPill event, i.e. died completely) + PoisonTaken, // generic Poison taken (reply to PoisonPill event, i.e. died completely) FlushLog, CallbackCompletion, CallbackException, - Gone, // Generic notification of actor death + Gone, // Generic notification of actor death TrackActor, UntrackActor, InvokeResult, CoroTimeout, InvokeQuery, - End, - + End, + // Compatibility section - PoisonPill = Poison, - ActorDied = Gone, + PoisonPill = Poison, + ActorDied = Gone, }; static_assert(End < EventSpaceEnd(ES_SYSTEM), "expect End < EventSpaceEnd(ES_SYSTEM)"); @@ -111,16 +111,16 @@ namespace NActors { DEFINE_SIMPLE_LOCAL_EVENT(TEvBootstrap, "System: TEvBootstrap") }; - struct TEvPoison : public TEventBase<TEvPoison, TSystem::Poison> { - DEFINE_SIMPLE_NONLOCAL_EVENT(TEvPoison, "System: TEvPoison") + struct TEvPoison : public TEventBase<TEvPoison, TSystem::Poison> { + DEFINE_SIMPLE_NONLOCAL_EVENT(TEvPoison, "System: TEvPoison") }; struct TEvWakeup: public TEventBase<TEvWakeup, TSystem::Wakeup> { DEFINE_SIMPLE_LOCAL_EVENT(TEvWakeup, "System: TEvWakeup") - - TEvWakeup(ui64 tag = 0) : Tag(tag) { } - - const ui64 Tag = 0; + + TEvWakeup(ui64 tag = 0) : Tag(tag) { } + + const ui64 Tag = 0; }; struct TEvSubscribe: public TEventBase<TEvSubscribe, TSystem::Subscribe> { @@ -205,14 +205,14 @@ namespace NActors { } }; - struct TEvGone: public TEventBase<TEvGone, TSystem::Gone> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvGone, "System: TEvGone") + struct TEvGone: public TEventBase<TEvGone, TSystem::Gone> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvGone, "System: TEvGone") }; - + struct TEvInvokeResult; - using TEvPoisonPill = TEvPoison; // Legacy name, deprecated - using TEvActorDied = TEvGone; + using TEvPoisonPill = TEvPoison; // Legacy name, deprecated + using TEvActorDied = TEvGone; }; } diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp index 1fb66adc61..c3b9999168 100644 --- a/library/cpp/actors/core/executor_pool_base.cpp +++ b/library/cpp/actors/core/executor_pool_base.cpp @@ -8,8 +8,8 @@ namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); void DoActorInit(TActorSystem* sys, IActor* actor, const TActorId& self, const TActorId& owner) { - actor->SelfActorId = self; - actor->Registered(sys, owner); + actor->SelfActorId = self; + actor->Registered(sys, owner); } TExecutorPoolBaseMailboxed::TExecutorPoolBaseMailboxed(ui32 poolId, ui32 maxActivityType) @@ -98,7 +98,7 @@ namespace NActors { // do init const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint); - DoActorInit(ActorSystem, actor, actorId, parentId); + DoActorInit(ActorSystem, actor, actorId, parentId); // Once we unlock the mailbox the actor starts running and we cannot use the pointer any more actor = nullptr; @@ -145,7 +145,7 @@ namespace NActors { mailbox->AttachActor(localActorId, actor); const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint); - DoActorInit(ActorSystem, actor, actorId, parentId); + DoActorInit(ActorSystem, actor, actorId, parentId); NHPTimer::STime elapsed = GetCycleCountFast() - hpstart; if (elapsed > 1000000) { LWPROBE(SlowRegisterAdd, PoolId, NHPTimer::GetSeconds(elapsed) * 1000.0); diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 7a34944ae2..446b651efd 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -67,10 +67,10 @@ namespace NActors { DyingActors.push_back(THolder(actor)); } - void TExecutorThread::DropUnregistered() { - DyingActors.clear(); // here is actual destruction of actors - } - + void TExecutorThread::DropUnregistered() { + DyingActors.clear(); // here is actual destruction of actors + } + void TExecutorThread::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { ++CurrentActorScheduledEventsCounter; Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId); @@ -181,7 +181,7 @@ namespace NActors { size_t dyingActorsCnt = DyingActors.size(); Ctx.UpdateActorsStats(dyingActorsCnt); if (dyingActorsCnt) { - DropUnregistered(); + DropUnregistered(); actor = nullptr; } diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h index a1ef9786f6..9d3c573f0d 100644 --- a/library/cpp/actors/core/executor_thread.h +++ b/library/cpp/actors/core/executor_thread.h @@ -43,7 +43,7 @@ namespace NActors { const TActorId& parentId = TActorId()); TActorId RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId = TActorId()); void UnregisterActor(TMailboxHeader* mailbox, ui64 localActorId); - void DropUnregistered(); + void DropUnregistered(); const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; } void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index ca0c6e60ea..5f63b5af58 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -251,11 +251,11 @@ namespace NActors { const auto prio = ev.Level.ToPrio(); - switch (prio) { - case ::NActors::NLog::EPrio::Alert: + switch (prio) { + case ::NActors::NLog::EPrio::Alert: Metrics->IncAlertMsgs(); break; - case ::NActors::NLog::EPrio::Emerg: + case ::NActors::NLog::EPrio::Emerg: Metrics->IncEmergMsgs(); break; default: @@ -344,10 +344,10 @@ namespace NActors { str << "<a href='logger?c=" << i << "'>" << name << "</a>"; } TABLED() { - str << PriorityToString(EPrio(componentSettings.Raw.X.Level)); + str << PriorityToString(EPrio(componentSettings.Raw.X.Level)); } TABLED() { - str << PriorityToString(EPrio(componentSettings.Raw.X.SamplingLevel)); + str << PriorityToString(EPrio(componentSettings.Raw.X.SamplingLevel)); } TABLED() { str << componentSettings.Raw.X.SamplingRate; @@ -421,11 +421,11 @@ namespace NActors { UL() { LI() { str << "Priority: " - << NLog::PriorityToString(NLog::EPrio(componentSettings.Raw.X.Level)); + << NLog::PriorityToString(NLog::EPrio(componentSettings.Raw.X.Level)); } LI() { str << "Sampling priority: " - << NLog::PriorityToString(NLog::EPrio(componentSettings.Raw.X.SamplingLevel)); + << NLog::PriorityToString(NLog::EPrio(componentSettings.Raw.X.SamplingLevel)); } LI() { str << "Sampling rate: " @@ -444,7 +444,7 @@ namespace NActors { for (int p = NLog::PRI_EMERG; p <= NLog::PRI_TRACE; ++p) { LI() { str << "<a href='logger?c=" << component << "&p=" << p << "'>" - << NLog::PriorityToString(NLog::EPrio(p)) << "</a>"; + << NLog::PriorityToString(NLog::EPrio(p)) << "</a>"; } } } @@ -455,7 +455,7 @@ namespace NActors { for (int p = NLog::PRI_EMERG; p <= NLog::PRI_TRACE; ++p) { LI() { str << "<a href='logger?c=" << component << "&sp=" << p << "'>" - << NLog::PriorityToString(NLog::EPrio(p)) << "</a>"; + << NLog::PriorityToString(NLog::EPrio(p)) << "</a>"; } } } @@ -519,7 +519,7 @@ namespace NActors { TABLER() { TABLED() { str << "<a href = 'logger?c=-1&p=" << p << "'>" - << NLog::PriorityToString(NLog::EPrio(p)) << "</a>"; + << NLog::PriorityToString(NLog::EPrio(p)) << "</a>"; } } } @@ -541,7 +541,7 @@ namespace NActors { TABLER() { TABLED() { str << "<a href = 'logger?c=-1&sp=" << p << "'>" - << NLog::PriorityToString(NLog::EPrio(p)) << "</a>"; + << NLog::PriorityToString(NLog::EPrio(p)) << "</a>"; } } } @@ -576,8 +576,8 @@ namespace NActors { bool TLoggerActor::OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, const TString& formatted) noexcept try { - const auto logPrio = ::ELogPriority(ui16(priority)); - + const auto logPrio = ::ELogPriority(ui16(priority)); + char buf[TimeBufSize]; switch (Settings->Format) { case NActors::NLog::TSettings::PLAIN_FULL_FORMAT: { diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index 65e75b9909..c11a7cf3c1 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -2,7 +2,7 @@ #include "defs.h" -#include "log_iface.h" +#include "log_iface.h" #include "log_settings.h" #include "actorsystem.h" #include "events.h" @@ -122,7 +122,7 @@ namespace NActors { //////////////////////////////////////////////////////////////////////////////// // SET LOG LEVEL FOR A COMPONENT //////////////////////////////////////////////////////////////////////////////// - class TLogComponentLevelRequest: public TEventLocal<TLogComponentLevelRequest, int(NLog::EEv::LevelReq)> { + class TLogComponentLevelRequest: public TEventLocal<TLogComponentLevelRequest, int(NLog::EEv::LevelReq)> { public: // set given priority for the component TLogComponentLevelRequest(NLog::EPriority priority, NLog::EComponent component) @@ -145,7 +145,7 @@ namespace NActors { friend class TLoggerActor; }; - class TLogComponentLevelResponse: public TEventLocal<TLogComponentLevelResponse, int(NLog::EEv::LevelResp)> { + class TLogComponentLevelResponse: public TEventLocal<TLogComponentLevelResponse, int(NLog::EEv::LevelResp)> { public: TLogComponentLevelResponse(int code, const TString& explanation) : Code(code) @@ -166,7 +166,7 @@ namespace NActors { TString Explanation; }; - class TLogIgnored: public TEventLocal<TLogIgnored, int(NLog::EEv::Ignored)> { + class TLogIgnored: public TEventLocal<TLogIgnored, int(NLog::EEv::Ignored)> { public: TLogIgnored() { } @@ -213,7 +213,7 @@ namespace NActors { void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { switch (ev->GetTypeRewrite()) { HFunc(TLogIgnored, HandleIgnoredEvent); - HFunc(NLog::TEvLog, HandleLogEvent); + HFunc(NLog::TEvLog, HandleLogEvent); HFunc(TLogComponentLevelRequest, HandleLogComponentLevelRequest); HFunc(NMon::TEvHttpInfo, HandleMonInfo); } @@ -246,7 +246,7 @@ namespace NActors { void BecomeDefunct(); void HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx); void HandleIgnoredEventDrop(); - void HandleLogEvent(NLog::TEvLog::TPtr& ev, const TActorContext& ctx); + void HandleLogEvent(NLog::TEvLog::TPtr& ev, const TActorContext& ctx); void HandleLogEventDrop(const NLog::TEvLog::TPtr& ev); void HandleLogComponentLevelRequest(TLogComponentLevelRequest::TPtr& ev, const TActorContext& ctx); void HandleMonInfo(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx); diff --git a/library/cpp/actors/core/log_iface.h b/library/cpp/actors/core/log_iface.h index 0ca9c14b91..b331db9ca8 100644 --- a/library/cpp/actors/core/log_iface.h +++ b/library/cpp/actors/core/log_iface.h @@ -1,12 +1,12 @@ -#pragma once - -#include "events.h" -#include "event_local.h" - -namespace NActors { +#pragma once + +#include "events.h" +#include "event_local.h" + +namespace NActors { namespace NLog { using EComponent = int; - + enum EPriority : ui16 { // migrate it to EPrio whenever possible PRI_EMERG, PRI_ALERT, @@ -18,7 +18,7 @@ namespace NActors { PRI_DEBUG, PRI_TRACE }; - + enum class EPrio : ui16 { Emerg = 0, Alert = 1, @@ -30,39 +30,39 @@ namespace NActors { Debug = 7, Trace = 8, }; - + struct TLevel { TLevel(ui32 raw) : Raw(raw) { } - + TLevel(EPrio prio) : Raw((ui16(prio) + 1) << 8) { } - + EPrio ToPrio() const noexcept { const auto major = Raw >> 8; - + return major > 0 ? EPrio(major - 1) : EPrio::Emerg; } - + bool IsUrgentAbortion() const noexcept { return (Raw >> 8) == 0; } - + /* Generalized monotonic level value composed with major and minor - levels. Minor is used for verbosity within major, basic EPrio - mapped to (EPrio + 1, 0) and Major = 0 is reserved as special - space with meaning like EPrio::Emerg but with extened actions. - Thus logger should map Major = 0 to EPrio::Emerg if it have no - idea how to handle special emergency actions. - */ - + levels. Minor is used for verbosity within major, basic EPrio + mapped to (EPrio + 1, 0) and Major = 0 is reserved as special + space with meaning like EPrio::Emerg but with extened actions. + Thus logger should map Major = 0 to EPrio::Emerg if it have no + idea how to handle special emergency actions. + */ + ui32 Raw = 0; // ((ui16(EPrio) + 1) << 8) | ui8(minor) }; - + enum class EEv { Log = EventSpaceBegin(TEvents::ES_LOGGER), LevelReq, @@ -70,9 +70,9 @@ namespace NActors { Ignored, End }; - + static_assert(int(EEv::End) < EventSpaceEnd(TEvents::ES_LOGGER), ""); - + class TEvLog: public TEventLocal<TEvLog, int(EEv::Log)> { public: TEvLog(TInstant stamp, TLevel level, EComponent comp, const TString &line) @@ -90,7 +90,7 @@ namespace NActors { , Line(std::move(line)) { } - + TEvLog(EPriority prio, EComponent comp, TString line, TInstant time = TInstant::Now()) : Stamp(time) , Level(EPrio(prio)) @@ -98,12 +98,12 @@ namespace NActors { , Line(std::move(line)) { } - + const TInstant Stamp = TInstant::Max(); const TLevel Level; const EComponent Component = 0; TString Line; }; - + } -} +} diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp index 5a5b66b4ac..f52f2fc5d2 100644 --- a/library/cpp/actors/core/log_settings.cpp +++ b/library/cpp/actors/core/log_settings.cpp @@ -116,7 +116,7 @@ namespace NActors { str << titleName << " for all components has been changed to " - << PriorityToString(EPrio(priority)); + << PriorityToString(EPrio(priority)); explanation = str.Str(); return 0; } else { @@ -136,8 +136,8 @@ namespace NActors { AtomicSet(ComponentInfo[component], settings.Raw.Data); TStringStream str; str << titleName << " for the component " << ComponentNames[component] - << " has been changed from " << PriorityToString(EPrio(oldPriority)) - << " to " << PriorityToString(EPrio(priority)); + << " has been changed from " << PriorityToString(EPrio(oldPriority)) + << " to " << PriorityToString(EPrio(priority)); explanation = str.Str(); return 0; } diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h index 7f9ea16553..7fe4504edd 100644 --- a/library/cpp/actors/core/log_settings.h +++ b/library/cpp/actors/core/log_settings.h @@ -1,32 +1,32 @@ #pragma once #include "actor.h" -#include "log_iface.h" +#include "log_iface.h" #include <util/generic/vector.h> #include <util/digest/murmur.h> #include <util/random/easy.h> namespace NActors { namespace NLog { - inline const char* PriorityToString(EPrio priority) { + inline const char* PriorityToString(EPrio priority) { switch (priority) { - case EPrio::Emerg: + case EPrio::Emerg: return "EMERG"; - case EPrio::Alert: + case EPrio::Alert: return "ALERT"; - case EPrio::Crit: + case EPrio::Crit: return "CRIT"; - case EPrio::Error: + case EPrio::Error: return "ERROR"; - case EPrio::Warn: + case EPrio::Warn: return "WARN"; - case EPrio::Notice: + case EPrio::Notice: return "NOTICE"; - case EPrio::Info: + case EPrio::Info: return "INFO"; - case EPrio::Debug: + case EPrio::Debug: return "DEBUG"; - case EPrio::Trace: + case EPrio::Trace: return "TRACE"; default: return "UNKNOWN"; diff --git a/library/cpp/actors/dnscachelib/dnscache.cpp b/library/cpp/actors/dnscachelib/dnscache.cpp index c966fd7b13..649339ddb2 100644 --- a/library/cpp/actors/dnscachelib/dnscache.cpp +++ b/library/cpp/actors/dnscachelib/dnscache.cpp @@ -7,7 +7,7 @@ #include <util/datetime/systime.h> const TDnsCache::THost TDnsCache::NullHost; - + LWTRACE_USING(DNSCACHELIB_PROVIDER); static_assert(sizeof(ares_channel) == sizeof(void*), "expect sizeof(ares_channel) == sizeof(void *)"); @@ -22,7 +22,7 @@ TDnsCache::TDnsCache(bool allowIpv4, bool allowIpv6, time_t lifetime, time_t neg , ACacheMisses(0) , PtrCacheHits(0) , PtrCacheMisses(0) -{ +{ #ifdef _win_ if (ares_library_init(ARES_LIB_INIT_WIN32) != ARES_SUCCESS) { LWPROBE(AresInitFailed); @@ -53,97 +53,97 @@ TDnsCache::~TDnsCache(void) { } TString TDnsCache::GetHostByAddr(const NAddr::IRemoteAddr& addr) { - in6_addr key; - - if (addr.Addr()->sa_family == AF_INET6) { + in6_addr key; + + if (addr.Addr()->sa_family == AF_INET6) { const struct sockaddr_in6* s6 = (const struct sockaddr_in6*)(addr.Addr()); - memcpy(&key, &s6->sin6_addr, sizeof(s6->sin6_addr)); - } else if (addr.Addr()->sa_family == AF_INET) { + memcpy(&key, &s6->sin6_addr, sizeof(s6->sin6_addr)); + } else if (addr.Addr()->sa_family == AF_INET) { const struct sockaddr_in* s4 = (const struct sockaddr_in*)(addr.Addr()); - memset(&key, 0, sizeof(key)); - memcpy(&key, &s4->sin_addr, sizeof(s4->sin_addr)); - } else { - return ""; - } + memset(&key, 0, sizeof(key)); + memcpy(&key, &s4->sin_addr, sizeof(s4->sin_addr)); + } else { + return ""; + } const TAddr& host = ResolveAddr(key, addr.Addr()->sa_family); - - return host.Hostname; -} - + + return host.Hostname; +} + TIpHost TDnsCache::Get(const TString& hostname) { - if (!AllowIpV4) - return TIpHost(-1); - + if (!AllowIpV4) + return TIpHost(-1); + const THost& addr = Resolve(hostname, AF_INET); - + TGuard<TMutex> lock(CacheMtx); - if (addr.AddrsV4.empty()) { - return TIpHost(-1); - } - return addr.AddrsV4.front(); -} - -NAddr::IRemoteAddrPtr TDnsCache::GetAddr( + if (addr.AddrsV4.empty()) { + return TIpHost(-1); + } + return addr.AddrsV4.front(); +} + +NAddr::IRemoteAddrPtr TDnsCache::GetAddr( const TString& hostname, int family, TIpPort port, bool cacheOnly) { - if (family != AF_INET && AllowIpV6) { + if (family != AF_INET && AllowIpV6) { const THost& addr = Resolve(hostname, AF_INET6, cacheOnly); - + TGuard<TMutex> lock(CacheMtx); - if (!addr.AddrsV6.empty()) { - struct sockaddr_in6 sin6; - Zero(sin6); - sin6.sin6_family = AF_INET6; - sin6.sin6_addr = addr.AddrsV6.front(); - sin6.sin6_port = HostToInet(port); - + if (!addr.AddrsV6.empty()) { + struct sockaddr_in6 sin6; + Zero(sin6); + sin6.sin6_family = AF_INET6; + sin6.sin6_addr = addr.AddrsV6.front(); + sin6.sin6_port = HostToInet(port); + return MakeHolder<NAddr::TIPv6Addr>(sin6); - } - } - - if (family != AF_INET6 && AllowIpV4) { + } + } + + if (family != AF_INET6 && AllowIpV4) { const THost& addr = Resolve(hostname, AF_INET, cacheOnly); - + TGuard<TMutex> lock(CacheMtx); - if (!addr.AddrsV4.empty()) { + if (!addr.AddrsV4.empty()) { return MakeHolder<NAddr::TIPv4Addr>(TIpAddress(addr.AddrsV4.front(), port)); - } - } - + } + } + LWPROBE(FamilyMismatch, family, AllowIpV4, AllowIpV6); return nullptr; -} - -void TDnsCache::GetAllAddresses( +} + +void TDnsCache::GetAllAddresses( const TString& hostname, TVector<NAddr::IRemoteAddrPtr>& addrs) { - if (AllowIpV4) { + if (AllowIpV4) { const THost& addr4 = Resolve(hostname, AF_INET); TGuard<TMutex> lock(CacheMtx); - for (size_t i = 0; i < addr4.AddrsV4.size(); i++) { + for (size_t i = 0; i < addr4.AddrsV4.size(); i++) { addrs.push_back(MakeHolder<NAddr::TIPv4Addr>(TIpAddress(addr4.AddrsV4[i], 0))); - } - } - - if (AllowIpV6) { + } + } + + if (AllowIpV6) { const THost& addr6 = Resolve(hostname, AF_INET6); - struct sockaddr_in6 sin6; - Zero(sin6); - sin6.sin6_family = AF_INET6; + struct sockaddr_in6 sin6; + Zero(sin6); + sin6.sin6_family = AF_INET6; TGuard<TMutex> lock(CacheMtx); - for (size_t i = 0; i < addr6.AddrsV6.size(); i++) { - sin6.sin6_addr = addr6.AddrsV6[i]; - + for (size_t i = 0; i < addr6.AddrsV6.size(); i++) { + sin6.sin6_addr = addr6.AddrsV6[i]; + addrs.push_back(MakeHolder<NAddr::TIPv6Addr>(sin6)); - } - } -} - + } + } +} + void TDnsCache::GetStats(ui64& a_cache_hits, ui64& a_cache_misses, ui64& ptr_cache_hits, ui64& ptr_cache_misses) { TGuard<TMutex> lock(CacheMtx); @@ -169,11 +169,11 @@ bool TDnsCache::THost::IsStale(int family, const TDnsCache* ctx) const noexcept const TDnsCache::THost& TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) { - if (!ValidateHName(hostname)) { + if (!ValidateHName(hostname)) { LWPROBE(ResolveNullHost, hostname, family); - return NullHost; - } - + return NullHost; + } + THostCache::iterator p; Y_ASSERT(family == AF_INET || family == AF_INET6); @@ -232,9 +232,9 @@ TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) { } bool TDnsCache::ValidateHName(const TString& name) const noexcept { - return name.size() > 0; -} - + return name.size() > 0; +} + const TDnsCache::TAddr& TDnsCache::ResolveAddr(const in6_addr& addr, int family) { TAddrCache::iterator p; @@ -282,7 +282,7 @@ const TDnsCache::TAddr& TDnsCache::ResolveAddr(const in6_addr& addr, int family) } void TDnsCache::WaitTask(TAtomic& flag) { - const TInstant start = TInstant(TTimeKeeper::GetTimeval()); + const TInstant start = TInstant(TTimeKeeper::GetTimeval()); while (AtomicGet(flag)) { ares_channel chan = static_cast<ares_channel>(Channel); @@ -319,11 +319,11 @@ void TDnsCache::WaitTask(TAtomic& flag) { Y_ASSERT(nfds != 0); - const TDuration left = TInstant(TTimeKeeper::GetTimeval()) - start; - const TDuration wait = Max(Timeout - left, TDuration::Zero()); - - int rv = poll(pfd, nfds, wait.MilliSeconds()); - + const TDuration left = TInstant(TTimeKeeper::GetTimeval()) - start; + const TDuration wait = Max(Timeout - left, TDuration::Zero()); + + int rv = poll(pfd, nfds, wait.MilliSeconds()); + if (rv == -1) { if (errno == EINTR) { continue; @@ -351,10 +351,10 @@ void TDnsCache::WaitTask(TAtomic& flag) { : ARES_SOCKET_BAD); } } - - if (start + Timeout <= TInstant(TTimeKeeper::GetTimeval())) { - break; - } + + if (start + Timeout <= TInstant(TTimeKeeper::GetTimeval())) { + break; + } } } diff --git a/library/cpp/actors/dnscachelib/dnscache.h b/library/cpp/actors/dnscachelib/dnscache.h index 2d6adb3a36..3313a251a1 100644 --- a/library/cpp/actors/dnscachelib/dnscache.h +++ b/library/cpp/actors/dnscachelib/dnscache.h @@ -5,7 +5,7 @@ #include <util/generic/vector.h> #include <util/network/address.h> #include <util/system/mutex.h> -#include <util/datetime/base.h> +#include <util/datetime/base.h> /** Asynchronous DNS resolver. * @@ -19,7 +19,7 @@ class TDnsCache { public: TDnsCache(bool allowIpv4 = true, bool allowIpv6 = true, time_t entry_lifetime = 1800, time_t neg_lifetime = 1, ui32 request_timeout = 500000); - ~TDnsCache(); + ~TDnsCache(); TString GetHostByAddr(const NAddr::IRemoteAddr&); @@ -37,9 +37,9 @@ public: void GetStats(ui64& a_cache_hits, ui64& a_cache_misses, ui64& ptr_cache_hits, ui64& ptr_cache_misses); -protected: +protected: bool ValidateHName(const TString& host) const noexcept; - + private: struct TGHBNContext { TDnsCache* Owner; @@ -52,7 +52,7 @@ private: in6_addr Addr; }; - struct THost { + struct THost { THost() noexcept { } @@ -73,8 +73,8 @@ private: }; typedef TMap<TString, THost> THostCache; - - struct TAddr { + + struct TAddr { TString Hostname; time_t Resolved = 0; time_t NotFound = 0; @@ -112,7 +112,7 @@ private: const time_t EntryLifetime; const time_t NegativeLifetime; - const TDuration Timeout; + const TDuration Timeout; const bool AllowIpV4; const bool AllowIpV6; @@ -124,8 +124,8 @@ private: ui64 PtrCacheHits; ui64 PtrCacheMisses; - const static THost NullHost; - + const static THost NullHost; + TMutex AresMtx; void* Channel; diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 7c4871399d..6fa25b9965 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -1,4 +1,4 @@ -#include "test_runtime.h" +#include "test_runtime.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/callstack.h> @@ -22,16 +22,16 @@ bool VERBOSE = false; const bool PRINT_EVENT_BODY = false; -namespace { +namespace { + + TString MakeClusterId() { + pid_t pid = getpid(); + TStringBuilder uuid; + uuid << "Cluster for process with id: " << pid; + return uuid; + } +} - TString MakeClusterId() { - pid_t pid = getpid(); - TStringBuilder uuid; - uuid << "Cluster for process with id: " << pid; - return uuid; - } -} - namespace NActors { ui64 TScheduledEventQueueItem::NextUniqueId = 0; @@ -80,7 +80,7 @@ namespace NActors { TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() { Stop(); } - + class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> { public: @@ -326,7 +326,7 @@ namespace NActors { } TDuration delay = (deadline - now); - if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) { + if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) { ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie)); Runtime->MailboxesHasEvents.Signal(); @@ -365,7 +365,7 @@ namespace NActors { } ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); - if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) { + if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) { const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger"); TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId); if (ev->GetRecipientRewrite() == logger) { @@ -451,15 +451,15 @@ namespace NActors { TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv) : TTestActorRuntimeBase(1, 1, false) - { - SingleSysEnv = true; - } - + { + SingleSysEnv = true; + } + TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads) : ScheduledCount(0) , ScheduledLimit(100000) , MainThreadId(TThread::CurrentThreadId()) - , ClusterUUID(MakeClusterId()) + , ClusterUUID(MakeClusterId()) , FirstNodeId(NextNodeId) , NodeCount(nodeCount) , DataCenterCount(dataCenterCount) @@ -741,7 +741,7 @@ namespace NActors { void TTestActorRuntimeBase::InitNodes() { NextNodeId += NodeCount; Y_VERIFY(NodeCount > 0); - + for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first; TNodeDataBase* node = nodeIt->second.Get(); @@ -900,7 +900,7 @@ namespace NActors { const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); ActorNames[actorId] = TypeName(*actor); RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); - DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); + DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); switch (mailboxType) { case TMailboxType::Simple: @@ -944,7 +944,7 @@ namespace NActors { const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); ActorNames[actorId] = TypeName(*actor); RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); - DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); + DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); return actorId; } @@ -1583,7 +1583,7 @@ namespace NActors { TCallstack::GetTlsCallstack().SetLinesToSkip(); #endif recipientActor->Receive(evHolder, ctx); - node->ExecutorThread->DropUnregistered(); + node->ExecutorThread->DropUnregistered(); } CurrentRecipient = TActorId(); TlsActivationContext = prevTlsActivationContext; @@ -1683,24 +1683,24 @@ namespace NActors { NActors::TMailboxType::Simple, InterconnectPoolId())); } - if (!SingleSysEnv) { // Single system env should do this self - TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend(); - NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings, + if (!SingleSysEnv) { // Single system env should do this self + TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend(); + NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings, logBackend, GetCountersForComponent(node->DynamicCounters, "utils")); NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId()); std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd); - setup->LocalServices.push_back(loggerActorPair); - } + setup->LocalServices.push_back(loggerActorPair); + } return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); } TActorSystem* TTestActorRuntimeBase::SingleSys() const { - Y_VERIFY(Nodes.size() == 1, "Works only for single system env"); - - return Nodes.begin()->second->ActorSystem.Get(); - } - + Y_VERIFY(Nodes.size() == 1, "Works only for single system env"); + + return Nodes.begin()->second->ActorSystem.Get(); + } + TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() { for (auto& x : Nodes) { return x.second->ActorSystem.Get(); diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index 38e964835c..26e3b45c98 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -39,8 +39,8 @@ const TDuration DEFAULT_DISPATCH_TIMEOUT = NSan::PlainOrUnderSanitizer( namespace NActors { - struct THeSingleSystemEnv { }; - + struct THeSingleSystemEnv { }; + struct TEventMailboxId { TEventMailboxId() : NodeId(0) @@ -200,7 +200,7 @@ namespace NActors { typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter; typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter; typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver; - + TTestActorRuntimeBase(THeSingleSystemEnv); TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); @@ -284,7 +284,7 @@ namespace NActors { return Nodes[FirstNodeId + nodeIdx]->LogSettings; } - TActorSystem* SingleSys() const; + TActorSystem* SingleSys() const; TActorSystem* GetAnyNodeActorSystem(); TActorSystem* GetActorSystem(ui32 nodeId); template <typename TEvent> @@ -471,12 +471,12 @@ namespace NActors { UseRealInterconnect = true; } - protected: + protected: struct TNodeDataBase; TNodeDataBase* GetRawNode(ui32 node) const { - return Nodes.at(FirstNodeId + node).Get(); - } - + return Nodes.at(FirstNodeId + node).Get(); + } + static IExecutorPool* CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TNodeDataBase* node, ui32 poolId); virtual TIntrusivePtr<NMonitoring::TDynamicCounters> GetCountersForComponent(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const char* component) { Y_UNUSED(counters); @@ -492,7 +492,7 @@ namespace NActors { Y_UNUSED(setup); } - private: + private: IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const; void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem); TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint); @@ -507,12 +507,12 @@ namespace NActors { THolder<TTempDir> TmpDir; const TThread::TId MainThreadId; - protected: + protected: bool UseRealInterconnect = false; TInterconnectMock InterconnectMock; bool IsInitialized = false; bool SingleSysEnv = false; - const TString ClusterUUID; + const TString ClusterUUID; const ui32 FirstNodeId; const ui32 NodeCount; const ui32 DataCenterCount; @@ -535,7 +535,7 @@ namespace NActors { TIntrusivePtr<IRandomProvider> RandomProvider; TIntrusivePtr<ITimeProvider> TimeProvider; - protected: + protected: struct TNodeDataBase: public TThrRefBase { TNodeDataBase(); void Stop(); @@ -598,7 +598,7 @@ namespace NActors { protected: THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory}; - private: + private: void InitNode(TNodeDataBase* node, size_t idx); struct TDispatchContext { diff --git a/library/cpp/actors/testlib/ya.make b/library/cpp/actors/testlib/ya.make index 87dc50175e..1afb3f6059 100644 --- a/library/cpp/actors/testlib/ya.make +++ b/library/cpp/actors/testlib/ya.make @@ -1,26 +1,26 @@ -LIBRARY() - -OWNER( - g:kikimr -) - -SRCS( - test_runtime.cpp -) - -PEERDIR( +LIBRARY() + +OWNER( + g:kikimr +) + +SRCS( + test_runtime.cpp +) + +PEERDIR( library/cpp/actors/core library/cpp/actors/interconnect/mock library/cpp/actors/protos library/cpp/random_provider library/cpp/time_provider -) - -IF (GCC) - CFLAGS(-fno-devirtualize-speculatively) -ENDIF() - -END() +) + +IF (GCC) + CFLAGS(-fno-devirtualize-speculatively) +ENDIF() + +END() RECURSE_FOR_TESTS( ut diff --git a/library/cpp/lfalloc/lf_allocX64.h b/library/cpp/lfalloc/lf_allocX64.h index 588b051815..fd2a906d6f 100644 --- a/library/cpp/lfalloc/lf_allocX64.h +++ b/library/cpp/lfalloc/lf_allocX64.h @@ -390,53 +390,53 @@ static char* AllocWithMMap(uintptr_t sz, EMMapMode mode) { return largeBlock; } -enum class ELarge : ui8 { - Free = 0, // block in free cache - Alloc = 1, // block is allocated - Gone = 2, // block was unmapped -}; - -struct TLargeBlk { - - static TLargeBlk* As(void *raw) { - return reinterpret_cast<TLargeBlk*>((char*)raw - 4096ll); - } - - static const TLargeBlk* As(const void *raw) { - return reinterpret_cast<const TLargeBlk*>((const char*)raw - 4096ll); - } - - void SetSize(size_t bytes, size_t pages) { - Pages = pages; - Bytes = bytes; - } - - void Mark(ELarge state) { - const ui64 marks[] = { - 0x8b38aa5ca4953c98, // ELarge::Free - 0xf916d33584eb5087, // ELarge::Alloc - 0xd33b0eca7651bc3f // ELarge::Gone - }; - - Token = size_t(marks[ui8(state)]); - } - - size_t Pages; // Total pages allocated with mmap like call - size_t Bytes; // Actually requested bytes by user - size_t Token; // Block state token, see ELarge enum. -}; - - -static void LargeBlockUnmap(void* p, size_t pages) { - const auto bytes = (pages + 1) * uintptr_t(4096); - - IncrementCounter(CT_MUNMAP, bytes); +enum class ELarge : ui8 { + Free = 0, // block in free cache + Alloc = 1, // block is allocated + Gone = 2, // block was unmapped +}; + +struct TLargeBlk { + + static TLargeBlk* As(void *raw) { + return reinterpret_cast<TLargeBlk*>((char*)raw - 4096ll); + } + + static const TLargeBlk* As(const void *raw) { + return reinterpret_cast<const TLargeBlk*>((const char*)raw - 4096ll); + } + + void SetSize(size_t bytes, size_t pages) { + Pages = pages; + Bytes = bytes; + } + + void Mark(ELarge state) { + const ui64 marks[] = { + 0x8b38aa5ca4953c98, // ELarge::Free + 0xf916d33584eb5087, // ELarge::Alloc + 0xd33b0eca7651bc3f // ELarge::Gone + }; + + Token = size_t(marks[ui8(state)]); + } + + size_t Pages; // Total pages allocated with mmap like call + size_t Bytes; // Actually requested bytes by user + size_t Token; // Block state token, see ELarge enum. +}; + + +static void LargeBlockUnmap(void* p, size_t pages) { + const auto bytes = (pages + 1) * uintptr_t(4096); + + IncrementCounter(CT_MUNMAP, bytes); IncrementCounter(CT_MUNMAP_CNT, 1); #ifdef _MSC_VER Y_ASSERT_NOBT(0); #else - TLargeBlk::As(p)->Mark(ELarge::Gone); - munmap((char*)p - 4096ll, bytes); + TLargeBlk::As(p)->Mark(ELarge::Gone); + munmap((char*)p - 4096ll, bytes); #endif } @@ -447,7 +447,7 @@ static int LB_LIMIT_TOTAL_SIZE = 500 * 1024 * 1024 / 4096; // do not keep more t static void* volatile lbFreePtrs[LB_BUF_HASH][LB_BUF_SIZE]; static TAtomic lbFreePageCount; - + static void* LargeBlockAlloc(size_t _nSize, ELFAllocCounter counter) { size_t pgCount = (_nSize + 4095) / 4096; #ifdef _MSC_VER @@ -466,16 +466,16 @@ static void* LargeBlockAlloc(size_t _nSize, ELFAllocCounter counter) { if (p == nullptr) continue; if (DoCas(&lbFreePtrs[lbHash][i], (void*)nullptr, p) == p) { - size_t realPageCount = TLargeBlk::As(p)->Pages; + size_t realPageCount = TLargeBlk::As(p)->Pages; if (realPageCount == pgCount) { AtomicAdd(lbFreePageCount, -pgCount); - TLargeBlk::As(p)->Mark(ELarge::Alloc); + TLargeBlk::As(p)->Mark(ELarge::Alloc); return p; } else { if (DoCas(&lbFreePtrs[lbHash][i], p, (void*)nullptr) != (void*)nullptr) { // block was freed while we were busy AtomicAdd(lbFreePageCount, -realPageCount); - LargeBlockUnmap(p, realPageCount); + LargeBlockUnmap(p, realPageCount); --i; } } @@ -484,8 +484,8 @@ static void* LargeBlockAlloc(size_t _nSize, ELFAllocCounter counter) { char* pRes = AllocWithMMap((pgCount + 1) * 4096ll, MM_HUGE); #endif pRes += 4096ll; - TLargeBlk::As(pRes)->SetSize(_nSize, pgCount); - TLargeBlk::As(pRes)->Mark(ELarge::Alloc); + TLargeBlk::As(pRes)->SetSize(_nSize, pgCount); + TLargeBlk::As(pRes)->Mark(ELarge::Alloc); return pRes; } @@ -498,9 +498,9 @@ static void FreeAllLargeBlockMem() { if (p == nullptr) continue; if (DoCas(&lbFreePtr[i], (void*)nullptr, p) == p) { - int pgCount = TLargeBlk::As(p)->Pages; + int pgCount = TLargeBlk::As(p)->Pages; AtomicAdd(lbFreePageCount, -pgCount); - LargeBlockUnmap(p, pgCount); + LargeBlockUnmap(p, pgCount); } } } @@ -513,9 +513,9 @@ static void LargeBlockFree(void* p, ELFAllocCounter counter) { #ifdef _MSC_VER VirtualFree((char*)p - 4096ll, 0, MEM_RELEASE); #else - size_t pgCount = TLargeBlk::As(p)->Pages; + size_t pgCount = TLargeBlk::As(p)->Pages; - TLargeBlk::As(p)->Mark(ELarge::Free); + TLargeBlk::As(p)->Mark(ELarge::Free); IncrementCounter(counter, pgCount * 4096ll); IncrementCounter(CT_SYSTEM_FREE, 4096ll); @@ -531,7 +531,7 @@ static void LargeBlockFree(void* p, ELFAllocCounter counter) { } } - LargeBlockUnmap(p, pgCount); + LargeBlockUnmap(p, pgCount); #endif } @@ -1644,23 +1644,23 @@ static Y_FORCE_INLINE void LFFree(void* p) { } } -static size_t LFGetSize(const void* p) { +static size_t LFGetSize(const void* p) { #if defined(LFALLOC_DBG) if (p == nullptr) return 0; - return GetAllocHeader(const_cast<void*>(p))->Size; + return GetAllocHeader(const_cast<void*>(p))->Size; #endif - uintptr_t chkOffset = ((const char*)p - ALLOC_START); + uintptr_t chkOffset = ((const char*)p - ALLOC_START); if (chkOffset >= N_MAX_WORKSET_SIZE) { if (p == nullptr) return 0; - return TLargeBlk::As(p)->Pages * 4096ll; + return TLargeBlk::As(p)->Pages * 4096ll; } - uintptr_t chunk = ((const char*)p - ALLOC_START) / N_CHUNK_SIZE; + uintptr_t chunk = ((const char*)p - ALLOC_START) / N_CHUNK_SIZE; ptrdiff_t nSizeIdx = chunkSizeIdx[chunk]; if (nSizeIdx <= 0) - return TLargeBlk::As(p)->Pages * 4096ll; + return TLargeBlk::As(p)->Pages * 4096ll; return nSizeIdxToSize[nSizeIdx]; } diff --git a/library/cpp/messagebus/acceptor.cpp b/library/cpp/messagebus/acceptor.cpp index de8810d02e..64a38619c2 100644 --- a/library/cpp/messagebus/acceptor.cpp +++ b/library/cpp/messagebus/acceptor.cpp @@ -19,7 +19,7 @@ TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, c : TActor<TAcceptor>(session->Queue->WorkQueue.Get()) , AcceptorId(acceptorId) , Session(session) - , GranStatus(session->Config.Secret.StatusFlushPeriod) + , GranStatus(session->Config.Secret.StatusFlushPeriod) { SetNonBlock(socket, true); @@ -30,7 +30,7 @@ TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, c Stats.Fd = socket; Stats.ListenAddr = addr; - SendStatus(TInstant::Now()); + SendStatus(TInstant::Now()); } void TAcceptor::Act(TDefaultTag) { @@ -40,8 +40,8 @@ void TAcceptor::Act(TDefaultTag) { return; } - TInstant now = TInstant::Now(); - + TInstant now = TInstant::Now(); + if (state == SS_SHUTDOWN_COMMAND) { if (!!Channel) { Channel->Unregister(); @@ -49,7 +49,7 @@ void TAcceptor::Act(TDefaultTag) { Stats.Fd = INVALID_SOCKET; } - SendStatus(now); + SendStatus(now); Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats); Stats.ResetIncremental(); @@ -96,7 +96,7 @@ void TAcceptor::Act(TDefaultTag) { Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept); - Stats.LastAcceptSuccessInstant = now; + Stats.LastAcceptSuccessInstant = now; ++Stats.AcceptSuccessCount; } @@ -105,11 +105,11 @@ void TAcceptor::Act(TDefaultTag) { Channel->EnableRead(); - SendStatus(now); + SendStatus(now); } -void TAcceptor::SendStatus(TInstant now) { - GranStatus.Listen.Update(Stats, now); +void TAcceptor::SendStatus(TInstant now) { + GranStatus.Listen.Update(Stats, now); } void TAcceptor::HandleEvent(SOCKET socket, void* cookie) { diff --git a/library/cpp/messagebus/acceptor.h b/library/cpp/messagebus/acceptor.h index 8ec2229d63..57cb010bf2 100644 --- a/library/cpp/messagebus/acceptor.h +++ b/library/cpp/messagebus/acceptor.h @@ -55,6 +55,6 @@ namespace NBus { TGranStatus GranStatus; }; - + } } diff --git a/library/cpp/messagebus/actor/queue_for_actor.h b/library/cpp/messagebus/actor/queue_for_actor.h index d26a546296..40fa536b82 100644 --- a/library/cpp/messagebus/actor/queue_for_actor.h +++ b/library/cpp/messagebus/actor/queue_for_actor.h @@ -60,15 +60,15 @@ namespace NActor { temp.Shrink(); } } - + template <typename TFunc> void DequeueAllLikelyEmpty(const TFunc& func) { if (Y_LIKELY(IsEmpty())) { return; } - + DequeueAll(func); - } + } }; } diff --git a/library/cpp/messagebus/actor/temp_tls_vector.h b/library/cpp/messagebus/actor/temp_tls_vector.h index 407703d702..675d92f5b0 100644 --- a/library/cpp/messagebus/actor/temp_tls_vector.h +++ b/library/cpp/messagebus/actor/temp_tls_vector.h @@ -23,18 +23,18 @@ public: } ~TTempTlsVector() { - Clear(); - } - - void Clear() { + Clear(); + } + + void Clear() { Vector->clear(); } - + size_t Capacity() const noexcept { - return Vector->capacity(); - } - - void Shrink() { - Vector->shrink_to_fit(); - } + return Vector->capacity(); + } + + void Shrink() { + Vector->shrink_to_fit(); + } }; diff --git a/library/cpp/messagebus/config/netaddr.cpp b/library/cpp/messagebus/config/netaddr.cpp index c1cb356840..962ac538e2 100644 --- a/library/cpp/messagebus/config/netaddr.cpp +++ b/library/cpp/messagebus/config/netaddr.cpp @@ -129,7 +129,7 @@ namespace NBus { ythrow TNetAddr::TError() << "cannot resolve " << host << ":" << port << " into " << Describe(requireVersion); } } - + TNetAddr::TNetAddr(const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_ANY*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) : Ptr(MakeAddress(na, requireVersion, preferVersion)) { diff --git a/library/cpp/messagebus/config/netaddr.h b/library/cpp/messagebus/config/netaddr.h index ccb4b42810..b79c0cc355 100644 --- a/library/cpp/messagebus/config/netaddr.h +++ b/library/cpp/messagebus/config/netaddr.h @@ -36,14 +36,14 @@ namespace NBus { public: class TError: public yexception { }; - + TNetAddr(); TNetAddr(TAutoPtr<IRemoteAddr> addr); TNetAddr(const char* hostPort, EIpVersion requireVersion = EIP_VERSION_ANY, EIpVersion preferVersion = EIP_VERSION_ANY); TNetAddr(TStringBuf host, int port, EIpVersion requireVersion = EIP_VERSION_ANY, EIpVersion preferVersion = EIP_VERSION_ANY); TNetAddr(const TNetworkAddress& na, EIpVersion requireVersion = EIP_VERSION_ANY, EIpVersion preferVersion = EIP_VERSION_ANY); TNetAddr(const TNetworkAddress& na, const TAddrInfo& ai); - + bool operator==(const TNetAddr&) const; bool operator!=(const TNetAddr& other) const { return !(*this == other); diff --git a/library/cpp/messagebus/config/session_config.cpp b/library/cpp/messagebus/config/session_config.cpp index 17157b3dfa..fbbbb106c9 100644 --- a/library/cpp/messagebus/config/session_config.cpp +++ b/library/cpp/messagebus/config/session_config.cpp @@ -120,7 +120,7 @@ void TBusSessionConfig::ConfigureLastGetopt(NLastGetopt::TOpts& opts, opts.AddLongOption(prefix + "max-message-size") .RequiredArgument("BYTES") .DefaultValue(ToString(MaxMessageSize)) - .StoreMappedResultT<const char*>(&MaxMessageSize, &ParseWithKmgSuffix); + .StoreMappedResultT<const char*>(&MaxMessageSize, &ParseWithKmgSuffix); opts.AddLongOption(prefix + "socket-recv-buffer-size") .RequiredArgument("BYTES") .DefaultValue(ToString(SocketRecvBufferSize)) diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp index fd2e726d0b..f685135bed 100644 --- a/library/cpp/messagebus/event_loop.cpp +++ b/library/cpp/messagebus/event_loop.cpp @@ -78,7 +78,7 @@ public: const char* Name; - TAtomic RunningState; + TAtomic RunningState; TAtomic StopSignal; TSystemEvent StoppedEvent; TData Data; @@ -143,7 +143,7 @@ void TEventLoop::Stop() { } bool TEventLoop::IsRunning() { - return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING; + return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING; } TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { @@ -277,7 +277,7 @@ TEventLoop::TImpl::TImpl(const char* name) } void TEventLoop::TImpl::Run() { - bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED); + bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED); Y_VERIFY(res, "Invalid mbus event loop state"); if (!!Name) { @@ -320,21 +320,21 @@ void TEventLoop::TImpl::Run() { Data.clear(); } - res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING); + res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING); Y_VERIFY(res); - + StoppedEvent.Signal(); } void TEventLoop::TImpl::Stop() { AtomicSet(StopSignal, 1); - if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) { - Wakeup(); + if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) { + Wakeup(); - StoppedEvent.WaitI(); - } + StoppedEvent.WaitI(); + } } TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { diff --git a/library/cpp/messagebus/misc/granup.h b/library/cpp/messagebus/misc/granup.h index 8b04aca597..36ecfebc93 100644 --- a/library/cpp/messagebus/misc/granup.h +++ b/library/cpp/messagebus/misc/granup.h @@ -1,50 +1,50 @@ -#pragma once - +#pragma once + #include <util/datetime/base.h> #include <util/system/guard.h> -#include <util/system/mutex.h> -#include <util/system/spinlock.h> - -namespace NBus { - template <typename TItem, typename TLocker = TSpinLock> - class TGranUp { - public: - TGranUp(TDuration gran) - : Gran(gran) +#include <util/system/mutex.h> +#include <util/system/spinlock.h> + +namespace NBus { + template <typename TItem, typename TLocker = TSpinLock> + class TGranUp { + public: + TGranUp(TDuration gran) + : Gran(gran) , Next(TInstant::MicroSeconds(0)) { } - + template <typename TFunctor> void Update(TFunctor functor, TInstant now, bool force = false) { if (force || now > Next) - Set(functor(), now); - } - + Set(functor(), now); + } + void Update(const TItem& item, TInstant now, bool force = false) { if (force || now > Next) - Set(item, now); - } - + Set(item, now); + } + TItem Get() const noexcept { TGuard<TLocker> guard(Lock); - - return Item; - } - - protected: + + return Item; + } + + protected: void Set(const TItem& item, TInstant now) { TGuard<TLocker> guard(Lock); - - Item = item; - - Next = now + Gran; - } - - private: - const TDuration Gran; + + Item = item; + + Next = now + Gran; + } + + private: + const TDuration Gran; TLocker Lock; TItem Item; TInstant Next; - }; -} + }; +} diff --git a/library/cpp/messagebus/misc/tokenquota.h b/library/cpp/messagebus/misc/tokenquota.h index 954cf0f0d7..190547fa54 100644 --- a/library/cpp/messagebus/misc/tokenquota.h +++ b/library/cpp/messagebus/misc/tokenquota.h @@ -1,83 +1,83 @@ -#pragma once - -#include <util/system/atomic.h> - -namespace NBus { - /* Consumer and feeder quota model impl. - - Consumer thread only calls: - Acquire(), fetches tokens for usage from bucket; - Consume(), eats given amount of tokens, must not - be greater than Value() items; - - Other threads (feeders) calls: - Return(), put used tokens back to bucket; - */ - - class TTokenQuota { - public: - TTokenQuota(bool enabled, size_t tokens, size_t wake) - : Enabled(tokens > 0 ? enabled : false) - , Acquired(0) - , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0) - , Tokens_(tokens) +#pragma once + +#include <util/system/atomic.h> + +namespace NBus { + /* Consumer and feeder quota model impl. + + Consumer thread only calls: + Acquire(), fetches tokens for usage from bucket; + Consume(), eats given amount of tokens, must not + be greater than Value() items; + + Other threads (feeders) calls: + Return(), put used tokens back to bucket; + */ + + class TTokenQuota { + public: + TTokenQuota(bool enabled, size_t tokens, size_t wake) + : Enabled(tokens > 0 ? enabled : false) + , Acquired(0) + , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0) + , Tokens_(tokens) { Y_UNUSED(padd_); } - + bool Acquire(TAtomic level = 1, bool force = false) { level = Max(TAtomicBase(level), TAtomicBase(1)); - + if (Enabled && (Acquired < level || force)) { Acquired += AtomicSwap(&Tokens_, 0); - } - - return !Enabled || Acquired >= level; - } - + } + + return !Enabled || Acquired >= level; + } + void Consume(size_t items) { if (Enabled) { Y_ASSERT(Acquired >= TAtomicBase(items)); - - Acquired -= items; - } - } - + + Acquired -= items; + } + } + bool Return(size_t items_) noexcept { if (!Enabled || items_ == 0) - return false; - - const TAtomic items = items_; - const TAtomic value = AtomicAdd(Tokens_, items); - - return (value - items < WakeLev && value >= WakeLev); - } - + return false; + + const TAtomic items = items_; + const TAtomic value = AtomicAdd(Tokens_, items); + + return (value - items < WakeLev && value >= WakeLev); + } + bool IsEnabled() const noexcept { - return Enabled; - } - + return Enabled; + } + bool IsAboveWake() const noexcept { - return !Enabled || (WakeLev <= AtomicGet(Tokens_)); - } - + return !Enabled || (WakeLev <= AtomicGet(Tokens_)); + } + size_t Tokens() const noexcept { - return Acquired + AtomicGet(Tokens_); - } - + return Acquired + AtomicGet(Tokens_); + } + size_t Check(const TAtomic level) const noexcept { - return !Enabled || level <= Acquired; - } - - private: + return !Enabled || level <= Acquired; + } + + private: bool Enabled; TAtomicBase Acquired; - const TAtomicBase WakeLev; + const TAtomicBase WakeLev; TAtomic Tokens_; - - /* This padd requires for align Tokens_ member on its own - CPU cacheline. */ - + + /* This padd requires for align Tokens_ member on its own + CPU cacheline. */ + ui64 padd_; - }; -} + }; +} diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp index 730fc0f554..22932569db 100644 --- a/library/cpp/messagebus/remote_connection.cpp +++ b/library/cpp/messagebus/remote_connection.cpp @@ -48,11 +48,11 @@ namespace NBus { const TInstant now = TInstant::Now(); WriterFillStatus(); - + GranStatus.Writer.Update(WriterData.Status, now, true); GranStatus.Reader.Update(ReaderData.Status, now, true); } - + TRemoteConnection::~TRemoteConnection() { Y_VERIFY(ReplyQueue.IsEmpty()); } @@ -73,7 +73,7 @@ namespace NBus { bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept { size_t left = Buffer.Size() - Offset; - + return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0; } @@ -83,13 +83,13 @@ namespace NBus { Y_VERIFY(State == WRITER_FILLING, "state must be initial"); Channel = channel; } - + void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) { Y_VERIFY(!Channel, "must not have channel"); Y_VERIFY(Buffer.Empty(), "buffer must be empty"); Channel = channel; } - + void TRemoteConnection::TWriterData::DropChannel() { if (!!Channel) { Channel->Unregister(); @@ -184,7 +184,7 @@ namespace NBus { ReaderData.Status.Fd = INVALID_SOCKET; return; } - + ReaderData.DropChannel(); ReaderData.Status.Fd = readSocket.Socket; @@ -232,10 +232,10 @@ namespace NBus { ReaderData.Status.Acts += 1; ReaderGetSocketQueue()->DequeueAllLikelyEmpty(); - + if (AtomicGet(ReaderData.Down)) { ReaderData.DropChannel(); - + ReaderProcessStatusDown(); ReaderData.ShutdownComplete.Signal(); @@ -262,7 +262,7 @@ namespace NBus { } ReaderFlushMessages(); - } + } ReaderSendStatus(now); } @@ -275,109 +275,109 @@ namespace NBus { else if (!QuotaBytes.Acquire(bytes)) wakeFlags |= WAKE_QUOTA_BYTES; - + if (wakeFlags) { ReaderData.Status.QuotaExhausted++; - + WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags); } - + return wakeFlags == 0; } - + void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) { QuotaMsg.Consume(msg); QuotaBytes.Consume(bytes); } - + void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) { if (QuotaReturnValues(items, bytes)) ReadQuotaWakeup(); } - + void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) { if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down)) WriterGetWakeQueue()->EnqueueAndSchedule(0x0); } - + bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) { bool rMsg = QuotaMsg.Return(items); bool rBytes = QuotaBytes.Return(bytes); - + return rMsg || rBytes; } - + void TRemoteConnection::ReadQuotaWakeup() { const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags(); - + if (mask && mask == WriterData.AwakeFlags) { WriterData.Status.ReaderWakeups++; WriterData.AwakeFlags = 0; - + ScheduleRead(); } } - + ui32 TRemoteConnection::WriteWakeFlags() const { ui32 awakeFlags = 0; - + if (QuotaMsg.IsAboveWake()) awakeFlags |= WAKE_QUOTA_MSG; - + if (QuotaBytes.IsAboveWake()) awakeFlags |= WAKE_QUOTA_BYTES; - + return awakeFlags; } - + bool TRemoteConnection::ReaderProcessBuffer() { TInstant now = TInstant::Now(); - + for (;;) { if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) { break; } - + TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset)); - + if (header.Size < sizeof(TBusHeader)) { LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); return false; } - + if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) { LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); return false; } - + if (header.Size > Config.MaxMessageSize) { LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size)); ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1; ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false); return false; } - + if (!ReaderData.HasBytesInBuf(header.Size)) { if (ReaderData.Offset == 0) { ReaderData.Buffer.Reserve(header.Size); } break; } - + if (!QuotaAcquire(1, header.Size)) return false; - + if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) { return false; } - + ReaderData.Offset += header.Size; } - + ReaderData.Buffer.ChopHead(ReaderData.Offset); ReaderData.Offset = 0; @@ -408,7 +408,7 @@ namespace NBus { } Y_ASSERT(ReaderData.Buffer.Avail() > 0); - + ssize_t bytes; { TWhatThreadDoesPushPop pp("recv syscall"); @@ -454,7 +454,7 @@ namespace NBus { message != replyQueueTemp.rend(); ++message) { messages.push_back(message->MessagePtr.Release()); } - + WriterErrorMessages(messages, reason); replyQueueTemp.clear(); @@ -535,10 +535,10 @@ namespace NBus { ClearBeforeSendQueue(reasonForQueues); WriterGetReconnectQueue()->Clear(); WriterGetWakeQueue()->Clear(); - + TMessagesPtrs cleared; ClearOutgoingQueue(cleared, false); - + if (!Session->IsSource_) { for (auto& i : cleared) { TBusMessagePtrAndHeader h(i); @@ -548,10 +548,10 @@ namespace NBus { // and this part is not batch } } - + WriterErrorMessages(cleared, reason); } - + void TRemoteConnection::BeforeTryWrite() { } @@ -638,7 +638,7 @@ namespace NBus { WriterData.Status.Incremental.NetworkOps += 1; WriterData.Buffer.LeftProceed(bytes); - } + } WriterData.Buffer.Clear(); if (WriterData.Buffer.Capacity() > MaxBufferSize) { @@ -654,12 +654,12 @@ namespace NBus { WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion); } else { ScheduleShutdown(status); - } + } } void TRemoteConnection::ScheduleShutdown(EMessageStatus status) { ShutdownReason = status; - + AtomicSet(ReaderData.Down, 1); ScheduleRead(); @@ -856,7 +856,7 @@ namespace NBus { } TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages; - + for (;;) { THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront()); if (!writeMessage) { @@ -944,12 +944,12 @@ namespace NBus { WriterErrorMessage(h.MessagePtr.Release(), status); } } - + void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) { TBusMessage* released = m.Release(); WriterErrorMessages(MakeArrayRef(&released, 1), status); } - + void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) { ResetOneWayFlag(ms); @@ -958,17 +958,17 @@ namespace NBus { Session->InvokeOnError(m, status); } } - + void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) { Y_VERIFY(Session->IsSource_, "state check"); TClientConnectionEvent event(type, ConnectionId, PeerAddr); TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get()); session->ClientHandler->OnClientConnectionEvent(event); } - + bool TRemoteConnection::IsAlive() const { return !AtomicGet(WriterData.Down); } - + } } diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h index 5141a8ea9f..4538947368 100644 --- a/library/cpp/messagebus/remote_connection.h +++ b/library/cpp/messagebus/remote_connection.h @@ -13,8 +13,8 @@ #include "storage.h" #include "vector_swaps.h" #include "ybus.h" -#include "misc/granup.h" -#include "misc/tokenquota.h" +#include "misc/granup.h" +#include "misc/tokenquota.h" #include <library/cpp/messagebus/actor/actor.h> #include <library/cpp/messagebus/actor/executor.h> @@ -49,7 +49,7 @@ namespace NBus { struct TWriterToReaderSocketMessage { TSocket Socket; ui32 SocketVersion; - + TWriterToReaderSocketMessage(TSocket socket, ui32 socketVersion) : Socket(socket) , SocketVersion(socketVersion) @@ -154,13 +154,13 @@ namespace NBus { virtual void ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) = 0; bool MessageRead(TArrayRef<const char> dataRef, TInstant now); virtual void MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) = 0; - + void CallSerialize(TBusMessage* msg, TBuffer& buffer) const; void SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const; TBusMessage* DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const; - + void ResetOneWayFlag(TArrayRef<TBusMessage*>); - + inline ::NActor::TActor<TRemoteConnection, TWriterTag>* GetWriterActor() { return this; } @@ -269,7 +269,7 @@ namespace NBus { TGranUp<TRemoteConnectionWriterStatus> Writer; TGranUp<TRemoteConnectionReaderStatus> Reader; }; - + TWriterData WriterData; TReaderData ReaderData; TGranStatus GranStatus; @@ -280,15 +280,15 @@ namespace NBus { // client connection only TLockFreeQueueBatch<TBusMessagePtrAndHeader, TVectorSwaps> ReplyQueue; - + EMessageStatus ShutdownReason; }; inline const TNetAddr& TRemoteConnection::GetAddr() const noexcept { return PeerAddr; } - + typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr; - + } } diff --git a/library/cpp/messagebus/remote_connection_status.cpp b/library/cpp/messagebus/remote_connection_status.cpp index 05ae84791c..2c48b2a287 100644 --- a/library/cpp/messagebus/remote_connection_status.cpp +++ b/library/cpp/messagebus/remote_connection_status.cpp @@ -180,15 +180,15 @@ TString TRemoteConnectionStatus::PrintToString() const { p.AddRow("connect syscalls", WriterStatus.ConnectSyscalls); } - p.AddRow("send queue", LeftPad(WriterStatus.SendQueueSize, 6)); - + p.AddRow("send queue", LeftPad(WriterStatus.SendQueueSize, 6)); + if (Server) { - p.AddRow("quota msg", LeftPad(ReaderStatus.QuotaMsg, 6)); - p.AddRow("quota bytes", LeftPad(ReaderStatus.QuotaBytes, 6)); - p.AddRow("quota exhausted", LeftPad(ReaderStatus.QuotaExhausted, 6)); - p.AddRow("reader wakeups", LeftPad(WriterStatus.ReaderWakeups, 6)); - } else { - p.AddRow("ack messages", LeftPad(WriterStatus.AckMessagesSize, 6)); + p.AddRow("quota msg", LeftPad(ReaderStatus.QuotaMsg, 6)); + p.AddRow("quota bytes", LeftPad(ReaderStatus.QuotaBytes, 6)); + p.AddRow("quota exhausted", LeftPad(ReaderStatus.QuotaExhausted, 6)); + p.AddRow("reader wakeups", LeftPad(WriterStatus.ReaderWakeups, 6)); + } else { + p.AddRow("ack messages", LeftPad(WriterStatus.AckMessagesSize, 6)); } p.AddRow("written", WriterStatus.Incremental.MessageCounter.ToString(false)); diff --git a/library/cpp/messagebus/remote_server_session.cpp b/library/cpp/messagebus/remote_server_session.cpp index 34dd2153e2..6abbf88a60 100644 --- a/library/cpp/messagebus/remote_server_session.cpp +++ b/library/cpp/messagebus/remote_server_session.cpp @@ -24,9 +24,9 @@ TRemoteServerSession::TRemoteServerSession(TBusMessageQueue* queue, { if (config.PerConnectionMaxInFlightBySize > 0) { if (config.PerConnectionMaxInFlightBySize < config.MaxMessageSize) - ythrow yexception() - << "too low PerConnectionMaxInFlightBySize value"; - } + ythrow yexception() + << "too low PerConnectionMaxInFlightBySize value"; + } } namespace NBus { @@ -87,7 +87,7 @@ void TRemoteServerSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps< void TRemoteServerSession::InvokeOnMessage(TBusMessagePtrAndHeader& request, TIntrusivePtr<TRemoteServerConnection>& conn) { if (Y_UNLIKELY(AtomicGet(Down))) { - ReleaseInWorkRequests(*conn.Get(), request.MessagePtr.Get()); + ReleaseInWorkRequests(*conn.Get(), request.MessagePtr.Get()); InvokeOnError(request.MessagePtr.Release(), MESSAGE_SHUTDOWN); } else { TWhatThreadDoesPushPop pp("OnMessage"); @@ -167,19 +167,19 @@ void TRemoteServerSession::ReleaseInWorkResponses(TArrayRef<const TBusMessagePtr void TRemoteServerSession::ReleaseInWorkRequests(TRemoteConnection& con, TBusMessage* request) { Y_ASSERT((request->LocalFlags & MESSAGE_IN_WORK)); - request->LocalFlags &= ~MESSAGE_IN_WORK; + request->LocalFlags &= ~MESSAGE_IN_WORK; + + const size_t size = request->GetHeader()->Size; - const size_t size = request->GetHeader()->Size; - - con.QuotaReturnAside(1, size); - ServerOwnedMessages.ReleaseMultiple(1, size); + con.QuotaReturnAside(1, size); + ServerOwnedMessages.ReleaseMultiple(1, size); } void TRemoteServerSession::ReleaseInWork(TBusIdentity& ident) { - ident.SetInWork(false); - ident.Connection->QuotaReturnAside(1, ident.Size); + ident.SetInWork(false); + ident.Connection->QuotaReturnAside(1, ident.Size); - ServerOwnedMessages.ReleaseMultiple(1, ident.Size); + ServerOwnedMessages.ReleaseMultiple(1, ident.Size); } void TRemoteServerSession::ConvertInWork(TBusIdentity& req, TBusMessage* reply) { diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp index 7adaa1ae6d..ddf9f360c4 100644 --- a/library/cpp/messagebus/session_impl.cpp +++ b/library/cpp/messagebus/session_impl.cpp @@ -389,14 +389,14 @@ void TBusSessionImpl::StatusUpdateCachedDump() { for (TVector<TAcceptorPtr>::const_iterator acceptor = acceptors.begin(); acceptor != acceptors.end(); ++acceptor) { - const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get(); - - acceptorStatusSummary += status; - + const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get(); + + acceptorStatusSummary += status; + if (acceptor != acceptors.begin()) { ss << "\n"; } - ss << status.PrintToString(); + ss << status.PrintToString(); } r.Acceptors = ss.Str(); @@ -410,19 +410,19 @@ void TBusSessionImpl::StatusUpdateCachedDump() { if (connection != connections.begin()) { ss << "\n"; } - + TRemoteConnectionStatus status; status.Server = !IsSource_; - status.ReaderStatus = (*connection)->GranStatus.Reader.Get(); - status.WriterStatus = (*connection)->GranStatus.Writer.Get(); - + status.ReaderStatus = (*connection)->GranStatus.Reader.Get(); + status.WriterStatus = (*connection)->GranStatus.Writer.Get(); + ss << status.PrintToString(); - - r.ConnectionStatusSummary.ReaderStatus += status.ReaderStatus; - r.ConnectionStatusSummary.WriterStatus += status.WriterStatus; + + r.ConnectionStatusSummary.ReaderStatus += status.ReaderStatus; + r.ConnectionStatusSummary.WriterStatus += status.WriterStatus; } - r.ConnectionsSummary = r.ConnectionStatusSummary.PrintToString(); + r.ConnectionsSummary = r.ConnectionStatusSummary.PrintToString(); r.Connections = ss.Str(); } diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index c11d447224..040f9b7702 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -962,103 +962,103 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.Sync.WaitForAndIncrement(3); } - + struct TServerForQuotaWake: public TExampleServer { TSystemEvent GoOn; TMutex OneLock; - - TOnMessageContext OneMessage; - - static TBusServerSessionConfig Config() { - TBusServerSessionConfig config; - - config.PerConnectionMaxInFlight = 1; - config.PerConnectionMaxInFlightBySize = 1500; - config.MaxMessageSize = 1024; - - return config; - } - - TServerForQuotaWake() - : TExampleServer("TServerForQuotaWake", Config()) + + TOnMessageContext OneMessage; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + + config.PerConnectionMaxInFlight = 1; + config.PerConnectionMaxInFlightBySize = 1500; + config.MaxMessageSize = 1024; + + return config; + } + + TServerForQuotaWake() + : TExampleServer("TServerForQuotaWake", Config()) { } - + ~TServerForQuotaWake() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnMessage(TOnMessageContext& req) override { - if (!GoOn.Wait(0)) { + if (!GoOn.Wait(0)) { TGuard<TMutex> guard(OneLock); - - UNIT_ASSERT(!OneMessage); - - OneMessage.Swap(req); - } else - TExampleServer::OnMessage(req); - } - - void WakeOne() { + + UNIT_ASSERT(!OneMessage); + + OneMessage.Swap(req); + } else + TExampleServer::OnMessage(req); + } + + void WakeOne() { TGuard<TMutex> guard(OneLock); - - UNIT_ASSERT(!!OneMessage); - - TExampleServer::OnMessage(OneMessage); - - TOnMessageContext().Swap(OneMessage); - } - }; - + + UNIT_ASSERT(!!OneMessage); + + TExampleServer::OnMessage(OneMessage); + + TOnMessageContext().Swap(OneMessage); + } + }; + Y_UNIT_TEST(WakeReaderOnQuota) { - const size_t test_msg_count = 64; - - TBusClientSessionConfig clientConfig; - - clientConfig.MaxInFlight = test_msg_count; - - TExampleClient client(clientConfig); - TServerForQuotaWake server; - TInstant start; - - client.MessageCount = test_msg_count; - - const NBus::TNetAddr addr = server.GetActualListenAddr(); - - for (unsigned count = 0;;) { - UNIT_ASSERT(count <= test_msg_count); - - TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); - EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr); - - if (status == MESSAGE_OK) { - count++; - - } else if (status == MESSAGE_BUSY) { + const size_t test_msg_count = 64; + + TBusClientSessionConfig clientConfig; + + clientConfig.MaxInFlight = test_msg_count; + + TExampleClient client(clientConfig); + TServerForQuotaWake server; + TInstant start; + + client.MessageCount = test_msg_count; + + const NBus::TNetAddr addr = server.GetActualListenAddr(); + + for (unsigned count = 0;;) { + UNIT_ASSERT(count <= test_msg_count); + + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr); + + if (status == MESSAGE_OK) { + count++; + + } else if (status == MESSAGE_BUSY) { if (count == test_msg_count) { - TInstant now = TInstant::Now(); - + TInstant now = TInstant::Now(); + if (start.GetValue() == 0) { - start = now; - + start = now; + // TODO: properly check that server is blocked } else if (start + TDuration::MilliSeconds(100) < now) { - break; - } - } - - Sleep(TDuration::MilliSeconds(10)); - - } else - UNIT_ASSERT(false); - } - - server.GoOn.Signal(); - server.WakeOne(); - - client.WaitReplies(); - - server.WaitForOnMessageCount(test_msg_count); + break; + } + } + + Sleep(TDuration::MilliSeconds(10)); + + } else + UNIT_ASSERT(false); + } + + server.GoOn.Signal(); + server.WakeOne(); + + client.WaitReplies(); + + server.WaitForOnMessageCount(test_msg_count); }; Y_UNIT_TEST(TestConnectionAttempts) { diff --git a/library/cpp/messagebus/www/www.cpp b/library/cpp/messagebus/www/www.cpp index e501cbf4a9..62ec241d85 100644 --- a/library/cpp/messagebus/www/www.cpp +++ b/library/cpp/messagebus/www/www.cpp @@ -200,12 +200,12 @@ struct TBusWww::TImpl { Queues.Add(s->GetQueue()); } - void RegisterQueue(TBusMessageQueuePtr q) { + void RegisterQueue(TBusMessageQueuePtr q) { Y_VERIFY(!!q); - TGuard<TMutex> g(Mutex); - Queues.Add(q); - } - + TGuard<TMutex> g(Mutex); + Queues.Add(q); + } + void RegisterModule(TBusModule* module) { Y_VERIFY(!!module); TGuard<TMutex> g(Mutex); @@ -824,10 +824,10 @@ void TBusWww::RegisterServerSession(TBusServerSessionPtr s) { Impl->RegisterServerSession(s); } -void TBusWww::RegisterQueue(TBusMessageQueuePtr q) { - Impl->RegisterQueue(q); -} - +void TBusWww::RegisterQueue(TBusMessageQueuePtr q) { + Impl->RegisterQueue(q); +} + void TBusWww::RegisterModule(TBusModule* module) { Impl->RegisterModule(module); } diff --git a/library/cpp/monlib/messagebus/mon_service_messagebus.h b/library/cpp/monlib/messagebus/mon_service_messagebus.h index ec1890e915..fe791e8a9b 100644 --- a/library/cpp/monlib/messagebus/mon_service_messagebus.h +++ b/library/cpp/monlib/messagebus/mon_service_messagebus.h @@ -42,5 +42,5 @@ namespace NMonitoring { RegisterBusNgMonPage()->RegisterModule(module); } }; - + } |