aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorsingle <single@yandex-team.ru>2022-02-10 16:50:30 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:30 +0300
commitf7835298a8840c8e5d98715bf23efa9c7e03b9c4 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp
parent8ae96df130bbede609c3504aa9af1bc6ff5361b3 (diff)
downloadydb-f7835298a8840c8e5d98715bf23efa9c7e03b9c4.tar.gz
Restoring authorship annotation for <single@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/actor.cpp20
-rw-r--r--library/cpp/actors/core/actor.h24
-rw-r--r--library/cpp/actors/core/event.h10
-rw-r--r--library/cpp/actors/core/events.h36
-rw-r--r--library/cpp/actors/core/executor_pool_base.cpp8
-rw-r--r--library/cpp/actors/core/executor_thread.cpp10
-rw-r--r--library/cpp/actors/core/executor_thread.h2
-rw-r--r--library/cpp/actors/core/log.cpp26
-rw-r--r--library/cpp/actors/core/log.h12
-rw-r--r--library/cpp/actors/core/log_iface.h56
-rw-r--r--library/cpp/actors/core/log_settings.cpp6
-rw-r--r--library/cpp/actors/core/log_settings.h22
-rw-r--r--library/cpp/actors/dnscachelib/dnscache.cpp162
-rw-r--r--library/cpp/actors/dnscachelib/dnscache.h20
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp64
-rw-r--r--library/cpp/actors/testlib/test_runtime.h26
-rw-r--r--library/cpp/actors/testlib/ya.make36
-rw-r--r--library/cpp/lfalloc/lf_allocX64.h120
-rw-r--r--library/cpp/messagebus/acceptor.cpp18
-rw-r--r--library/cpp/messagebus/acceptor.h2
-rw-r--r--library/cpp/messagebus/actor/queue_for_actor.h6
-rw-r--r--library/cpp/messagebus/actor/temp_tls_vector.h22
-rw-r--r--library/cpp/messagebus/config/netaddr.cpp2
-rw-r--r--library/cpp/messagebus/config/netaddr.h4
-rw-r--r--library/cpp/messagebus/config/session_config.cpp2
-rw-r--r--library/cpp/messagebus/event_loop.cpp18
-rw-r--r--library/cpp/messagebus/misc/granup.h66
-rw-r--r--library/cpp/messagebus/misc/tokenquota.h126
-rw-r--r--library/cpp/messagebus/remote_connection.cpp100
-rw-r--r--library/cpp/messagebus/remote_connection.h20
-rw-r--r--library/cpp/messagebus/remote_connection_status.cpp16
-rw-r--r--library/cpp/messagebus/remote_server_session.cpp24
-rw-r--r--library/cpp/messagebus/session_impl.cpp26
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp166
-rw-r--r--library/cpp/messagebus/www/www.cpp18
-rw-r--r--library/cpp/monlib/messagebus/mon_service_messagebus.h2
36 files changed, 649 insertions, 649 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp
index 243a3e1a86..6f9ba6a42b 100644
--- a/library/cpp/actors/core/actor.cpp
+++ b/library/cpp/actors/core/actor.cpp
@@ -16,15 +16,15 @@ namespace NActors {
}
void IActor::Registered(TActorSystem* sys, const TActorId& owner) {
- // fallback to legacy method, do not use it anymore
- if (auto eh = AfterRegister(SelfId(), owner))
- sys->Send(eh);
- }
-
- void IActor::Describe(IOutputStream &out) const noexcept {
- SelfActorId.Out(out);
- }
-
+ // fallback to legacy method, do not use it anymore
+ if (auto eh = AfterRegister(SelfId(), owner))
+ sys->Send(eh);
+ }
+
+ void IActor::Describe(IOutputStream &out) const noexcept {
+ SelfActorId.Out(out);
+ }
+
bool IActor::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept {
return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId));
}
@@ -123,7 +123,7 @@ namespace NActors {
TlsActivationContext->ExecutorThread.Schedule(deadline, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
}
- void IActor::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
+ void IActor::Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie) const noexcept {
TlsActivationContext->ExecutorThread.Schedule(delta, new IEventHandle(SelfActorId, TActorId(), ev), cookie);
}
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index 7a414bad1c..ed29bd14b9 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -6,7 +6,7 @@
#include <library/cpp/actors/util/local_process_key.h>
namespace NActors {
- class TActorSystem;
+ class TActorSystem;
class TMailboxTable;
struct TMailboxHeader;
@@ -169,11 +169,11 @@ namespace NActors {
void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
};
- class IActor;
-
- class IActorOps : TNonCopyable {
+ class IActor;
+
+ class IActorOps : TNonCopyable {
public:
- virtual void Describe(IOutputStream&) const noexcept = 0;
+ virtual void Describe(IOutputStream&) const noexcept = 0;
virtual bool Send(const TActorId& recipient, IEventBase*, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0;
/**
@@ -205,12 +205,12 @@ namespace NActors {
virtual TActorId Register(IActor*, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept = 0;
virtual TActorId RegisterWithSameMailbox(IActor*) const noexcept = 0;
- };
-
+ };
+
class TDecorator;
- class IActor : protected IActorOps {
- public:
+ class IActor : protected IActorOps {
+ public:
typedef void (IActor::*TReceiveFunc)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx);
private:
@@ -324,7 +324,7 @@ namespace NActors {
}
virtual void Registered(TActorSystem* sys, const TActorId& owner);
-
+
virtual TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) {
Y_UNUSED(self);
Y_UNUSED(parentId);
@@ -349,7 +349,7 @@ namespace NActors {
}
protected:
- void Describe(IOutputStream&) const noexcept override;
+ void Describe(IOutputStream&) const noexcept override;
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final;
template <typename TEvent>
bool Send(const TActorId& recipient, THolder<TEvent> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{
@@ -363,7 +363,7 @@ namespace NActors {
void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
- void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
+ void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
// register new actor in ActorSystem on new fresh mailbox.
TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept final;
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 745bbd28f4..6ff02aaf94 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -59,11 +59,11 @@ namespace NActors {
public:
template <typename TEv>
inline TEv* CastAsLocal() const noexcept {
- auto fits = GetTypeRewrite() == TEv::EventType;
-
- return fits ? static_cast<TEv*>(Event.Get()) : nullptr;
- }
-
+ auto fits = GetTypeRewrite() == TEv::EventType;
+
+ return fits ? static_cast<TEv*>(Event.Get()) : nullptr;
+ }
+
template <typename TEventType>
TEventType* Get() {
if (Type != TEventType::EventType)
diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h
index 984cb51523..702cf50fad 100644
--- a/library/cpp/actors/core/events.h
+++ b/library/cpp/actors/core/events.h
@@ -85,23 +85,23 @@ namespace NActors {
Unsubscribe, // generic unsubscribe from something
Delivered, // event delivered
Undelivered, // event undelivered
- Poison, // request actor to shutdown
+ Poison, // request actor to shutdown
Completed, // generic async job result event
- PoisonTaken, // generic Poison taken (reply to PoisonPill event, i.e. died completely)
+ PoisonTaken, // generic Poison taken (reply to PoisonPill event, i.e. died completely)
FlushLog,
CallbackCompletion,
CallbackException,
- Gone, // Generic notification of actor death
+ Gone, // Generic notification of actor death
TrackActor,
UntrackActor,
InvokeResult,
CoroTimeout,
InvokeQuery,
- End,
-
+ End,
+
// Compatibility section
- PoisonPill = Poison,
- ActorDied = Gone,
+ PoisonPill = Poison,
+ ActorDied = Gone,
};
static_assert(End < EventSpaceEnd(ES_SYSTEM), "expect End < EventSpaceEnd(ES_SYSTEM)");
@@ -111,16 +111,16 @@ namespace NActors {
DEFINE_SIMPLE_LOCAL_EVENT(TEvBootstrap, "System: TEvBootstrap")
};
- struct TEvPoison : public TEventBase<TEvPoison, TSystem::Poison> {
- DEFINE_SIMPLE_NONLOCAL_EVENT(TEvPoison, "System: TEvPoison")
+ struct TEvPoison : public TEventBase<TEvPoison, TSystem::Poison> {
+ DEFINE_SIMPLE_NONLOCAL_EVENT(TEvPoison, "System: TEvPoison")
};
struct TEvWakeup: public TEventBase<TEvWakeup, TSystem::Wakeup> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvWakeup, "System: TEvWakeup")
-
- TEvWakeup(ui64 tag = 0) : Tag(tag) { }
-
- const ui64 Tag = 0;
+
+ TEvWakeup(ui64 tag = 0) : Tag(tag) { }
+
+ const ui64 Tag = 0;
};
struct TEvSubscribe: public TEventBase<TEvSubscribe, TSystem::Subscribe> {
@@ -205,14 +205,14 @@ namespace NActors {
}
};
- struct TEvGone: public TEventBase<TEvGone, TSystem::Gone> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvGone, "System: TEvGone")
+ struct TEvGone: public TEventBase<TEvGone, TSystem::Gone> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvGone, "System: TEvGone")
};
-
+
struct TEvInvokeResult;
- using TEvPoisonPill = TEvPoison; // Legacy name, deprecated
- using TEvActorDied = TEvGone;
+ using TEvPoisonPill = TEvPoison; // Legacy name, deprecated
+ using TEvActorDied = TEvGone;
};
}
diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp
index 1fb66adc61..c3b9999168 100644
--- a/library/cpp/actors/core/executor_pool_base.cpp
+++ b/library/cpp/actors/core/executor_pool_base.cpp
@@ -8,8 +8,8 @@ namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
void DoActorInit(TActorSystem* sys, IActor* actor, const TActorId& self, const TActorId& owner) {
- actor->SelfActorId = self;
- actor->Registered(sys, owner);
+ actor->SelfActorId = self;
+ actor->Registered(sys, owner);
}
TExecutorPoolBaseMailboxed::TExecutorPoolBaseMailboxed(ui32 poolId, ui32 maxActivityType)
@@ -98,7 +98,7 @@ namespace NActors {
// do init
const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint);
- DoActorInit(ActorSystem, actor, actorId, parentId);
+ DoActorInit(ActorSystem, actor, actorId, parentId);
// Once we unlock the mailbox the actor starts running and we cannot use the pointer any more
actor = nullptr;
@@ -145,7 +145,7 @@ namespace NActors {
mailbox->AttachActor(localActorId, actor);
const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint);
- DoActorInit(ActorSystem, actor, actorId, parentId);
+ DoActorInit(ActorSystem, actor, actorId, parentId);
NHPTimer::STime elapsed = GetCycleCountFast() - hpstart;
if (elapsed > 1000000) {
LWPROBE(SlowRegisterAdd, PoolId, NHPTimer::GetSeconds(elapsed) * 1000.0);
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 7a34944ae2..446b651efd 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -67,10 +67,10 @@ namespace NActors {
DyingActors.push_back(THolder(actor));
}
- void TExecutorThread::DropUnregistered() {
- DyingActors.clear(); // here is actual destruction of actors
- }
-
+ void TExecutorThread::DropUnregistered() {
+ DyingActors.clear(); // here is actual destruction of actors
+ }
+
void TExecutorThread::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
++CurrentActorScheduledEventsCounter;
Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId);
@@ -181,7 +181,7 @@ namespace NActors {
size_t dyingActorsCnt = DyingActors.size();
Ctx.UpdateActorsStats(dyingActorsCnt);
if (dyingActorsCnt) {
- DropUnregistered();
+ DropUnregistered();
actor = nullptr;
}
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index a1ef9786f6..9d3c573f0d 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -43,7 +43,7 @@ namespace NActors {
const TActorId& parentId = TActorId());
TActorId RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId = TActorId());
void UnregisterActor(TMailboxHeader* mailbox, ui64 localActorId);
- void DropUnregistered();
+ void DropUnregistered();
const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; }
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index ca0c6e60ea..5f63b5af58 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -251,11 +251,11 @@ namespace NActors {
const auto prio = ev.Level.ToPrio();
- switch (prio) {
- case ::NActors::NLog::EPrio::Alert:
+ switch (prio) {
+ case ::NActors::NLog::EPrio::Alert:
Metrics->IncAlertMsgs();
break;
- case ::NActors::NLog::EPrio::Emerg:
+ case ::NActors::NLog::EPrio::Emerg:
Metrics->IncEmergMsgs();
break;
default:
@@ -344,10 +344,10 @@ namespace NActors {
str << "<a href='logger?c=" << i << "'>" << name << "</a>";
}
TABLED() {
- str << PriorityToString(EPrio(componentSettings.Raw.X.Level));
+ str << PriorityToString(EPrio(componentSettings.Raw.X.Level));
}
TABLED() {
- str << PriorityToString(EPrio(componentSettings.Raw.X.SamplingLevel));
+ str << PriorityToString(EPrio(componentSettings.Raw.X.SamplingLevel));
}
TABLED() {
str << componentSettings.Raw.X.SamplingRate;
@@ -421,11 +421,11 @@ namespace NActors {
UL() {
LI() {
str << "Priority: "
- << NLog::PriorityToString(NLog::EPrio(componentSettings.Raw.X.Level));
+ << NLog::PriorityToString(NLog::EPrio(componentSettings.Raw.X.Level));
}
LI() {
str << "Sampling priority: "
- << NLog::PriorityToString(NLog::EPrio(componentSettings.Raw.X.SamplingLevel));
+ << NLog::PriorityToString(NLog::EPrio(componentSettings.Raw.X.SamplingLevel));
}
LI() {
str << "Sampling rate: "
@@ -444,7 +444,7 @@ namespace NActors {
for (int p = NLog::PRI_EMERG; p <= NLog::PRI_TRACE; ++p) {
LI() {
str << "<a href='logger?c=" << component << "&p=" << p << "'>"
- << NLog::PriorityToString(NLog::EPrio(p)) << "</a>";
+ << NLog::PriorityToString(NLog::EPrio(p)) << "</a>";
}
}
}
@@ -455,7 +455,7 @@ namespace NActors {
for (int p = NLog::PRI_EMERG; p <= NLog::PRI_TRACE; ++p) {
LI() {
str << "<a href='logger?c=" << component << "&sp=" << p << "'>"
- << NLog::PriorityToString(NLog::EPrio(p)) << "</a>";
+ << NLog::PriorityToString(NLog::EPrio(p)) << "</a>";
}
}
}
@@ -519,7 +519,7 @@ namespace NActors {
TABLER() {
TABLED() {
str << "<a href = 'logger?c=-1&p=" << p << "'>"
- << NLog::PriorityToString(NLog::EPrio(p)) << "</a>";
+ << NLog::PriorityToString(NLog::EPrio(p)) << "</a>";
}
}
}
@@ -541,7 +541,7 @@ namespace NActors {
TABLER() {
TABLED() {
str << "<a href = 'logger?c=-1&sp=" << p << "'>"
- << NLog::PriorityToString(NLog::EPrio(p)) << "</a>";
+ << NLog::PriorityToString(NLog::EPrio(p)) << "</a>";
}
}
}
@@ -576,8 +576,8 @@ namespace NActors {
bool TLoggerActor::OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component,
const TString& formatted) noexcept try {
- const auto logPrio = ::ELogPriority(ui16(priority));
-
+ const auto logPrio = ::ELogPriority(ui16(priority));
+
char buf[TimeBufSize];
switch (Settings->Format) {
case NActors::NLog::TSettings::PLAIN_FULL_FORMAT: {
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index 65e75b9909..c11a7cf3c1 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -2,7 +2,7 @@
#include "defs.h"
-#include "log_iface.h"
+#include "log_iface.h"
#include "log_settings.h"
#include "actorsystem.h"
#include "events.h"
@@ -122,7 +122,7 @@ namespace NActors {
////////////////////////////////////////////////////////////////////////////////
// SET LOG LEVEL FOR A COMPONENT
////////////////////////////////////////////////////////////////////////////////
- class TLogComponentLevelRequest: public TEventLocal<TLogComponentLevelRequest, int(NLog::EEv::LevelReq)> {
+ class TLogComponentLevelRequest: public TEventLocal<TLogComponentLevelRequest, int(NLog::EEv::LevelReq)> {
public:
// set given priority for the component
TLogComponentLevelRequest(NLog::EPriority priority, NLog::EComponent component)
@@ -145,7 +145,7 @@ namespace NActors {
friend class TLoggerActor;
};
- class TLogComponentLevelResponse: public TEventLocal<TLogComponentLevelResponse, int(NLog::EEv::LevelResp)> {
+ class TLogComponentLevelResponse: public TEventLocal<TLogComponentLevelResponse, int(NLog::EEv::LevelResp)> {
public:
TLogComponentLevelResponse(int code, const TString& explanation)
: Code(code)
@@ -166,7 +166,7 @@ namespace NActors {
TString Explanation;
};
- class TLogIgnored: public TEventLocal<TLogIgnored, int(NLog::EEv::Ignored)> {
+ class TLogIgnored: public TEventLocal<TLogIgnored, int(NLog::EEv::Ignored)> {
public:
TLogIgnored() {
}
@@ -213,7 +213,7 @@ namespace NActors {
void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
switch (ev->GetTypeRewrite()) {
HFunc(TLogIgnored, HandleIgnoredEvent);
- HFunc(NLog::TEvLog, HandleLogEvent);
+ HFunc(NLog::TEvLog, HandleLogEvent);
HFunc(TLogComponentLevelRequest, HandleLogComponentLevelRequest);
HFunc(NMon::TEvHttpInfo, HandleMonInfo);
}
@@ -246,7 +246,7 @@ namespace NActors {
void BecomeDefunct();
void HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx);
void HandleIgnoredEventDrop();
- void HandleLogEvent(NLog::TEvLog::TPtr& ev, const TActorContext& ctx);
+ void HandleLogEvent(NLog::TEvLog::TPtr& ev, const TActorContext& ctx);
void HandleLogEventDrop(const NLog::TEvLog::TPtr& ev);
void HandleLogComponentLevelRequest(TLogComponentLevelRequest::TPtr& ev, const TActorContext& ctx);
void HandleMonInfo(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx);
diff --git a/library/cpp/actors/core/log_iface.h b/library/cpp/actors/core/log_iface.h
index 0ca9c14b91..b331db9ca8 100644
--- a/library/cpp/actors/core/log_iface.h
+++ b/library/cpp/actors/core/log_iface.h
@@ -1,12 +1,12 @@
-#pragma once
-
-#include "events.h"
-#include "event_local.h"
-
-namespace NActors {
+#pragma once
+
+#include "events.h"
+#include "event_local.h"
+
+namespace NActors {
namespace NLog {
using EComponent = int;
-
+
enum EPriority : ui16 { // migrate it to EPrio whenever possible
PRI_EMERG,
PRI_ALERT,
@@ -18,7 +18,7 @@ namespace NActors {
PRI_DEBUG,
PRI_TRACE
};
-
+
enum class EPrio : ui16 {
Emerg = 0,
Alert = 1,
@@ -30,39 +30,39 @@ namespace NActors {
Debug = 7,
Trace = 8,
};
-
+
struct TLevel {
TLevel(ui32 raw)
: Raw(raw)
{
}
-
+
TLevel(EPrio prio)
: Raw((ui16(prio) + 1) << 8)
{
}
-
+
EPrio ToPrio() const noexcept {
const auto major = Raw >> 8;
-
+
return major > 0 ? EPrio(major - 1) : EPrio::Emerg;
}
-
+
bool IsUrgentAbortion() const noexcept {
return (Raw >> 8) == 0;
}
-
+
/* Generalized monotonic level value composed with major and minor
- levels. Minor is used for verbosity within major, basic EPrio
- mapped to (EPrio + 1, 0) and Major = 0 is reserved as special
- space with meaning like EPrio::Emerg but with extened actions.
- Thus logger should map Major = 0 to EPrio::Emerg if it have no
- idea how to handle special emergency actions.
- */
-
+ levels. Minor is used for verbosity within major, basic EPrio
+ mapped to (EPrio + 1, 0) and Major = 0 is reserved as special
+ space with meaning like EPrio::Emerg but with extened actions.
+ Thus logger should map Major = 0 to EPrio::Emerg if it have no
+ idea how to handle special emergency actions.
+ */
+
ui32 Raw = 0; // ((ui16(EPrio) + 1) << 8) | ui8(minor)
};
-
+
enum class EEv {
Log = EventSpaceBegin(TEvents::ES_LOGGER),
LevelReq,
@@ -70,9 +70,9 @@ namespace NActors {
Ignored,
End
};
-
+
static_assert(int(EEv::End) < EventSpaceEnd(TEvents::ES_LOGGER), "");
-
+
class TEvLog: public TEventLocal<TEvLog, int(EEv::Log)> {
public:
TEvLog(TInstant stamp, TLevel level, EComponent comp, const TString &line)
@@ -90,7 +90,7 @@ namespace NActors {
, Line(std::move(line))
{
}
-
+
TEvLog(EPriority prio, EComponent comp, TString line, TInstant time = TInstant::Now())
: Stamp(time)
, Level(EPrio(prio))
@@ -98,12 +98,12 @@ namespace NActors {
, Line(std::move(line))
{
}
-
+
const TInstant Stamp = TInstant::Max();
const TLevel Level;
const EComponent Component = 0;
TString Line;
};
-
+
}
-}
+}
diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp
index 5a5b66b4ac..f52f2fc5d2 100644
--- a/library/cpp/actors/core/log_settings.cpp
+++ b/library/cpp/actors/core/log_settings.cpp
@@ -116,7 +116,7 @@ namespace NActors {
str << titleName
<< " for all components has been changed to "
- << PriorityToString(EPrio(priority));
+ << PriorityToString(EPrio(priority));
explanation = str.Str();
return 0;
} else {
@@ -136,8 +136,8 @@ namespace NActors {
AtomicSet(ComponentInfo[component], settings.Raw.Data);
TStringStream str;
str << titleName << " for the component " << ComponentNames[component]
- << " has been changed from " << PriorityToString(EPrio(oldPriority))
- << " to " << PriorityToString(EPrio(priority));
+ << " has been changed from " << PriorityToString(EPrio(oldPriority))
+ << " to " << PriorityToString(EPrio(priority));
explanation = str.Str();
return 0;
}
diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h
index 7f9ea16553..7fe4504edd 100644
--- a/library/cpp/actors/core/log_settings.h
+++ b/library/cpp/actors/core/log_settings.h
@@ -1,32 +1,32 @@
#pragma once
#include "actor.h"
-#include "log_iface.h"
+#include "log_iface.h"
#include <util/generic/vector.h>
#include <util/digest/murmur.h>
#include <util/random/easy.h>
namespace NActors {
namespace NLog {
- inline const char* PriorityToString(EPrio priority) {
+ inline const char* PriorityToString(EPrio priority) {
switch (priority) {
- case EPrio::Emerg:
+ case EPrio::Emerg:
return "EMERG";
- case EPrio::Alert:
+ case EPrio::Alert:
return "ALERT";
- case EPrio::Crit:
+ case EPrio::Crit:
return "CRIT";
- case EPrio::Error:
+ case EPrio::Error:
return "ERROR";
- case EPrio::Warn:
+ case EPrio::Warn:
return "WARN";
- case EPrio::Notice:
+ case EPrio::Notice:
return "NOTICE";
- case EPrio::Info:
+ case EPrio::Info:
return "INFO";
- case EPrio::Debug:
+ case EPrio::Debug:
return "DEBUG";
- case EPrio::Trace:
+ case EPrio::Trace:
return "TRACE";
default:
return "UNKNOWN";
diff --git a/library/cpp/actors/dnscachelib/dnscache.cpp b/library/cpp/actors/dnscachelib/dnscache.cpp
index c966fd7b13..649339ddb2 100644
--- a/library/cpp/actors/dnscachelib/dnscache.cpp
+++ b/library/cpp/actors/dnscachelib/dnscache.cpp
@@ -7,7 +7,7 @@
#include <util/datetime/systime.h>
const TDnsCache::THost TDnsCache::NullHost;
-
+
LWTRACE_USING(DNSCACHELIB_PROVIDER);
static_assert(sizeof(ares_channel) == sizeof(void*), "expect sizeof(ares_channel) == sizeof(void *)");
@@ -22,7 +22,7 @@ TDnsCache::TDnsCache(bool allowIpv4, bool allowIpv6, time_t lifetime, time_t neg
, ACacheMisses(0)
, PtrCacheHits(0)
, PtrCacheMisses(0)
-{
+{
#ifdef _win_
if (ares_library_init(ARES_LIB_INIT_WIN32) != ARES_SUCCESS) {
LWPROBE(AresInitFailed);
@@ -53,97 +53,97 @@ TDnsCache::~TDnsCache(void) {
}
TString TDnsCache::GetHostByAddr(const NAddr::IRemoteAddr& addr) {
- in6_addr key;
-
- if (addr.Addr()->sa_family == AF_INET6) {
+ in6_addr key;
+
+ if (addr.Addr()->sa_family == AF_INET6) {
const struct sockaddr_in6* s6 = (const struct sockaddr_in6*)(addr.Addr());
- memcpy(&key, &s6->sin6_addr, sizeof(s6->sin6_addr));
- } else if (addr.Addr()->sa_family == AF_INET) {
+ memcpy(&key, &s6->sin6_addr, sizeof(s6->sin6_addr));
+ } else if (addr.Addr()->sa_family == AF_INET) {
const struct sockaddr_in* s4 = (const struct sockaddr_in*)(addr.Addr());
- memset(&key, 0, sizeof(key));
- memcpy(&key, &s4->sin_addr, sizeof(s4->sin_addr));
- } else {
- return "";
- }
+ memset(&key, 0, sizeof(key));
+ memcpy(&key, &s4->sin_addr, sizeof(s4->sin_addr));
+ } else {
+ return "";
+ }
const TAddr& host = ResolveAddr(key, addr.Addr()->sa_family);
-
- return host.Hostname;
-}
-
+
+ return host.Hostname;
+}
+
TIpHost TDnsCache::Get(const TString& hostname) {
- if (!AllowIpV4)
- return TIpHost(-1);
-
+ if (!AllowIpV4)
+ return TIpHost(-1);
+
const THost& addr = Resolve(hostname, AF_INET);
-
+
TGuard<TMutex> lock(CacheMtx);
- if (addr.AddrsV4.empty()) {
- return TIpHost(-1);
- }
- return addr.AddrsV4.front();
-}
-
-NAddr::IRemoteAddrPtr TDnsCache::GetAddr(
+ if (addr.AddrsV4.empty()) {
+ return TIpHost(-1);
+ }
+ return addr.AddrsV4.front();
+}
+
+NAddr::IRemoteAddrPtr TDnsCache::GetAddr(
const TString& hostname,
int family,
TIpPort port,
bool cacheOnly) {
- if (family != AF_INET && AllowIpV6) {
+ if (family != AF_INET && AllowIpV6) {
const THost& addr = Resolve(hostname, AF_INET6, cacheOnly);
-
+
TGuard<TMutex> lock(CacheMtx);
- if (!addr.AddrsV6.empty()) {
- struct sockaddr_in6 sin6;
- Zero(sin6);
- sin6.sin6_family = AF_INET6;
- sin6.sin6_addr = addr.AddrsV6.front();
- sin6.sin6_port = HostToInet(port);
-
+ if (!addr.AddrsV6.empty()) {
+ struct sockaddr_in6 sin6;
+ Zero(sin6);
+ sin6.sin6_family = AF_INET6;
+ sin6.sin6_addr = addr.AddrsV6.front();
+ sin6.sin6_port = HostToInet(port);
+
return MakeHolder<NAddr::TIPv6Addr>(sin6);
- }
- }
-
- if (family != AF_INET6 && AllowIpV4) {
+ }
+ }
+
+ if (family != AF_INET6 && AllowIpV4) {
const THost& addr = Resolve(hostname, AF_INET, cacheOnly);
-
+
TGuard<TMutex> lock(CacheMtx);
- if (!addr.AddrsV4.empty()) {
+ if (!addr.AddrsV4.empty()) {
return MakeHolder<NAddr::TIPv4Addr>(TIpAddress(addr.AddrsV4.front(), port));
- }
- }
-
+ }
+ }
+
LWPROBE(FamilyMismatch, family, AllowIpV4, AllowIpV6);
return nullptr;
-}
-
-void TDnsCache::GetAllAddresses(
+}
+
+void TDnsCache::GetAllAddresses(
const TString& hostname,
TVector<NAddr::IRemoteAddrPtr>& addrs) {
- if (AllowIpV4) {
+ if (AllowIpV4) {
const THost& addr4 = Resolve(hostname, AF_INET);
TGuard<TMutex> lock(CacheMtx);
- for (size_t i = 0; i < addr4.AddrsV4.size(); i++) {
+ for (size_t i = 0; i < addr4.AddrsV4.size(); i++) {
addrs.push_back(MakeHolder<NAddr::TIPv4Addr>(TIpAddress(addr4.AddrsV4[i], 0)));
- }
- }
-
- if (AllowIpV6) {
+ }
+ }
+
+ if (AllowIpV6) {
const THost& addr6 = Resolve(hostname, AF_INET6);
- struct sockaddr_in6 sin6;
- Zero(sin6);
- sin6.sin6_family = AF_INET6;
+ struct sockaddr_in6 sin6;
+ Zero(sin6);
+ sin6.sin6_family = AF_INET6;
TGuard<TMutex> lock(CacheMtx);
- for (size_t i = 0; i < addr6.AddrsV6.size(); i++) {
- sin6.sin6_addr = addr6.AddrsV6[i];
-
+ for (size_t i = 0; i < addr6.AddrsV6.size(); i++) {
+ sin6.sin6_addr = addr6.AddrsV6[i];
+
addrs.push_back(MakeHolder<NAddr::TIPv6Addr>(sin6));
- }
- }
-}
-
+ }
+ }
+}
+
void TDnsCache::GetStats(ui64& a_cache_hits, ui64& a_cache_misses,
ui64& ptr_cache_hits, ui64& ptr_cache_misses) {
TGuard<TMutex> lock(CacheMtx);
@@ -169,11 +169,11 @@ bool TDnsCache::THost::IsStale(int family, const TDnsCache* ctx) const noexcept
const TDnsCache::THost&
TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) {
- if (!ValidateHName(hostname)) {
+ if (!ValidateHName(hostname)) {
LWPROBE(ResolveNullHost, hostname, family);
- return NullHost;
- }
-
+ return NullHost;
+ }
+
THostCache::iterator p;
Y_ASSERT(family == AF_INET || family == AF_INET6);
@@ -232,9 +232,9 @@ TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) {
}
bool TDnsCache::ValidateHName(const TString& name) const noexcept {
- return name.size() > 0;
-}
-
+ return name.size() > 0;
+}
+
const TDnsCache::TAddr& TDnsCache::ResolveAddr(const in6_addr& addr, int family) {
TAddrCache::iterator p;
@@ -282,7 +282,7 @@ const TDnsCache::TAddr& TDnsCache::ResolveAddr(const in6_addr& addr, int family)
}
void TDnsCache::WaitTask(TAtomic& flag) {
- const TInstant start = TInstant(TTimeKeeper::GetTimeval());
+ const TInstant start = TInstant(TTimeKeeper::GetTimeval());
while (AtomicGet(flag)) {
ares_channel chan = static_cast<ares_channel>(Channel);
@@ -319,11 +319,11 @@ void TDnsCache::WaitTask(TAtomic& flag) {
Y_ASSERT(nfds != 0);
- const TDuration left = TInstant(TTimeKeeper::GetTimeval()) - start;
- const TDuration wait = Max(Timeout - left, TDuration::Zero());
-
- int rv = poll(pfd, nfds, wait.MilliSeconds());
-
+ const TDuration left = TInstant(TTimeKeeper::GetTimeval()) - start;
+ const TDuration wait = Max(Timeout - left, TDuration::Zero());
+
+ int rv = poll(pfd, nfds, wait.MilliSeconds());
+
if (rv == -1) {
if (errno == EINTR) {
continue;
@@ -351,10 +351,10 @@ void TDnsCache::WaitTask(TAtomic& flag) {
: ARES_SOCKET_BAD);
}
}
-
- if (start + Timeout <= TInstant(TTimeKeeper::GetTimeval())) {
- break;
- }
+
+ if (start + Timeout <= TInstant(TTimeKeeper::GetTimeval())) {
+ break;
+ }
}
}
diff --git a/library/cpp/actors/dnscachelib/dnscache.h b/library/cpp/actors/dnscachelib/dnscache.h
index 2d6adb3a36..3313a251a1 100644
--- a/library/cpp/actors/dnscachelib/dnscache.h
+++ b/library/cpp/actors/dnscachelib/dnscache.h
@@ -5,7 +5,7 @@
#include <util/generic/vector.h>
#include <util/network/address.h>
#include <util/system/mutex.h>
-#include <util/datetime/base.h>
+#include <util/datetime/base.h>
/** Asynchronous DNS resolver.
*
@@ -19,7 +19,7 @@
class TDnsCache {
public:
TDnsCache(bool allowIpv4 = true, bool allowIpv6 = true, time_t entry_lifetime = 1800, time_t neg_lifetime = 1, ui32 request_timeout = 500000);
- ~TDnsCache();
+ ~TDnsCache();
TString GetHostByAddr(const NAddr::IRemoteAddr&);
@@ -37,9 +37,9 @@ public:
void GetStats(ui64& a_cache_hits, ui64& a_cache_misses,
ui64& ptr_cache_hits, ui64& ptr_cache_misses);
-protected:
+protected:
bool ValidateHName(const TString& host) const noexcept;
-
+
private:
struct TGHBNContext {
TDnsCache* Owner;
@@ -52,7 +52,7 @@ private:
in6_addr Addr;
};
- struct THost {
+ struct THost {
THost() noexcept {
}
@@ -73,8 +73,8 @@ private:
};
typedef TMap<TString, THost> THostCache;
-
- struct TAddr {
+
+ struct TAddr {
TString Hostname;
time_t Resolved = 0;
time_t NotFound = 0;
@@ -112,7 +112,7 @@ private:
const time_t EntryLifetime;
const time_t NegativeLifetime;
- const TDuration Timeout;
+ const TDuration Timeout;
const bool AllowIpV4;
const bool AllowIpV6;
@@ -124,8 +124,8 @@ private:
ui64 PtrCacheHits;
ui64 PtrCacheMisses;
- const static THost NullHost;
-
+ const static THost NullHost;
+
TMutex AresMtx;
void* Channel;
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 7c4871399d..6fa25b9965 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -1,4 +1,4 @@
-#include "test_runtime.h"
+#include "test_runtime.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/callstack.h>
@@ -22,16 +22,16 @@
bool VERBOSE = false;
const bool PRINT_EVENT_BODY = false;
-namespace {
+namespace {
+
+ TString MakeClusterId() {
+ pid_t pid = getpid();
+ TStringBuilder uuid;
+ uuid << "Cluster for process with id: " << pid;
+ return uuid;
+ }
+}
- TString MakeClusterId() {
- pid_t pid = getpid();
- TStringBuilder uuid;
- uuid << "Cluster for process with id: " << pid;
- return uuid;
- }
-}
-
namespace NActors {
ui64 TScheduledEventQueueItem::NextUniqueId = 0;
@@ -80,7 +80,7 @@ namespace NActors {
TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() {
Stop();
}
-
+
class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> {
public:
@@ -326,7 +326,7 @@ namespace NActors {
}
TDuration delay = (deadline - now);
- if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) {
+ if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) {
ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie));
Runtime->MailboxesHasEvents.Signal();
@@ -365,7 +365,7 @@ namespace NActors {
}
ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
+ if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId);
if (ev->GetRecipientRewrite() == logger) {
@@ -451,15 +451,15 @@ namespace NActors {
TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv)
: TTestActorRuntimeBase(1, 1, false)
- {
- SingleSysEnv = true;
- }
-
+ {
+ SingleSysEnv = true;
+ }
+
TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads)
: ScheduledCount(0)
, ScheduledLimit(100000)
, MainThreadId(TThread::CurrentThreadId())
- , ClusterUUID(MakeClusterId())
+ , ClusterUUID(MakeClusterId())
, FirstNodeId(NextNodeId)
, NodeCount(nodeCount)
, DataCenterCount(dataCenterCount)
@@ -741,7 +741,7 @@ namespace NActors {
void TTestActorRuntimeBase::InitNodes() {
NextNodeId += NodeCount;
Y_VERIFY(NodeCount > 0);
-
+
for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first;
TNodeDataBase* node = nodeIt->second.Get();
@@ -900,7 +900,7 @@ namespace NActors {
const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
ActorNames[actorId] = TypeName(*actor);
RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
- DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
+ DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
switch (mailboxType) {
case TMailboxType::Simple:
@@ -944,7 +944,7 @@ namespace NActors {
const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
ActorNames[actorId] = TypeName(*actor);
RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
- DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
+ DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
return actorId;
}
@@ -1583,7 +1583,7 @@ namespace NActors {
TCallstack::GetTlsCallstack().SetLinesToSkip();
#endif
recipientActor->Receive(evHolder, ctx);
- node->ExecutorThread->DropUnregistered();
+ node->ExecutorThread->DropUnregistered();
}
CurrentRecipient = TActorId();
TlsActivationContext = prevTlsActivationContext;
@@ -1683,24 +1683,24 @@ namespace NActors {
NActors::TMailboxType::Simple, InterconnectPoolId()));
}
- if (!SingleSysEnv) { // Single system env should do this self
- TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend();
- NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings,
+ if (!SingleSysEnv) { // Single system env should do this self
+ TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend();
+ NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings,
logBackend, GetCountersForComponent(node->DynamicCounters, "utils"));
NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId());
std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd);
- setup->LocalServices.push_back(loggerActorPair);
- }
+ setup->LocalServices.push_back(loggerActorPair);
+ }
return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings));
}
TActorSystem* TTestActorRuntimeBase::SingleSys() const {
- Y_VERIFY(Nodes.size() == 1, "Works only for single system env");
-
- return Nodes.begin()->second->ActorSystem.Get();
- }
-
+ Y_VERIFY(Nodes.size() == 1, "Works only for single system env");
+
+ return Nodes.begin()->second->ActorSystem.Get();
+ }
+
TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() {
for (auto& x : Nodes) {
return x.second->ActorSystem.Get();
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h
index 38e964835c..26e3b45c98 100644
--- a/library/cpp/actors/testlib/test_runtime.h
+++ b/library/cpp/actors/testlib/test_runtime.h
@@ -39,8 +39,8 @@ const TDuration DEFAULT_DISPATCH_TIMEOUT = NSan::PlainOrUnderSanitizer(
namespace NActors {
- struct THeSingleSystemEnv { };
-
+ struct THeSingleSystemEnv { };
+
struct TEventMailboxId {
TEventMailboxId()
: NodeId(0)
@@ -200,7 +200,7 @@ namespace NActors {
typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter;
typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter;
typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver;
-
+
TTestActorRuntimeBase(THeSingleSystemEnv);
TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads);
@@ -284,7 +284,7 @@ namespace NActors {
return Nodes[FirstNodeId + nodeIdx]->LogSettings;
}
- TActorSystem* SingleSys() const;
+ TActorSystem* SingleSys() const;
TActorSystem* GetAnyNodeActorSystem();
TActorSystem* GetActorSystem(ui32 nodeId);
template <typename TEvent>
@@ -471,12 +471,12 @@ namespace NActors {
UseRealInterconnect = true;
}
- protected:
+ protected:
struct TNodeDataBase;
TNodeDataBase* GetRawNode(ui32 node) const {
- return Nodes.at(FirstNodeId + node).Get();
- }
-
+ return Nodes.at(FirstNodeId + node).Get();
+ }
+
static IExecutorPool* CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TNodeDataBase* node, ui32 poolId);
virtual TIntrusivePtr<NMonitoring::TDynamicCounters> GetCountersForComponent(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const char* component) {
Y_UNUSED(counters);
@@ -492,7 +492,7 @@ namespace NActors {
Y_UNUSED(setup);
}
- private:
+ private:
IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const;
void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem);
TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint);
@@ -507,12 +507,12 @@ namespace NActors {
THolder<TTempDir> TmpDir;
const TThread::TId MainThreadId;
- protected:
+ protected:
bool UseRealInterconnect = false;
TInterconnectMock InterconnectMock;
bool IsInitialized = false;
bool SingleSysEnv = false;
- const TString ClusterUUID;
+ const TString ClusterUUID;
const ui32 FirstNodeId;
const ui32 NodeCount;
const ui32 DataCenterCount;
@@ -535,7 +535,7 @@ namespace NActors {
TIntrusivePtr<IRandomProvider> RandomProvider;
TIntrusivePtr<ITimeProvider> TimeProvider;
- protected:
+ protected:
struct TNodeDataBase: public TThrRefBase {
TNodeDataBase();
void Stop();
@@ -598,7 +598,7 @@ namespace NActors {
protected:
THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory};
- private:
+ private:
void InitNode(TNodeDataBase* node, size_t idx);
struct TDispatchContext {
diff --git a/library/cpp/actors/testlib/ya.make b/library/cpp/actors/testlib/ya.make
index 87dc50175e..1afb3f6059 100644
--- a/library/cpp/actors/testlib/ya.make
+++ b/library/cpp/actors/testlib/ya.make
@@ -1,26 +1,26 @@
-LIBRARY()
-
-OWNER(
- g:kikimr
-)
-
-SRCS(
- test_runtime.cpp
-)
-
-PEERDIR(
+LIBRARY()
+
+OWNER(
+ g:kikimr
+)
+
+SRCS(
+ test_runtime.cpp
+)
+
+PEERDIR(
library/cpp/actors/core
library/cpp/actors/interconnect/mock
library/cpp/actors/protos
library/cpp/random_provider
library/cpp/time_provider
-)
-
-IF (GCC)
- CFLAGS(-fno-devirtualize-speculatively)
-ENDIF()
-
-END()
+)
+
+IF (GCC)
+ CFLAGS(-fno-devirtualize-speculatively)
+ENDIF()
+
+END()
RECURSE_FOR_TESTS(
ut
diff --git a/library/cpp/lfalloc/lf_allocX64.h b/library/cpp/lfalloc/lf_allocX64.h
index 588b051815..fd2a906d6f 100644
--- a/library/cpp/lfalloc/lf_allocX64.h
+++ b/library/cpp/lfalloc/lf_allocX64.h
@@ -390,53 +390,53 @@ static char* AllocWithMMap(uintptr_t sz, EMMapMode mode) {
return largeBlock;
}
-enum class ELarge : ui8 {
- Free = 0, // block in free cache
- Alloc = 1, // block is allocated
- Gone = 2, // block was unmapped
-};
-
-struct TLargeBlk {
-
- static TLargeBlk* As(void *raw) {
- return reinterpret_cast<TLargeBlk*>((char*)raw - 4096ll);
- }
-
- static const TLargeBlk* As(const void *raw) {
- return reinterpret_cast<const TLargeBlk*>((const char*)raw - 4096ll);
- }
-
- void SetSize(size_t bytes, size_t pages) {
- Pages = pages;
- Bytes = bytes;
- }
-
- void Mark(ELarge state) {
- const ui64 marks[] = {
- 0x8b38aa5ca4953c98, // ELarge::Free
- 0xf916d33584eb5087, // ELarge::Alloc
- 0xd33b0eca7651bc3f // ELarge::Gone
- };
-
- Token = size_t(marks[ui8(state)]);
- }
-
- size_t Pages; // Total pages allocated with mmap like call
- size_t Bytes; // Actually requested bytes by user
- size_t Token; // Block state token, see ELarge enum.
-};
-
-
-static void LargeBlockUnmap(void* p, size_t pages) {
- const auto bytes = (pages + 1) * uintptr_t(4096);
-
- IncrementCounter(CT_MUNMAP, bytes);
+enum class ELarge : ui8 {
+ Free = 0, // block in free cache
+ Alloc = 1, // block is allocated
+ Gone = 2, // block was unmapped
+};
+
+struct TLargeBlk {
+
+ static TLargeBlk* As(void *raw) {
+ return reinterpret_cast<TLargeBlk*>((char*)raw - 4096ll);
+ }
+
+ static const TLargeBlk* As(const void *raw) {
+ return reinterpret_cast<const TLargeBlk*>((const char*)raw - 4096ll);
+ }
+
+ void SetSize(size_t bytes, size_t pages) {
+ Pages = pages;
+ Bytes = bytes;
+ }
+
+ void Mark(ELarge state) {
+ const ui64 marks[] = {
+ 0x8b38aa5ca4953c98, // ELarge::Free
+ 0xf916d33584eb5087, // ELarge::Alloc
+ 0xd33b0eca7651bc3f // ELarge::Gone
+ };
+
+ Token = size_t(marks[ui8(state)]);
+ }
+
+ size_t Pages; // Total pages allocated with mmap like call
+ size_t Bytes; // Actually requested bytes by user
+ size_t Token; // Block state token, see ELarge enum.
+};
+
+
+static void LargeBlockUnmap(void* p, size_t pages) {
+ const auto bytes = (pages + 1) * uintptr_t(4096);
+
+ IncrementCounter(CT_MUNMAP, bytes);
IncrementCounter(CT_MUNMAP_CNT, 1);
#ifdef _MSC_VER
Y_ASSERT_NOBT(0);
#else
- TLargeBlk::As(p)->Mark(ELarge::Gone);
- munmap((char*)p - 4096ll, bytes);
+ TLargeBlk::As(p)->Mark(ELarge::Gone);
+ munmap((char*)p - 4096ll, bytes);
#endif
}
@@ -447,7 +447,7 @@ static int LB_LIMIT_TOTAL_SIZE = 500 * 1024 * 1024 / 4096; // do not keep more t
static void* volatile lbFreePtrs[LB_BUF_HASH][LB_BUF_SIZE];
static TAtomic lbFreePageCount;
-
+
static void* LargeBlockAlloc(size_t _nSize, ELFAllocCounter counter) {
size_t pgCount = (_nSize + 4095) / 4096;
#ifdef _MSC_VER
@@ -466,16 +466,16 @@ static void* LargeBlockAlloc(size_t _nSize, ELFAllocCounter counter) {
if (p == nullptr)
continue;
if (DoCas(&lbFreePtrs[lbHash][i], (void*)nullptr, p) == p) {
- size_t realPageCount = TLargeBlk::As(p)->Pages;
+ size_t realPageCount = TLargeBlk::As(p)->Pages;
if (realPageCount == pgCount) {
AtomicAdd(lbFreePageCount, -pgCount);
- TLargeBlk::As(p)->Mark(ELarge::Alloc);
+ TLargeBlk::As(p)->Mark(ELarge::Alloc);
return p;
} else {
if (DoCas(&lbFreePtrs[lbHash][i], p, (void*)nullptr) != (void*)nullptr) {
// block was freed while we were busy
AtomicAdd(lbFreePageCount, -realPageCount);
- LargeBlockUnmap(p, realPageCount);
+ LargeBlockUnmap(p, realPageCount);
--i;
}
}
@@ -484,8 +484,8 @@ static void* LargeBlockAlloc(size_t _nSize, ELFAllocCounter counter) {
char* pRes = AllocWithMMap((pgCount + 1) * 4096ll, MM_HUGE);
#endif
pRes += 4096ll;
- TLargeBlk::As(pRes)->SetSize(_nSize, pgCount);
- TLargeBlk::As(pRes)->Mark(ELarge::Alloc);
+ TLargeBlk::As(pRes)->SetSize(_nSize, pgCount);
+ TLargeBlk::As(pRes)->Mark(ELarge::Alloc);
return pRes;
}
@@ -498,9 +498,9 @@ static void FreeAllLargeBlockMem() {
if (p == nullptr)
continue;
if (DoCas(&lbFreePtr[i], (void*)nullptr, p) == p) {
- int pgCount = TLargeBlk::As(p)->Pages;
+ int pgCount = TLargeBlk::As(p)->Pages;
AtomicAdd(lbFreePageCount, -pgCount);
- LargeBlockUnmap(p, pgCount);
+ LargeBlockUnmap(p, pgCount);
}
}
}
@@ -513,9 +513,9 @@ static void LargeBlockFree(void* p, ELFAllocCounter counter) {
#ifdef _MSC_VER
VirtualFree((char*)p - 4096ll, 0, MEM_RELEASE);
#else
- size_t pgCount = TLargeBlk::As(p)->Pages;
+ size_t pgCount = TLargeBlk::As(p)->Pages;
- TLargeBlk::As(p)->Mark(ELarge::Free);
+ TLargeBlk::As(p)->Mark(ELarge::Free);
IncrementCounter(counter, pgCount * 4096ll);
IncrementCounter(CT_SYSTEM_FREE, 4096ll);
@@ -531,7 +531,7 @@ static void LargeBlockFree(void* p, ELFAllocCounter counter) {
}
}
- LargeBlockUnmap(p, pgCount);
+ LargeBlockUnmap(p, pgCount);
#endif
}
@@ -1644,23 +1644,23 @@ static Y_FORCE_INLINE void LFFree(void* p) {
}
}
-static size_t LFGetSize(const void* p) {
+static size_t LFGetSize(const void* p) {
#if defined(LFALLOC_DBG)
if (p == nullptr)
return 0;
- return GetAllocHeader(const_cast<void*>(p))->Size;
+ return GetAllocHeader(const_cast<void*>(p))->Size;
#endif
- uintptr_t chkOffset = ((const char*)p - ALLOC_START);
+ uintptr_t chkOffset = ((const char*)p - ALLOC_START);
if (chkOffset >= N_MAX_WORKSET_SIZE) {
if (p == nullptr)
return 0;
- return TLargeBlk::As(p)->Pages * 4096ll;
+ return TLargeBlk::As(p)->Pages * 4096ll;
}
- uintptr_t chunk = ((const char*)p - ALLOC_START) / N_CHUNK_SIZE;
+ uintptr_t chunk = ((const char*)p - ALLOC_START) / N_CHUNK_SIZE;
ptrdiff_t nSizeIdx = chunkSizeIdx[chunk];
if (nSizeIdx <= 0)
- return TLargeBlk::As(p)->Pages * 4096ll;
+ return TLargeBlk::As(p)->Pages * 4096ll;
return nSizeIdxToSize[nSizeIdx];
}
diff --git a/library/cpp/messagebus/acceptor.cpp b/library/cpp/messagebus/acceptor.cpp
index de8810d02e..64a38619c2 100644
--- a/library/cpp/messagebus/acceptor.cpp
+++ b/library/cpp/messagebus/acceptor.cpp
@@ -19,7 +19,7 @@ TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, c
: TActor<TAcceptor>(session->Queue->WorkQueue.Get())
, AcceptorId(acceptorId)
, Session(session)
- , GranStatus(session->Config.Secret.StatusFlushPeriod)
+ , GranStatus(session->Config.Secret.StatusFlushPeriod)
{
SetNonBlock(socket, true);
@@ -30,7 +30,7 @@ TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, c
Stats.Fd = socket;
Stats.ListenAddr = addr;
- SendStatus(TInstant::Now());
+ SendStatus(TInstant::Now());
}
void TAcceptor::Act(TDefaultTag) {
@@ -40,8 +40,8 @@ void TAcceptor::Act(TDefaultTag) {
return;
}
- TInstant now = TInstant::Now();
-
+ TInstant now = TInstant::Now();
+
if (state == SS_SHUTDOWN_COMMAND) {
if (!!Channel) {
Channel->Unregister();
@@ -49,7 +49,7 @@ void TAcceptor::Act(TDefaultTag) {
Stats.Fd = INVALID_SOCKET;
}
- SendStatus(now);
+ SendStatus(now);
Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats);
Stats.ResetIncremental();
@@ -96,7 +96,7 @@ void TAcceptor::Act(TDefaultTag) {
Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept);
- Stats.LastAcceptSuccessInstant = now;
+ Stats.LastAcceptSuccessInstant = now;
++Stats.AcceptSuccessCount;
}
@@ -105,11 +105,11 @@ void TAcceptor::Act(TDefaultTag) {
Channel->EnableRead();
- SendStatus(now);
+ SendStatus(now);
}
-void TAcceptor::SendStatus(TInstant now) {
- GranStatus.Listen.Update(Stats, now);
+void TAcceptor::SendStatus(TInstant now) {
+ GranStatus.Listen.Update(Stats, now);
}
void TAcceptor::HandleEvent(SOCKET socket, void* cookie) {
diff --git a/library/cpp/messagebus/acceptor.h b/library/cpp/messagebus/acceptor.h
index 8ec2229d63..57cb010bf2 100644
--- a/library/cpp/messagebus/acceptor.h
+++ b/library/cpp/messagebus/acceptor.h
@@ -55,6 +55,6 @@ namespace NBus {
TGranStatus GranStatus;
};
-
+
}
}
diff --git a/library/cpp/messagebus/actor/queue_for_actor.h b/library/cpp/messagebus/actor/queue_for_actor.h
index d26a546296..40fa536b82 100644
--- a/library/cpp/messagebus/actor/queue_for_actor.h
+++ b/library/cpp/messagebus/actor/queue_for_actor.h
@@ -60,15 +60,15 @@ namespace NActor {
temp.Shrink();
}
}
-
+
template <typename TFunc>
void DequeueAllLikelyEmpty(const TFunc& func) {
if (Y_LIKELY(IsEmpty())) {
return;
}
-
+
DequeueAll(func);
- }
+ }
};
}
diff --git a/library/cpp/messagebus/actor/temp_tls_vector.h b/library/cpp/messagebus/actor/temp_tls_vector.h
index 407703d702..675d92f5b0 100644
--- a/library/cpp/messagebus/actor/temp_tls_vector.h
+++ b/library/cpp/messagebus/actor/temp_tls_vector.h
@@ -23,18 +23,18 @@ public:
}
~TTempTlsVector() {
- Clear();
- }
-
- void Clear() {
+ Clear();
+ }
+
+ void Clear() {
Vector->clear();
}
-
+
size_t Capacity() const noexcept {
- return Vector->capacity();
- }
-
- void Shrink() {
- Vector->shrink_to_fit();
- }
+ return Vector->capacity();
+ }
+
+ void Shrink() {
+ Vector->shrink_to_fit();
+ }
};
diff --git a/library/cpp/messagebus/config/netaddr.cpp b/library/cpp/messagebus/config/netaddr.cpp
index c1cb356840..962ac538e2 100644
--- a/library/cpp/messagebus/config/netaddr.cpp
+++ b/library/cpp/messagebus/config/netaddr.cpp
@@ -129,7 +129,7 @@ namespace NBus {
ythrow TNetAddr::TError() << "cannot resolve " << host << ":" << port << " into " << Describe(requireVersion);
}
}
-
+
TNetAddr::TNetAddr(const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_ANY*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/)
: Ptr(MakeAddress(na, requireVersion, preferVersion))
{
diff --git a/library/cpp/messagebus/config/netaddr.h b/library/cpp/messagebus/config/netaddr.h
index ccb4b42810..b79c0cc355 100644
--- a/library/cpp/messagebus/config/netaddr.h
+++ b/library/cpp/messagebus/config/netaddr.h
@@ -36,14 +36,14 @@ namespace NBus {
public:
class TError: public yexception {
};
-
+
TNetAddr();
TNetAddr(TAutoPtr<IRemoteAddr> addr);
TNetAddr(const char* hostPort, EIpVersion requireVersion = EIP_VERSION_ANY, EIpVersion preferVersion = EIP_VERSION_ANY);
TNetAddr(TStringBuf host, int port, EIpVersion requireVersion = EIP_VERSION_ANY, EIpVersion preferVersion = EIP_VERSION_ANY);
TNetAddr(const TNetworkAddress& na, EIpVersion requireVersion = EIP_VERSION_ANY, EIpVersion preferVersion = EIP_VERSION_ANY);
TNetAddr(const TNetworkAddress& na, const TAddrInfo& ai);
-
+
bool operator==(const TNetAddr&) const;
bool operator!=(const TNetAddr& other) const {
return !(*this == other);
diff --git a/library/cpp/messagebus/config/session_config.cpp b/library/cpp/messagebus/config/session_config.cpp
index 17157b3dfa..fbbbb106c9 100644
--- a/library/cpp/messagebus/config/session_config.cpp
+++ b/library/cpp/messagebus/config/session_config.cpp
@@ -120,7 +120,7 @@ void TBusSessionConfig::ConfigureLastGetopt(NLastGetopt::TOpts& opts,
opts.AddLongOption(prefix + "max-message-size")
.RequiredArgument("BYTES")
.DefaultValue(ToString(MaxMessageSize))
- .StoreMappedResultT<const char*>(&MaxMessageSize, &ParseWithKmgSuffix);
+ .StoreMappedResultT<const char*>(&MaxMessageSize, &ParseWithKmgSuffix);
opts.AddLongOption(prefix + "socket-recv-buffer-size")
.RequiredArgument("BYTES")
.DefaultValue(ToString(SocketRecvBufferSize))
diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp
index fd2e726d0b..f685135bed 100644
--- a/library/cpp/messagebus/event_loop.cpp
+++ b/library/cpp/messagebus/event_loop.cpp
@@ -78,7 +78,7 @@ public:
const char* Name;
- TAtomic RunningState;
+ TAtomic RunningState;
TAtomic StopSignal;
TSystemEvent StoppedEvent;
TData Data;
@@ -143,7 +143,7 @@ void TEventLoop::Stop() {
}
bool TEventLoop::IsRunning() {
- return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING;
+ return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING;
}
TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
@@ -277,7 +277,7 @@ TEventLoop::TImpl::TImpl(const char* name)
}
void TEventLoop::TImpl::Run() {
- bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED);
+ bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED);
Y_VERIFY(res, "Invalid mbus event loop state");
if (!!Name) {
@@ -320,21 +320,21 @@ void TEventLoop::TImpl::Run() {
Data.clear();
}
- res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING);
+ res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING);
Y_VERIFY(res);
-
+
StoppedEvent.Signal();
}
void TEventLoop::TImpl::Stop() {
AtomicSet(StopSignal, 1);
- if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) {
- Wakeup();
+ if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) {
+ Wakeup();
- StoppedEvent.WaitI();
- }
+ StoppedEvent.WaitI();
+ }
}
TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
diff --git a/library/cpp/messagebus/misc/granup.h b/library/cpp/messagebus/misc/granup.h
index 8b04aca597..36ecfebc93 100644
--- a/library/cpp/messagebus/misc/granup.h
+++ b/library/cpp/messagebus/misc/granup.h
@@ -1,50 +1,50 @@
-#pragma once
-
+#pragma once
+
#include <util/datetime/base.h>
#include <util/system/guard.h>
-#include <util/system/mutex.h>
-#include <util/system/spinlock.h>
-
-namespace NBus {
- template <typename TItem, typename TLocker = TSpinLock>
- class TGranUp {
- public:
- TGranUp(TDuration gran)
- : Gran(gran)
+#include <util/system/mutex.h>
+#include <util/system/spinlock.h>
+
+namespace NBus {
+ template <typename TItem, typename TLocker = TSpinLock>
+ class TGranUp {
+ public:
+ TGranUp(TDuration gran)
+ : Gran(gran)
, Next(TInstant::MicroSeconds(0))
{
}
-
+
template <typename TFunctor>
void Update(TFunctor functor, TInstant now, bool force = false) {
if (force || now > Next)
- Set(functor(), now);
- }
-
+ Set(functor(), now);
+ }
+
void Update(const TItem& item, TInstant now, bool force = false) {
if (force || now > Next)
- Set(item, now);
- }
-
+ Set(item, now);
+ }
+
TItem Get() const noexcept {
TGuard<TLocker> guard(Lock);
-
- return Item;
- }
-
- protected:
+
+ return Item;
+ }
+
+ protected:
void Set(const TItem& item, TInstant now) {
TGuard<TLocker> guard(Lock);
-
- Item = item;
-
- Next = now + Gran;
- }
-
- private:
- const TDuration Gran;
+
+ Item = item;
+
+ Next = now + Gran;
+ }
+
+ private:
+ const TDuration Gran;
TLocker Lock;
TItem Item;
TInstant Next;
- };
-}
+ };
+}
diff --git a/library/cpp/messagebus/misc/tokenquota.h b/library/cpp/messagebus/misc/tokenquota.h
index 954cf0f0d7..190547fa54 100644
--- a/library/cpp/messagebus/misc/tokenquota.h
+++ b/library/cpp/messagebus/misc/tokenquota.h
@@ -1,83 +1,83 @@
-#pragma once
-
-#include <util/system/atomic.h>
-
-namespace NBus {
- /* Consumer and feeder quota model impl.
-
- Consumer thread only calls:
- Acquire(), fetches tokens for usage from bucket;
- Consume(), eats given amount of tokens, must not
- be greater than Value() items;
-
- Other threads (feeders) calls:
- Return(), put used tokens back to bucket;
- */
-
- class TTokenQuota {
- public:
- TTokenQuota(bool enabled, size_t tokens, size_t wake)
- : Enabled(tokens > 0 ? enabled : false)
- , Acquired(0)
- , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0)
- , Tokens_(tokens)
+#pragma once
+
+#include <util/system/atomic.h>
+
+namespace NBus {
+ /* Consumer and feeder quota model impl.
+
+ Consumer thread only calls:
+ Acquire(), fetches tokens for usage from bucket;
+ Consume(), eats given amount of tokens, must not
+ be greater than Value() items;
+
+ Other threads (feeders) calls:
+ Return(), put used tokens back to bucket;
+ */
+
+ class TTokenQuota {
+ public:
+ TTokenQuota(bool enabled, size_t tokens, size_t wake)
+ : Enabled(tokens > 0 ? enabled : false)
+ , Acquired(0)
+ , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0)
+ , Tokens_(tokens)
{
Y_UNUSED(padd_);
}
-
+
bool Acquire(TAtomic level = 1, bool force = false) {
level = Max(TAtomicBase(level), TAtomicBase(1));
-
+
if (Enabled && (Acquired < level || force)) {
Acquired += AtomicSwap(&Tokens_, 0);
- }
-
- return !Enabled || Acquired >= level;
- }
-
+ }
+
+ return !Enabled || Acquired >= level;
+ }
+
void Consume(size_t items) {
if (Enabled) {
Y_ASSERT(Acquired >= TAtomicBase(items));
-
- Acquired -= items;
- }
- }
-
+
+ Acquired -= items;
+ }
+ }
+
bool Return(size_t items_) noexcept {
if (!Enabled || items_ == 0)
- return false;
-
- const TAtomic items = items_;
- const TAtomic value = AtomicAdd(Tokens_, items);
-
- return (value - items < WakeLev && value >= WakeLev);
- }
-
+ return false;
+
+ const TAtomic items = items_;
+ const TAtomic value = AtomicAdd(Tokens_, items);
+
+ return (value - items < WakeLev && value >= WakeLev);
+ }
+
bool IsEnabled() const noexcept {
- return Enabled;
- }
-
+ return Enabled;
+ }
+
bool IsAboveWake() const noexcept {
- return !Enabled || (WakeLev <= AtomicGet(Tokens_));
- }
-
+ return !Enabled || (WakeLev <= AtomicGet(Tokens_));
+ }
+
size_t Tokens() const noexcept {
- return Acquired + AtomicGet(Tokens_);
- }
-
+ return Acquired + AtomicGet(Tokens_);
+ }
+
size_t Check(const TAtomic level) const noexcept {
- return !Enabled || level <= Acquired;
- }
-
- private:
+ return !Enabled || level <= Acquired;
+ }
+
+ private:
bool Enabled;
TAtomicBase Acquired;
- const TAtomicBase WakeLev;
+ const TAtomicBase WakeLev;
TAtomic Tokens_;
-
- /* This padd requires for align Tokens_ member on its own
- CPU cacheline. */
-
+
+ /* This padd requires for align Tokens_ member on its own
+ CPU cacheline. */
+
ui64 padd_;
- };
-}
+ };
+}
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp
index 730fc0f554..22932569db 100644
--- a/library/cpp/messagebus/remote_connection.cpp
+++ b/library/cpp/messagebus/remote_connection.cpp
@@ -48,11 +48,11 @@ namespace NBus {
const TInstant now = TInstant::Now();
WriterFillStatus();
-
+
GranStatus.Writer.Update(WriterData.Status, now, true);
GranStatus.Reader.Update(ReaderData.Status, now, true);
}
-
+
TRemoteConnection::~TRemoteConnection() {
Y_VERIFY(ReplyQueue.IsEmpty());
}
@@ -73,7 +73,7 @@ namespace NBus {
bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept {
size_t left = Buffer.Size() - Offset;
-
+
return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0;
}
@@ -83,13 +83,13 @@ namespace NBus {
Y_VERIFY(State == WRITER_FILLING, "state must be initial");
Channel = channel;
}
-
+
void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) {
Y_VERIFY(!Channel, "must not have channel");
Y_VERIFY(Buffer.Empty(), "buffer must be empty");
Channel = channel;
}
-
+
void TRemoteConnection::TWriterData::DropChannel() {
if (!!Channel) {
Channel->Unregister();
@@ -184,7 +184,7 @@ namespace NBus {
ReaderData.Status.Fd = INVALID_SOCKET;
return;
}
-
+
ReaderData.DropChannel();
ReaderData.Status.Fd = readSocket.Socket;
@@ -232,10 +232,10 @@ namespace NBus {
ReaderData.Status.Acts += 1;
ReaderGetSocketQueue()->DequeueAllLikelyEmpty();
-
+
if (AtomicGet(ReaderData.Down)) {
ReaderData.DropChannel();
-
+
ReaderProcessStatusDown();
ReaderData.ShutdownComplete.Signal();
@@ -262,7 +262,7 @@ namespace NBus {
}
ReaderFlushMessages();
- }
+ }
ReaderSendStatus(now);
}
@@ -275,109 +275,109 @@ namespace NBus {
else if (!QuotaBytes.Acquire(bytes))
wakeFlags |= WAKE_QUOTA_BYTES;
-
+
if (wakeFlags) {
ReaderData.Status.QuotaExhausted++;
-
+
WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags);
}
-
+
return wakeFlags == 0;
}
-
+
void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) {
QuotaMsg.Consume(msg);
QuotaBytes.Consume(bytes);
}
-
+
void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) {
if (QuotaReturnValues(items, bytes))
ReadQuotaWakeup();
}
-
+
void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) {
if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down))
WriterGetWakeQueue()->EnqueueAndSchedule(0x0);
}
-
+
bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) {
bool rMsg = QuotaMsg.Return(items);
bool rBytes = QuotaBytes.Return(bytes);
-
+
return rMsg || rBytes;
}
-
+
void TRemoteConnection::ReadQuotaWakeup() {
const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags();
-
+
if (mask && mask == WriterData.AwakeFlags) {
WriterData.Status.ReaderWakeups++;
WriterData.AwakeFlags = 0;
-
+
ScheduleRead();
}
}
-
+
ui32 TRemoteConnection::WriteWakeFlags() const {
ui32 awakeFlags = 0;
-
+
if (QuotaMsg.IsAboveWake())
awakeFlags |= WAKE_QUOTA_MSG;
-
+
if (QuotaBytes.IsAboveWake())
awakeFlags |= WAKE_QUOTA_BYTES;
-
+
return awakeFlags;
}
-
+
bool TRemoteConnection::ReaderProcessBuffer() {
TInstant now = TInstant::Now();
-
+
for (;;) {
if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) {
break;
}
-
+
TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset));
-
+
if (header.Size < sizeof(TBusHeader)) {
LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
return false;
}
-
+
if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) {
LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
return false;
}
-
+
if (header.Size > Config.MaxMessageSize) {
LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size));
ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1;
ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false);
return false;
}
-
+
if (!ReaderData.HasBytesInBuf(header.Size)) {
if (ReaderData.Offset == 0) {
ReaderData.Buffer.Reserve(header.Size);
}
break;
}
-
+
if (!QuotaAcquire(1, header.Size))
return false;
-
+
if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) {
return false;
}
-
+
ReaderData.Offset += header.Size;
}
-
+
ReaderData.Buffer.ChopHead(ReaderData.Offset);
ReaderData.Offset = 0;
@@ -408,7 +408,7 @@ namespace NBus {
}
Y_ASSERT(ReaderData.Buffer.Avail() > 0);
-
+
ssize_t bytes;
{
TWhatThreadDoesPushPop pp("recv syscall");
@@ -454,7 +454,7 @@ namespace NBus {
message != replyQueueTemp.rend(); ++message) {
messages.push_back(message->MessagePtr.Release());
}
-
+
WriterErrorMessages(messages, reason);
replyQueueTemp.clear();
@@ -535,10 +535,10 @@ namespace NBus {
ClearBeforeSendQueue(reasonForQueues);
WriterGetReconnectQueue()->Clear();
WriterGetWakeQueue()->Clear();
-
+
TMessagesPtrs cleared;
ClearOutgoingQueue(cleared, false);
-
+
if (!Session->IsSource_) {
for (auto& i : cleared) {
TBusMessagePtrAndHeader h(i);
@@ -548,10 +548,10 @@ namespace NBus {
// and this part is not batch
}
}
-
+
WriterErrorMessages(cleared, reason);
}
-
+
void TRemoteConnection::BeforeTryWrite() {
}
@@ -638,7 +638,7 @@ namespace NBus {
WriterData.Status.Incremental.NetworkOps += 1;
WriterData.Buffer.LeftProceed(bytes);
- }
+ }
WriterData.Buffer.Clear();
if (WriterData.Buffer.Capacity() > MaxBufferSize) {
@@ -654,12 +654,12 @@ namespace NBus {
WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion);
} else {
ScheduleShutdown(status);
- }
+ }
}
void TRemoteConnection::ScheduleShutdown(EMessageStatus status) {
ShutdownReason = status;
-
+
AtomicSet(ReaderData.Down, 1);
ScheduleRead();
@@ -856,7 +856,7 @@ namespace NBus {
}
TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages;
-
+
for (;;) {
THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront());
if (!writeMessage) {
@@ -944,12 +944,12 @@ namespace NBus {
WriterErrorMessage(h.MessagePtr.Release(), status);
}
}
-
+
void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) {
TBusMessage* released = m.Release();
WriterErrorMessages(MakeArrayRef(&released, 1), status);
}
-
+
void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) {
ResetOneWayFlag(ms);
@@ -958,17 +958,17 @@ namespace NBus {
Session->InvokeOnError(m, status);
}
}
-
+
void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) {
Y_VERIFY(Session->IsSource_, "state check");
TClientConnectionEvent event(type, ConnectionId, PeerAddr);
TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get());
session->ClientHandler->OnClientConnectionEvent(event);
}
-
+
bool TRemoteConnection::IsAlive() const {
return !AtomicGet(WriterData.Down);
}
-
+
}
}
diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h
index 5141a8ea9f..4538947368 100644
--- a/library/cpp/messagebus/remote_connection.h
+++ b/library/cpp/messagebus/remote_connection.h
@@ -13,8 +13,8 @@
#include "storage.h"
#include "vector_swaps.h"
#include "ybus.h"
-#include "misc/granup.h"
-#include "misc/tokenquota.h"
+#include "misc/granup.h"
+#include "misc/tokenquota.h"
#include <library/cpp/messagebus/actor/actor.h>
#include <library/cpp/messagebus/actor/executor.h>
@@ -49,7 +49,7 @@ namespace NBus {
struct TWriterToReaderSocketMessage {
TSocket Socket;
ui32 SocketVersion;
-
+
TWriterToReaderSocketMessage(TSocket socket, ui32 socketVersion)
: Socket(socket)
, SocketVersion(socketVersion)
@@ -154,13 +154,13 @@ namespace NBus {
virtual void ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) = 0;
bool MessageRead(TArrayRef<const char> dataRef, TInstant now);
virtual void MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) = 0;
-
+
void CallSerialize(TBusMessage* msg, TBuffer& buffer) const;
void SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const;
TBusMessage* DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const;
-
+
void ResetOneWayFlag(TArrayRef<TBusMessage*>);
-
+
inline ::NActor::TActor<TRemoteConnection, TWriterTag>* GetWriterActor() {
return this;
}
@@ -269,7 +269,7 @@ namespace NBus {
TGranUp<TRemoteConnectionWriterStatus> Writer;
TGranUp<TRemoteConnectionReaderStatus> Reader;
};
-
+
TWriterData WriterData;
TReaderData ReaderData;
TGranStatus GranStatus;
@@ -280,15 +280,15 @@ namespace NBus {
// client connection only
TLockFreeQueueBatch<TBusMessagePtrAndHeader, TVectorSwaps> ReplyQueue;
-
+
EMessageStatus ShutdownReason;
};
inline const TNetAddr& TRemoteConnection::GetAddr() const noexcept {
return PeerAddr;
}
-
+
typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr;
-
+
}
}
diff --git a/library/cpp/messagebus/remote_connection_status.cpp b/library/cpp/messagebus/remote_connection_status.cpp
index 05ae84791c..2c48b2a287 100644
--- a/library/cpp/messagebus/remote_connection_status.cpp
+++ b/library/cpp/messagebus/remote_connection_status.cpp
@@ -180,15 +180,15 @@ TString TRemoteConnectionStatus::PrintToString() const {
p.AddRow("connect syscalls", WriterStatus.ConnectSyscalls);
}
- p.AddRow("send queue", LeftPad(WriterStatus.SendQueueSize, 6));
-
+ p.AddRow("send queue", LeftPad(WriterStatus.SendQueueSize, 6));
+
if (Server) {
- p.AddRow("quota msg", LeftPad(ReaderStatus.QuotaMsg, 6));
- p.AddRow("quota bytes", LeftPad(ReaderStatus.QuotaBytes, 6));
- p.AddRow("quota exhausted", LeftPad(ReaderStatus.QuotaExhausted, 6));
- p.AddRow("reader wakeups", LeftPad(WriterStatus.ReaderWakeups, 6));
- } else {
- p.AddRow("ack messages", LeftPad(WriterStatus.AckMessagesSize, 6));
+ p.AddRow("quota msg", LeftPad(ReaderStatus.QuotaMsg, 6));
+ p.AddRow("quota bytes", LeftPad(ReaderStatus.QuotaBytes, 6));
+ p.AddRow("quota exhausted", LeftPad(ReaderStatus.QuotaExhausted, 6));
+ p.AddRow("reader wakeups", LeftPad(WriterStatus.ReaderWakeups, 6));
+ } else {
+ p.AddRow("ack messages", LeftPad(WriterStatus.AckMessagesSize, 6));
}
p.AddRow("written", WriterStatus.Incremental.MessageCounter.ToString(false));
diff --git a/library/cpp/messagebus/remote_server_session.cpp b/library/cpp/messagebus/remote_server_session.cpp
index 34dd2153e2..6abbf88a60 100644
--- a/library/cpp/messagebus/remote_server_session.cpp
+++ b/library/cpp/messagebus/remote_server_session.cpp
@@ -24,9 +24,9 @@ TRemoteServerSession::TRemoteServerSession(TBusMessageQueue* queue,
{
if (config.PerConnectionMaxInFlightBySize > 0) {
if (config.PerConnectionMaxInFlightBySize < config.MaxMessageSize)
- ythrow yexception()
- << "too low PerConnectionMaxInFlightBySize value";
- }
+ ythrow yexception()
+ << "too low PerConnectionMaxInFlightBySize value";
+ }
}
namespace NBus {
@@ -87,7 +87,7 @@ void TRemoteServerSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<
void TRemoteServerSession::InvokeOnMessage(TBusMessagePtrAndHeader& request, TIntrusivePtr<TRemoteServerConnection>& conn) {
if (Y_UNLIKELY(AtomicGet(Down))) {
- ReleaseInWorkRequests(*conn.Get(), request.MessagePtr.Get());
+ ReleaseInWorkRequests(*conn.Get(), request.MessagePtr.Get());
InvokeOnError(request.MessagePtr.Release(), MESSAGE_SHUTDOWN);
} else {
TWhatThreadDoesPushPop pp("OnMessage");
@@ -167,19 +167,19 @@ void TRemoteServerSession::ReleaseInWorkResponses(TArrayRef<const TBusMessagePtr
void TRemoteServerSession::ReleaseInWorkRequests(TRemoteConnection& con, TBusMessage* request) {
Y_ASSERT((request->LocalFlags & MESSAGE_IN_WORK));
- request->LocalFlags &= ~MESSAGE_IN_WORK;
+ request->LocalFlags &= ~MESSAGE_IN_WORK;
+
+ const size_t size = request->GetHeader()->Size;
- const size_t size = request->GetHeader()->Size;
-
- con.QuotaReturnAside(1, size);
- ServerOwnedMessages.ReleaseMultiple(1, size);
+ con.QuotaReturnAside(1, size);
+ ServerOwnedMessages.ReleaseMultiple(1, size);
}
void TRemoteServerSession::ReleaseInWork(TBusIdentity& ident) {
- ident.SetInWork(false);
- ident.Connection->QuotaReturnAside(1, ident.Size);
+ ident.SetInWork(false);
+ ident.Connection->QuotaReturnAside(1, ident.Size);
- ServerOwnedMessages.ReleaseMultiple(1, ident.Size);
+ ServerOwnedMessages.ReleaseMultiple(1, ident.Size);
}
void TRemoteServerSession::ConvertInWork(TBusIdentity& req, TBusMessage* reply) {
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp
index 7adaa1ae6d..ddf9f360c4 100644
--- a/library/cpp/messagebus/session_impl.cpp
+++ b/library/cpp/messagebus/session_impl.cpp
@@ -389,14 +389,14 @@ void TBusSessionImpl::StatusUpdateCachedDump() {
for (TVector<TAcceptorPtr>::const_iterator acceptor = acceptors.begin();
acceptor != acceptors.end(); ++acceptor) {
- const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get();
-
- acceptorStatusSummary += status;
-
+ const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get();
+
+ acceptorStatusSummary += status;
+
if (acceptor != acceptors.begin()) {
ss << "\n";
}
- ss << status.PrintToString();
+ ss << status.PrintToString();
}
r.Acceptors = ss.Str();
@@ -410,19 +410,19 @@ void TBusSessionImpl::StatusUpdateCachedDump() {
if (connection != connections.begin()) {
ss << "\n";
}
-
+
TRemoteConnectionStatus status;
status.Server = !IsSource_;
- status.ReaderStatus = (*connection)->GranStatus.Reader.Get();
- status.WriterStatus = (*connection)->GranStatus.Writer.Get();
-
+ status.ReaderStatus = (*connection)->GranStatus.Reader.Get();
+ status.WriterStatus = (*connection)->GranStatus.Writer.Get();
+
ss << status.PrintToString();
-
- r.ConnectionStatusSummary.ReaderStatus += status.ReaderStatus;
- r.ConnectionStatusSummary.WriterStatus += status.WriterStatus;
+
+ r.ConnectionStatusSummary.ReaderStatus += status.ReaderStatus;
+ r.ConnectionStatusSummary.WriterStatus += status.WriterStatus;
}
- r.ConnectionsSummary = r.ConnectionStatusSummary.PrintToString();
+ r.ConnectionsSummary = r.ConnectionStatusSummary.PrintToString();
r.Connections = ss.Str();
}
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
index c11d447224..040f9b7702 100644
--- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -962,103 +962,103 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.Sync.WaitForAndIncrement(3);
}
-
+
struct TServerForQuotaWake: public TExampleServer {
TSystemEvent GoOn;
TMutex OneLock;
-
- TOnMessageContext OneMessage;
-
- static TBusServerSessionConfig Config() {
- TBusServerSessionConfig config;
-
- config.PerConnectionMaxInFlight = 1;
- config.PerConnectionMaxInFlightBySize = 1500;
- config.MaxMessageSize = 1024;
-
- return config;
- }
-
- TServerForQuotaWake()
- : TExampleServer("TServerForQuotaWake", Config())
+
+ TOnMessageContext OneMessage;
+
+ static TBusServerSessionConfig Config() {
+ TBusServerSessionConfig config;
+
+ config.PerConnectionMaxInFlight = 1;
+ config.PerConnectionMaxInFlightBySize = 1500;
+ config.MaxMessageSize = 1024;
+
+ return config;
+ }
+
+ TServerForQuotaWake()
+ : TExampleServer("TServerForQuotaWake", Config())
{
}
-
+
~TServerForQuotaWake() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnMessage(TOnMessageContext& req) override {
- if (!GoOn.Wait(0)) {
+ if (!GoOn.Wait(0)) {
TGuard<TMutex> guard(OneLock);
-
- UNIT_ASSERT(!OneMessage);
-
- OneMessage.Swap(req);
- } else
- TExampleServer::OnMessage(req);
- }
-
- void WakeOne() {
+
+ UNIT_ASSERT(!OneMessage);
+
+ OneMessage.Swap(req);
+ } else
+ TExampleServer::OnMessage(req);
+ }
+
+ void WakeOne() {
TGuard<TMutex> guard(OneLock);
-
- UNIT_ASSERT(!!OneMessage);
-
- TExampleServer::OnMessage(OneMessage);
-
- TOnMessageContext().Swap(OneMessage);
- }
- };
-
+
+ UNIT_ASSERT(!!OneMessage);
+
+ TExampleServer::OnMessage(OneMessage);
+
+ TOnMessageContext().Swap(OneMessage);
+ }
+ };
+
Y_UNIT_TEST(WakeReaderOnQuota) {
- const size_t test_msg_count = 64;
-
- TBusClientSessionConfig clientConfig;
-
- clientConfig.MaxInFlight = test_msg_count;
-
- TExampleClient client(clientConfig);
- TServerForQuotaWake server;
- TInstant start;
-
- client.MessageCount = test_msg_count;
-
- const NBus::TNetAddr addr = server.GetActualListenAddr();
-
- for (unsigned count = 0;;) {
- UNIT_ASSERT(count <= test_msg_count);
-
- TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
- EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr);
-
- if (status == MESSAGE_OK) {
- count++;
-
- } else if (status == MESSAGE_BUSY) {
+ const size_t test_msg_count = 64;
+
+ TBusClientSessionConfig clientConfig;
+
+ clientConfig.MaxInFlight = test_msg_count;
+
+ TExampleClient client(clientConfig);
+ TServerForQuotaWake server;
+ TInstant start;
+
+ client.MessageCount = test_msg_count;
+
+ const NBus::TNetAddr addr = server.GetActualListenAddr();
+
+ for (unsigned count = 0;;) {
+ UNIT_ASSERT(count <= test_msg_count);
+
+ TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
+ EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr);
+
+ if (status == MESSAGE_OK) {
+ count++;
+
+ } else if (status == MESSAGE_BUSY) {
if (count == test_msg_count) {
- TInstant now = TInstant::Now();
-
+ TInstant now = TInstant::Now();
+
if (start.GetValue() == 0) {
- start = now;
-
+ start = now;
+
// TODO: properly check that server is blocked
} else if (start + TDuration::MilliSeconds(100) < now) {
- break;
- }
- }
-
- Sleep(TDuration::MilliSeconds(10));
-
- } else
- UNIT_ASSERT(false);
- }
-
- server.GoOn.Signal();
- server.WakeOne();
-
- client.WaitReplies();
-
- server.WaitForOnMessageCount(test_msg_count);
+ break;
+ }
+ }
+
+ Sleep(TDuration::MilliSeconds(10));
+
+ } else
+ UNIT_ASSERT(false);
+ }
+
+ server.GoOn.Signal();
+ server.WakeOne();
+
+ client.WaitReplies();
+
+ server.WaitForOnMessageCount(test_msg_count);
};
Y_UNIT_TEST(TestConnectionAttempts) {
diff --git a/library/cpp/messagebus/www/www.cpp b/library/cpp/messagebus/www/www.cpp
index e501cbf4a9..62ec241d85 100644
--- a/library/cpp/messagebus/www/www.cpp
+++ b/library/cpp/messagebus/www/www.cpp
@@ -200,12 +200,12 @@ struct TBusWww::TImpl {
Queues.Add(s->GetQueue());
}
- void RegisterQueue(TBusMessageQueuePtr q) {
+ void RegisterQueue(TBusMessageQueuePtr q) {
Y_VERIFY(!!q);
- TGuard<TMutex> g(Mutex);
- Queues.Add(q);
- }
-
+ TGuard<TMutex> g(Mutex);
+ Queues.Add(q);
+ }
+
void RegisterModule(TBusModule* module) {
Y_VERIFY(!!module);
TGuard<TMutex> g(Mutex);
@@ -824,10 +824,10 @@ void TBusWww::RegisterServerSession(TBusServerSessionPtr s) {
Impl->RegisterServerSession(s);
}
-void TBusWww::RegisterQueue(TBusMessageQueuePtr q) {
- Impl->RegisterQueue(q);
-}
-
+void TBusWww::RegisterQueue(TBusMessageQueuePtr q) {
+ Impl->RegisterQueue(q);
+}
+
void TBusWww::RegisterModule(TBusModule* module) {
Impl->RegisterModule(module);
}
diff --git a/library/cpp/monlib/messagebus/mon_service_messagebus.h b/library/cpp/monlib/messagebus/mon_service_messagebus.h
index ec1890e915..fe791e8a9b 100644
--- a/library/cpp/monlib/messagebus/mon_service_messagebus.h
+++ b/library/cpp/monlib/messagebus/mon_service_messagebus.h
@@ -42,5 +42,5 @@ namespace NMonitoring {
RegisterBusNgMonPage()->RegisterModule(module);
}
};
-
+
}