aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors
diff options
context:
space:
mode:
authoragri <agri@yandex-team.ru>2022-02-10 16:48:12 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:12 +0300
commitd3530b2692e400bd4d29bd4f07cafaee139164e7 (patch)
treeb7ae636a74490e649a2ed0fdd5361f1bec83b9f9 /library/cpp/actors
parent0f4c5d1e8c0672bf0a1f2f2d8acac5ba24772435 (diff)
downloadydb-d3530b2692e400bd4d29bd4f07cafaee139164e7.tar.gz
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors')
-rw-r--r--library/cpp/actors/core/actor_bootstrapped.h4
-rw-r--r--library/cpp/actors/core/actorsystem.h8
-rw-r--r--library/cpp/actors/core/event.cpp14
-rw-r--r--library/cpp/actors/core/event.h20
-rw-r--r--library/cpp/actors/core/event_load.h28
-rw-r--r--library/cpp/actors/core/event_local.h2
-rw-r--r--library/cpp/actors/core/event_pb.cpp46
-rw-r--r--library/cpp/actors/core/event_pb.h42
-rw-r--r--library/cpp/actors/core/events.h6
-rw-r--r--library/cpp/actors/core/executelater.h50
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp22
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h4
-rw-r--r--library/cpp/actors/core/executor_pool_united.cpp4
-rw-r--r--library/cpp/actors/core/executor_pool_united.h4
-rw-r--r--library/cpp/actors/core/executor_thread.cpp2
-rw-r--r--library/cpp/actors/core/executor_thread.h2
-rw-r--r--library/cpp/actors/core/log.cpp6
-rw-r--r--library/cpp/actors/core/log.h22
-rw-r--r--library/cpp/actors/core/mailbox.cpp24
-rw-r--r--library/cpp/actors/core/mailbox.h80
-rw-r--r--library/cpp/actors/core/mon.h6
-rw-r--r--library/cpp/actors/core/mon_stats.h8
-rw-r--r--library/cpp/actors/core/ya.make6
-rw-r--r--library/cpp/actors/dnscachelib/dnscache.cpp50
-rw-r--r--library/cpp/actors/dnscachelib/dnscache.h42
-rw-r--r--library/cpp/actors/memory_log/memlog.cpp514
-rw-r--r--library/cpp/actors/memory_log/memlog.h382
-rw-r--r--library/cpp/actors/memory_log/mmap.cpp114
-rw-r--r--library/cpp/actors/memory_log/ya.make28
-rw-r--r--library/cpp/actors/prof/tag.cpp20
-rw-r--r--library/cpp/actors/prof/tag.h36
-rw-r--r--library/cpp/actors/prof/ut/tag_ut.cpp128
-rw-r--r--library/cpp/actors/prof/ut/ya.make14
-rw-r--r--library/cpp/actors/prof/ya.make22
-rw-r--r--library/cpp/actors/protos/actors.proto8
-rw-r--r--library/cpp/actors/protos/interconnect.proto56
-rw-r--r--library/cpp/actors/protos/services_common.proto4
-rw-r--r--library/cpp/actors/protos/unittests.proto24
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp30
-rw-r--r--library/cpp/actors/testlib/test_runtime.h2
-rw-r--r--library/cpp/actors/util/funnel_queue.h114
-rw-r--r--library/cpp/actors/util/recentwnd.h82
-rw-r--r--library/cpp/actors/util/thread.h16
43 files changed, 1048 insertions, 1048 deletions
diff --git a/library/cpp/actors/core/actor_bootstrapped.h b/library/cpp/actors/core/actor_bootstrapped.h
index a37887c939..e15bb86ce6 100644
--- a/library/cpp/actors/core/actor_bootstrapped.h
+++ b/library/cpp/actors/core/actor_bootstrapped.h
@@ -28,8 +28,8 @@ namespace NActors {
} else {
static_assert(dependent_false<TDerived>::value, "No correct Bootstrap() signature");
}
- }
-
+ }
+
TActorBootstrapped()
: TActor<TDerived>(&TDerived::StateBootstrap)
{}
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 40499d7586..58d360edcc 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -129,7 +129,7 @@ namespace NActors {
virtual void SetRealTimeMode() const {}
};
-
+
// could be proxy to in-pool schedulers (for NUMA-aware executors)
class ISchedulerThread : TNonCopyable {
public:
@@ -352,7 +352,7 @@ namespace NActors {
NLog::TSettings* LoggerSettings() const {
return LoggerSettings0.Get();
}
-
+
void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const;
void DeferPreStop(std::function<void()> fn) {
@@ -360,8 +360,8 @@ namespace NActors {
}
/* This is the base for memory profiling tags.
- System sets memory profiling tag for debug version of lfalloc.
- The tag is set as "base_tag + actor_activity_type". */
+ System sets memory profiling tag for debug version of lfalloc.
+ The tag is set as "base_tag + actor_activity_type". */
static ui32 MemProfActivityBase;
};
}
diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp
index 33f8ce2aaf..1c05ffc3fe 100644
--- a/library/cpp/actors/core/event.cpp
+++ b/library/cpp/actors/core/event.cpp
@@ -1,7 +1,7 @@
#include "event.h"
-#include "event_pb.h"
-
-namespace NActors {
+#include "event_pb.h"
+
+namespace NActors {
const TScopeId TScopeId::LocallyGenerated{
Max<ui64>(), Max<ui64>()
@@ -22,8 +22,8 @@ namespace NActors {
return chainBuf;
}
return new TEventSerializedData;
- }
-
+ }
+
TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() {
if (Buffer)
return Buffer;
@@ -34,5 +34,5 @@ namespace NActors {
return Buffer;
}
return new TEventSerializedData;
- }
-}
+ }
+}
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 6ff02aaf94..081549071d 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -3,7 +3,7 @@
#include "defs.h"
#include "actorid.h"
#include "callstack.h"
-#include "event_load.h"
+#include "event_load.h"
#include <library/cpp/actors/wilson/wilson_trace.h>
@@ -17,13 +17,13 @@ namespace NActors {
public:
virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
};
-
+
class IEventBase
: TNonCopyable,
public ISerializerToStream {
public:
// actual typing is performed by IEventHandle
-
+
virtual ~IEventBase() {
}
@@ -87,7 +87,7 @@ namespace NActors {
Buffer.Reset();
return x;
}
-
+
enum EFlags {
FlagTrackDelivery = 1 << 0,
FlagForwardOnNondelivery = 1 << 1,
@@ -236,7 +236,7 @@ namespace NActors {
, RewriteType(Type)
{
}
-
+
TIntrusivePtr<TEventSerializedData> GetChainBuffer();
TIntrusivePtr<TEventSerializedData> ReleaseChainBuffer();
@@ -248,15 +248,15 @@ namespace NActors {
} else {
return 0;
}
- }
+ }
bool HasBuffer() const {
return bool(Buffer);
- }
+ }
bool HasEvent() const {
return bool(Event);
- }
+ }
IEventBase* GetBase() {
if (!Event) {
@@ -326,7 +326,7 @@ namespace NActors {
} \
bool IsSerializable() const override { \
return false; \
- }
+ }
#define DEFINE_SIMPLE_NONLOCAL_EVENT(eventType, header) \
TString ToStringHeader() const override { \
@@ -340,5 +340,5 @@ namespace NActors {
} \
bool IsSerializable() const override { \
return true; \
- }
+ }
}
diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h
index 0dab1dd374..da2adc28ea 100644
--- a/library/cpp/actors/core/event_load.h
+++ b/library/cpp/actors/core/event_load.h
@@ -1,24 +1,24 @@
-#pragma once
+#pragma once
#include <util/stream/walk.h>
-#include <util/system/types.h>
+#include <util/system/types.h>
#include <util/generic/string.h>
#include <library/cpp/actors/util/rope.h>
#include <library/cpp/actors/wilson/wilson_trace.h>
-
-namespace NActors {
+
+namespace NActors {
class IEventHandle;
-
+
struct TConstIoVec {
const void* Data;
size_t Size;
};
-
+
struct TIoVec {
void* Data;
size_t Size;
};
-
+
class TEventSerializedData
: public TThrRefBase
{
@@ -70,7 +70,7 @@ namespace NActors {
}
return result;
}
-
+
TRope EraseBack(size_t count) {
Y_VERIFY(count <= Rope.GetSize());
TRope::TIterator iter = Rope.End();
@@ -81,25 +81,25 @@ namespace NActors {
void Append(TRope&& from) {
Rope.Insert(Rope.End(), std::move(from));
}
-
+
void Append(TString buffer) {
if (buffer) {
Rope.Insert(Rope.End(), TRope(std::move(buffer)));
}
}
};
-}
-
+}
+
class TChainBufWalk : public IWalkInput {
TIntrusivePtr<NActors::TEventSerializedData> Buffer;
TRope::TConstIterator Iter;
-
+
public:
TChainBufWalk(TIntrusivePtr<NActors::TEventSerializedData> buffer)
: Buffer(std::move(buffer))
, Iter(Buffer->GetBeginIter())
{}
-
+
private:
size_t DoUnboundedNext(const void **ptr) override {
const size_t size = Iter.ContiguousSize();
@@ -108,5 +108,5 @@ private:
Iter.AdvanceToNextContiguousBlock();
}
return size;
- }
+ }
};
diff --git a/library/cpp/actors/core/event_local.h b/library/cpp/actors/core/event_local.h
index 2845aa94dd..2a4ff9fa55 100644
--- a/library/cpp/actors/core/event_local.h
+++ b/library/cpp/actors/core/event_local.h
@@ -2,7 +2,7 @@
#include "event.h"
#include "scheduler_cookie.h"
-#include "event_load.h"
+#include "event_load.h"
#include <util/system/type_name.h>
namespace NActors {
diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp
index 018ff9ac34..bae0a0a64b 100644
--- a/library/cpp/actors/core/event_pb.cpp
+++ b/library/cpp/actors/core/event_pb.cpp
@@ -1,6 +1,6 @@
-#include "event_pb.h"
-
-namespace NActors {
+#include "event_pb.h"
+
+namespace NActors {
bool TRopeStream::Next(const void** data, int* size) {
*data = Iter.ContiguousData();
*size = Iter.ContiguousSize();
@@ -13,13 +13,13 @@ namespace NActors {
TotalByteCount += *size;
return *size != 0;
}
-
+
void TRopeStream::BackUp(int count) {
Y_VERIFY(count <= TotalByteCount);
Iter -= count;
TotalByteCount -= count;
}
-
+
bool TRopeStream::Skip(int count) {
if (static_cast<size_t>(TotalByteCount + count) > Size) {
count = Size - TotalByteCount;
@@ -27,20 +27,20 @@ namespace NActors {
Iter += count;
TotalByteCount += count;
return static_cast<size_t>(TotalByteCount) != Size;
- }
-
+ }
+
TCoroutineChunkSerializer::TCoroutineChunkSerializer()
: TotalSerializedDataSize(0)
, Stack(64 * 1024)
, SelfClosure{this, TArrayRef(Stack.Begin(), Stack.End())}
, InnerContext(SelfClosure)
{}
-
+
TCoroutineChunkSerializer::~TCoroutineChunkSerializer() {
CancelFlag = true;
Resume();
Y_VERIFY(Finished);
- }
+ }
bool TCoroutineChunkSerializer::AllowsAliasing() const {
return true;
@@ -85,10 +85,10 @@ namespace NActors {
} else {
InnerContext.SwitchTo(BufFeedContext);
}
- }
+ }
return true;
- }
-
+ }
+
bool TCoroutineChunkSerializer::Next(void** data, int* size) {
if (CancelFlag || AbortFlag) {
return false;
@@ -122,15 +122,15 @@ namespace NActors {
BufferPtr -= count;
SizeRemain += count;
TotalSerializedDataSize -= count;
- }
-
+ }
+
void TCoroutineChunkSerializer::Resume() {
TContMachineContext feedContext;
BufFeedContext = &feedContext;
feedContext.SwitchTo(&InnerContext);
BufFeedContext = nullptr;
- }
-
+ }
+
bool TCoroutineChunkSerializer::WriteRope(const TRope *rope) {
for (auto iter = rope->Begin(); iter.Valid(); iter.AdvanceToNextContiguousBlock()) {
if (!WriteAliasedRaw(iter.ContiguousData(), iter.ContiguousSize())) {
@@ -156,14 +156,14 @@ namespace NActors {
return {Chunks, Chunks + NumChunks};
}
-
+
void TCoroutineChunkSerializer::SetSerializingEvent(const IEventBase *event) {
Y_VERIFY(Event == nullptr);
Event = event;
TotalSerializedDataSize = 0;
AbortFlag = false;
}
-
+
void TCoroutineChunkSerializer::Abort() {
Y_VERIFY(Event);
AbortFlag = true;
@@ -181,8 +181,8 @@ namespace NActors {
}
Finished = true;
InnerContext.SwitchTo(BufFeedContext);
- }
-
+ }
+
bool TAllocChunkSerializer::Next(void** pdata, int* psize) {
if (Backup) {
// we have some data in backup rope -- move the first chunk from the backup rope to the buffer and return
@@ -200,12 +200,12 @@ namespace NActors {
Buffers->Append(TRope(std::move(item)));
}
return true;
- }
-
+ }
+
void TAllocChunkSerializer::BackUp(int count) {
Backup.Insert(Backup.Begin(), Buffers->EraseBack(count));
}
-
+
bool TAllocChunkSerializer::WriteAliasedRaw(const void*, int) {
Y_VERIFY(false);
return false;
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index d7546b901a..1c69d7e9bf 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -1,15 +1,15 @@
#pragma once
#include "event.h"
-#include "event_load.h"
-
+#include "event_load.h"
+
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/arena.h>
#include <library/cpp/actors/protos/actors.pb.h>
-#include <util/generic/deque.h>
-#include <util/system/context.h>
-#include <util/system/filemap.h>
-#include <array>
+#include <util/generic/deque.h>
+#include <util/system/context.h>
+#include <util/system/filemap.h>
+#include <array>
namespace NActors {
@@ -29,11 +29,11 @@ namespace NActors {
int64_t ByteCount() const override {
return TotalByteCount;
}
-
+
private:
int64_t TotalByteCount = 0;
};
-
+
class TChunkSerializer : public NProtoBuf::io::ZeroCopyOutputStream {
public:
TChunkSerializer() = default;
@@ -42,7 +42,7 @@ namespace NActors {
virtual bool WriteRope(const TRope *rope) = 0;
virtual bool WriteString(const TString *s) = 0;
};
-
+
class TAllocChunkSerializer final : public TChunkSerializer {
public:
bool Next(void** data, int* size) override;
@@ -51,7 +51,7 @@ namespace NActors {
return Buffers->GetSize();
}
bool WriteAliasedRaw(const void* data, int size) override;
-
+
// WARNING: these methods require owner to retain ownership and immutability of passed objects
bool WriteRope(const TRope *rope) override;
bool WriteString(const TString *s) override;
@@ -62,19 +62,19 @@ namespace NActors {
}
return std::move(Buffers);
}
-
+
protected:
TIntrusivePtr<TEventSerializedData> Buffers = new TEventSerializedData;
TRope Backup;
};
-
+
class TCoroutineChunkSerializer final : public TChunkSerializer, protected ITrampoLine {
public:
using TChunk = std::pair<const char*, size_t>;
TCoroutineChunkSerializer();
~TCoroutineChunkSerializer();
-
+
void SetSerializingEvent(const IEventBase *event);
void Abort();
std::pair<TChunk*, TChunk*> FeedBuf(void* data, size_t size);
@@ -87,7 +87,7 @@ namespace NActors {
const IEventBase *GetCurrentEvent() const {
return Event;
}
-
+
bool Next(void** data, int* size) override;
void BackUp(int count) override;
int64_t ByteCount() const override {
@@ -95,7 +95,7 @@ namespace NActors {
}
bool WriteAliasedRaw(const void* data, int size) override;
bool AllowsAliasing() const override;
-
+
bool WriteRope(const TRope *rope) override;
bool WriteString(const TString *s) override;
@@ -103,7 +103,7 @@ namespace NActors {
void DoRun() override;
void Resume();
bool Produce(const void *data, size_t size);
-
+
i64 TotalSerializedDataSize;
TMappedAllocation Stack;
TContClosure SelfClosure;
@@ -120,7 +120,7 @@ namespace NActors {
bool SerializationSuccess;
bool Finished = false;
};
-
+
#ifdef ACTORLIB_HUGE_PB_SIZE
static const size_t EventMaxByteSize = 140 << 20; // (140MB)
#else
@@ -137,9 +137,9 @@ namespace NActors {
public:
using ProtoRecordType = TRecord;
-
+
TEventPBBase() = default;
-
+
explicit TEventPBBase(const TRecord& rec)
{
Record = rec;
@@ -153,7 +153,7 @@ namespace NActors {
TString ToStringHeader() const override {
return Record.GetTypeName();
}
-
+
TString ToString() const override {
return Record.ShortDebugString();
}
@@ -274,7 +274,7 @@ namespace NActors {
ev->CachedByteSize = input->GetSize();
return ev.Release();
}
-
+
size_t GetCachedByteSize() const {
if (CachedByteSize == 0) {
CachedByteSize = CalculateSerializedSize();
diff --git a/library/cpp/actors/core/events.h b/library/cpp/actors/core/events.h
index 702cf50fad..88103e888c 100644
--- a/library/cpp/actors/core/events.h
+++ b/library/cpp/actors/core/events.h
@@ -1,11 +1,11 @@
#pragma once
#include "event.h"
-#include "event_pb.h"
+#include "event_pb.h"
#include <library/cpp/actors/protos/actors.pb.h>
#include <util/system/unaligned_mem.h>
-
+
namespace NActors {
struct TEvents {
enum EEventSpace {
@@ -213,7 +213,7 @@ namespace NActors {
using TEvPoisonPill = TEvPoison; // Legacy name, deprecated
using TEvActorDied = TEvGone;
- };
+ };
}
template <>
diff --git a/library/cpp/actors/core/executelater.h b/library/cpp/actors/core/executelater.h
index e7a13c1005..53da592373 100644
--- a/library/cpp/actors/core/executelater.h
+++ b/library/cpp/actors/core/executelater.h
@@ -1,10 +1,10 @@
-#pragma once
-
-#include "actor_bootstrapped.h"
-
-#include <utility>
-
-namespace NActors {
+#pragma once
+
+#include "actor_bootstrapped.h"
+
+#include <utility>
+
+namespace NActors {
template <typename TCallback>
class TExecuteLater: public TActorBootstrapped<TExecuteLater<TCallback>> {
public:
@@ -13,10 +13,10 @@ namespace NActors {
}
TExecuteLater(
- TCallback&& callback,
- IActor::EActivityType activityType,
- ui32 channel = 0,
- ui64 cookie = 0,
+ TCallback&& callback,
+ IActor::EActivityType activityType,
+ ui32 channel = 0,
+ ui64 cookie = 0,
const TActorId& reportCompletionTo = TActorId(),
const TActorId& reportExceptionTo = TActorId()) noexcept
: Callback(std::move(callback))
@@ -27,16 +27,16 @@ namespace NActors {
{
this->SetActivityType(activityType);
}
-
+
void Bootstrap(const TActorContext& ctx) noexcept {
try {
{
/* RAII, Callback should be destroyed right before sending
- TEvCallbackCompletion */
-
+ TEvCallbackCompletion */
+
auto local = std::move(Callback);
using T = decltype(local);
-
+
if constexpr (std::is_invocable_v<T, const TActorContext&>) {
local(ctx);
} else {
@@ -56,11 +56,11 @@ namespace NActors {
new TEvents::TEvCallbackException(ctx.SelfID, msg),
Channel, Cookie);
}
- }
-
+ }
+
this->Die(ctx);
- }
-
+ }
+
private:
TCallback Callback;
const ui32 Channel;
@@ -68,13 +68,13 @@ namespace NActors {
const TActorId ReportCompletionTo;
const TActorId ReportExceptionTo;
};
-
+
template <typename T>
IActor* CreateExecuteLaterActor(
- T&& func,
- IActor::EActivityType activityType,
- ui32 channel = 0,
- ui64 cookie = 0,
+ T&& func,
+ IActor::EActivityType activityType,
+ ui32 channel = 0,
+ ui64 cookie = 0,
const TActorId& reportCompletionTo = TActorId(),
const TActorId& reportExceptionTo = TActorId()) noexcept {
return new TExecuteLater<T>(std::forward<T>(func),
@@ -84,4 +84,4 @@ namespace NActors {
reportCompletionTo,
reportExceptionTo);
}
-}
+}
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 4dce16939a..3123e9b1a6 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -4,23 +4,23 @@
#include <library/cpp/actors/util/affinity.h>
#include <library/cpp/actors/util/datetime.h>
-#ifdef _linux_
+#ifdef _linux_
#include <pthread.h>
-#endif
-
+#endif
+
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
constexpr TDuration TBasicExecutorPool::DEFAULT_TIME_PER_MAILBOX;
TBasicExecutorPool::TBasicExecutorPool(
- ui32 poolId,
- ui32 threads,
- ui64 spinThreshold,
+ ui32 poolId,
+ ui32 threads,
+ ui64 spinThreshold,
const TString& poolName,
TAffinity* affinity,
- TDuration timePerMailbox,
- ui32 eventsPerMailbox,
+ TDuration timePerMailbox,
+ ui32 eventsPerMailbox,
int realtimePriority,
ui32 maxActivityType)
: TExecutorPoolBase(poolId, threads, affinity, maxActivityType)
@@ -330,10 +330,10 @@ namespace NActors {
if (pthread_setschedparam(threadSelf, SCHED_FIFO, &param)) {
Y_FAIL("Cannot set realtime priority");
}
- }
-#else
+ }
+#else
Y_UNUSED(RealtimePriority);
-#endif
+#endif
}
ui32 TBasicExecutorPool::GetThreadCount() const {
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index 023190f7fe..65ceed2669 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -62,7 +62,7 @@ namespace NActors {
TAtomic ThreadUtilization;
TAtomic MaxUtilizationCounter;
TAtomic MaxUtilizationAccumulator;
-
+
TAtomic ThreadCount;
TMutex ChangeThreadsLock;
@@ -81,7 +81,7 @@ namespace NActors {
ui32 maxActivityType = 1);
explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg);
~TBasicExecutorPool();
-
+
ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override;
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index dac6245635..e5968609e7 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -14,7 +14,7 @@
#include <util/system/datetime.h>
#include <util/system/hp_timer.h>
-
+
#include <algorithm>
namespace NActors {
@@ -1315,7 +1315,7 @@ namespace NActors {
if (Y_UNLIKELY(result == CpuStopped) || TryAcquireToken(result)) {
break; // token acquired (or stop)
}
- }
+ }
wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timeTracker.Elapsed());
return result;
diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h
index a090ba2466..01be95b778 100644
--- a/library/cpp/actors/core/executor_pool_united.h
+++ b/library/cpp/actors/core/executor_pool_united.h
@@ -63,7 +63,7 @@ namespace NActors {
// Sets executor for specified pool
void SetupPool(TPoolId pool, IExecutorPool* executorPool, TMailboxTable* mailboxTable);
-
+
// Add activation of newly scheduled mailbox and wake cpu to execute it if required
void PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter);
@@ -72,7 +72,7 @@ namespace NActors {
// Try to wake idle cpu waiting for tokens on specified pool
void TryWake(TPoolId pool);
-
+
// Get activation from pool; requires pool's token
void BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter);
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 446b651efd..ac97689f31 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -303,7 +303,7 @@ namespace NActors {
ExecutorPool->SetRealTimeMode();
TAffinityGuard affinity(ExecutorPool->Affinity());
-
+
NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = hpnow;
ui64 execCount = 0;
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index 9d3c573f0d..66b97bd351 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -45,7 +45,7 @@ namespace NActors {
void UnregisterActor(TMailboxHeader* mailbox, ui64 localActorId);
void DropUnregistered();
const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; }
-
+
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index 5f63b5af58..bfac7d30e4 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -195,7 +195,7 @@ namespace NActors {
, Metrics(std::make_unique<TLoggerMetrics>(metrics))
{
}
-
+
TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
std::shared_ptr<TLogBackend> logBackend,
std::shared_ptr<NMonitoring::TMetricRegistry> metrics)
@@ -260,8 +260,8 @@ namespace NActors {
break;
default:
break;
- }
-
+ }
+
}
void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) {
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index c11a7cf3c1..514ff51c14 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -42,7 +42,7 @@
actorCtxOrSystem, priority, component, __VA_ARGS__); \
} \
} while (0) /**/
-
+
#define LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, priority, component, sampleBy, stream) \
LOG_LOG_SAMPLED_BY(actorCtxOrSystem, priority, component, sampleBy, "%s", [&]() { \
TStringBuilder logStringBuilder; \
@@ -304,7 +304,7 @@ namespace NActors {
/////////////////////////////////////////////////////////////////////
// Logging adaptors for memory log and logging into filesystem
/////////////////////////////////////////////////////////////////////
-
+
namespace NDetail {
inline void Y_PRINTF_FORMAT(2, 3) PrintfV(TString& dst, const char* format, ...) {
va_list params;
@@ -318,7 +318,7 @@ namespace NActors {
}
} // namespace NDetail
- template <typename TCtx>
+ template <typename TCtx>
inline void DeliverLogMessage(TCtx& ctx, NLog::EPriority mPriority, NLog::EComponent mComponent, TString &&str)
{
const NLog::TSettings *mSettings = ctx.LoggerSettings();
@@ -327,14 +327,14 @@ namespace NActors {
}
template <typename TCtx, typename... TArgs>
- inline void MemLogAdapter(
+ inline void MemLogAdapter(
TCtx& actorCtxOrSystem,
NLog::EPriority mPriority,
NLog::EComponent mComponent,
const char* format, TArgs&&... params) {
TString Formatted;
-
-
+
+
if constexpr (sizeof... (params) > 0) {
NDetail::PrintfV(Formatted, format, std::forward<TArgs>(params)...);
} else {
@@ -343,9 +343,9 @@ namespace NActors {
MemLogWrite(Formatted.data(), Formatted.size(), true);
DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, std::move(Formatted));
- }
-
- template <typename TCtx>
+ }
+
+ template <typename TCtx>
Y_WRAPPER inline void MemLogAdapter(
TCtx& actorCtxOrSystem,
NLog::EPriority mPriority,
@@ -355,7 +355,7 @@ namespace NActors {
MemLogWrite(str.data(), str.size(), true);
DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, TString(str));
}
-
+
template <typename TCtx>
Y_WRAPPER inline void MemLogAdapter(
TCtx& actorCtxOrSystem,
@@ -365,5 +365,5 @@ namespace NActors {
MemLogWrite(str.data(), str.size(), true);
DeliverLogMessage(actorCtxOrSystem, mPriority, mComponent, std::move(str));
- }
+ }
}
diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp
index d84b4f9e46..ac598eff86 100644
--- a/library/cpp/actors/core/mailbox.cpp
+++ b/library/cpp/actors/core/mailbox.cpp
@@ -214,49 +214,49 @@ namespace NActors {
return true;
case TMailboxType::HTSwap: {
THTSwapMailbox* const mailbox = THTSwapMailbox::Get(lineHint, x);
-#if (!defined(_tsan_enabled_))
+#if (!defined(_tsan_enabled_))
Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
-#endif
+#endif
mailbox->Queue.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
executorPool->ScheduleActivation(hint);
}
- }
+ }
return true;
case TMailboxType::ReadAsFilled: {
if (lineHint > TReadAsFilledMailbox::MaxMailboxesInLine())
return false;
-
+
TReadAsFilledMailbox* const mailbox = TReadAsFilledMailbox::Get(lineHint, x);
-#if (!defined(_tsan_enabled_))
+#if (!defined(_tsan_enabled_))
Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
-#endif
+#endif
mailbox->Queue.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
executorPool->ScheduleActivation(hint);
}
- }
+ }
return true;
case TMailboxType::TinyReadAsFilled: {
if (lineHint > TTinyReadAsFilledMailbox::MaxMailboxesInLine())
return false;
-
+
TTinyReadAsFilledMailbox* const mailbox = TTinyReadAsFilledMailbox::Get(lineHint, x);
-#if (!defined(_tsan_enabled_))
+#if (!defined(_tsan_enabled_))
Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
-#endif
+#endif
mailbox->Queue.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
executorPool->ScheduleActivation(hint);
}
- }
+ }
return true;
default:
Y_FAIL("unknown mailbox type");
- }
+ }
}
return false;
diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h
index 0bd9c4d314..8a2c0d0608 100644
--- a/library/cpp/actors/core/mailbox.h
+++ b/library/cpp/actors/core/mailbox.h
@@ -10,7 +10,7 @@
#include <library/cpp/threading/queue/mpsc_read_as_filled.h>
#include <util/generic/hash.h>
#include <util/system/hp_timer.h>
-#include <util/generic/ptr.h>
+#include <util/generic/ptr.h>
// TODO: clean all broken arcadia atomic stuff and replace with intrinsics
namespace NActors {
@@ -389,52 +389,52 @@ namespace NActors {
constexpr static ui32 AlignedSize() {
return ((sizeof(TRevolvingMailbox) + 63) / 64) * 64;
}
-
+
std::pair<ui32, ui32> CountRevolvingMailboxEvents(ui64 localActorId, ui32 maxTraverse);
bool CleanupEvents();
};
-
+
static_assert(sizeof(TRevolvingMailbox) == 128, "expect sizeof(TRevolvingMailbox) == 128");
-
+
struct THTSwapMailbox: public TMailboxHeader {
using TQueueType = NThreading::THTSwapQueue<IEventHandle*>;
-
+
TQueueType Queue;
NHPTimer::STime ScheduleMoment;
char Padding_[16];
-
+
THTSwapMailbox()
: TMailboxHeader(TMailboxType::HTSwap)
, ScheduleMoment(0)
{
}
-
+
~THTSwapMailbox() {
CleanupEvents();
}
-
+
IEventHandle* Pop() {
return Queue.Pop();
}
-
+
IEventHandle* Head() {
return Queue.Peek();
}
-
+
static THTSwapMailbox* Get(ui32 hint, void* line) {
return (THTSwapMailbox*)((ui8*)line + 64 + (hint - 1) * 64);
}
-
+
constexpr static ui64 MaxMailboxesInLine() {
return (LineSize - 64) / AlignedSize();
}
-
+
static const TMailboxType::EType MailboxType = TMailboxType::HTSwap;
-
+
constexpr static ui32 AlignedSize() {
return ((sizeof(THTSwapMailbox) + 63) / 64) * 64;
}
-
+
bool CleanupEvents() {
const bool done = (Queue.Peek() == nullptr);
while (IEventHandle* ev = Queue.Pop())
@@ -442,50 +442,50 @@ namespace NActors {
return done;
}
};
-
+
static_assert(sizeof(THTSwapMailbox) == 64,
"expect sizeof(THTSwapMailbox) == 64");
-
+
struct TReadAsFilledMailbox: public TMailboxHeader {
using TQueueType = NThreading::TReadAsFilledQueue<IEventHandle>;
-
+
TQueueType Queue;
NHPTimer::STime ScheduleMoment;
char Padding_[8];
-
+
TReadAsFilledMailbox()
: TMailboxHeader(TMailboxType::ReadAsFilled)
, ScheduleMoment(0)
{
}
-
+
~TReadAsFilledMailbox() {
CleanupEvents();
}
-
+
IEventHandle* Pop() {
return Queue.Pop();
}
-
+
IEventHandle* Head() {
return Queue.Peek();
}
-
+
static TReadAsFilledMailbox* Get(ui32 hint, void* line) {
return (TReadAsFilledMailbox*)((ui8*)line + 64 + (hint - 1) * 192);
}
-
+
constexpr static ui64 MaxMailboxesInLine() {
return (LineSize - 64) / AlignedSize();
}
-
+
static const TMailboxType::EType MailboxType =
TMailboxType::ReadAsFilled;
-
+
constexpr static ui32 AlignedSize() {
return ((sizeof(TReadAsFilledMailbox) + 63) / 64) * 64;
}
-
+
bool CleanupEvents() {
const bool done = (Queue.Peek() == nullptr);
while (IEventHandle* ev = Queue.Pop())
@@ -493,52 +493,52 @@ namespace NActors {
return done;
}
};
-
+
static_assert(sizeof(TReadAsFilledMailbox) == 192,
"expect sizeof(TReadAsFilledMailbox) == 192");
-
+
struct TTinyReadAsFilledMailbox: public TMailboxHeader {
using TQueueType = NThreading::TReadAsFilledQueue<
IEventHandle,
NThreading::TRaFQueueBunchSize<4>>;
-
+
TQueueType Queue;
NHPTimer::STime ScheduleMoment;
char Padding_[8];
-
+
TTinyReadAsFilledMailbox()
: TMailboxHeader(TMailboxType::TinyReadAsFilled)
, ScheduleMoment(0)
{
}
-
+
~TTinyReadAsFilledMailbox() {
CleanupEvents();
}
-
+
IEventHandle* Pop() {
return Queue.Pop();
}
-
+
IEventHandle* Head() {
return Queue.Peek();
}
-
+
static TTinyReadAsFilledMailbox* Get(ui32 hint, void* line) {
return (TTinyReadAsFilledMailbox*)((ui8*)line + 64 + (hint - 1) * 192);
}
-
+
constexpr static ui64 MaxMailboxesInLine() {
return (LineSize - 64) / AlignedSize();
}
-
+
static const TMailboxType::EType MailboxType =
TMailboxType::TinyReadAsFilled;
-
+
constexpr static ui32 AlignedSize() {
return ((sizeof(TTinyReadAsFilledMailbox) + 63) / 64) * 64;
}
-
+
bool CleanupEvents() {
const bool done = (Queue.Peek() == nullptr);
while (IEventHandle* ev = Queue.Pop())
@@ -546,8 +546,8 @@ namespace NActors {
return done;
}
};
-
+
static_assert(sizeof(TTinyReadAsFilledMailbox) == 192,
"expect sizeof(TTinyReadAsFilledMailbox) == 192");
- };
+ };
}
diff --git a/library/cpp/actors/core/mon.h b/library/cpp/actors/core/mon.h
index c450f2338e..3ebf6a0bed 100644
--- a/library/cpp/actors/core/mon.h
+++ b/library/cpp/actors/core/mon.h
@@ -123,7 +123,7 @@ namespace NActors {
return true;
}
- static IEventBase* Load(TEventSerializedData* bufs) {
+ static IEventBase* Load(TEventSerializedData* bufs) {
return new TEvRemoteHttpInfo(bufs->GetString());
}
@@ -160,7 +160,7 @@ namespace NActors {
return true;
}
- static IEventBase* Load(TEventSerializedData* bufs) {
+ static IEventBase* Load(TEventSerializedData* bufs) {
return new TEvRemoteHttpInfoRes(bufs->GetString());
}
};
@@ -192,7 +192,7 @@ namespace NActors {
return true;
}
- static IEventBase* Load(TEventSerializedData* bufs) {
+ static IEventBase* Load(TEventSerializedData* bufs) {
return new TEvRemoteJsonInfoRes(bufs->GetString());
}
};
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index d55552af0c..f1d66664b6 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -13,17 +13,17 @@ namespace NActors {
inline void Add(ui64 val, ui64 inc = 1) {
size_t ind = 0;
-#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7
+#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7
asm volatile("" ::
: "memory");
-#endif
+#endif
if (val > 1) {
ind = GetValueBitCount(val - 1);
}
-#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7
+#if defined(__clang__) && __clang_major__ == 3 && __clang_minor__ == 7
asm volatile("" ::
: "memory");
-#endif
+#endif
RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc);
RelaxedStore(&Buckets[ind], RelaxedLoad(&Buckets[ind]) + inc);
}
diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make
index 880a9d00db..22155dbeec 100644
--- a/library/cpp/actors/core/ya.make
+++ b/library/cpp/actors/core/ya.make
@@ -32,8 +32,8 @@ SRCS(
ask.h
balancer.h
balancer.cpp
- buffer.cpp
- buffer.h
+ buffer.cpp
+ buffer.h
callstack.cpp
callstack.h
config.h
@@ -45,7 +45,7 @@ SRCS(
event.h
event_load.h
event_local.h
- event_pb.cpp
+ event_pb.cpp
event_pb.h
events.h
events_undelivered.cpp
diff --git a/library/cpp/actors/dnscachelib/dnscache.cpp b/library/cpp/actors/dnscachelib/dnscache.cpp
index 649339ddb2..580956c92e 100644
--- a/library/cpp/actors/dnscachelib/dnscache.cpp
+++ b/library/cpp/actors/dnscachelib/dnscache.cpp
@@ -155,19 +155,19 @@ void TDnsCache::GetStats(ui64& a_cache_hits, ui64& a_cache_misses,
}
bool TDnsCache::THost::IsStale(int family, const TDnsCache* ctx) const noexcept {
- time_t resolved = family == AF_INET ? ResolvedV4 : ResolvedV6;
- time_t notfound = family == AF_INET ? NotFoundV4 : NotFoundV6;
-
- if (TTimeKeeper::GetTime() - resolved < ctx->EntryLifetime)
- return false;
-
- if (TTimeKeeper::GetTime() - notfound < ctx->NegativeLifetime)
- return false;
-
- return true;
-}
-
-const TDnsCache::THost&
+ time_t resolved = family == AF_INET ? ResolvedV4 : ResolvedV6;
+ time_t notfound = family == AF_INET ? NotFoundV4 : NotFoundV6;
+
+ if (TTimeKeeper::GetTime() - resolved < ctx->EntryLifetime)
+ return false;
+
+ if (TTimeKeeper::GetTime() - notfound < ctx->NegativeLifetime)
+ return false;
+
+ return true;
+}
+
+const TDnsCache::THost&
TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) {
if (!ValidateHName(hostname)) {
LWPROBE(ResolveNullHost, hostname, family);
@@ -182,7 +182,7 @@ TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) {
TGuard<TMutex> lock(CacheMtx);
p = HostCache.find(hostname);
if (p != HostCache.end()) {
- if (!p->second.IsStale(family, this)) {
+ if (!p->second.IsStale(family, this)) {
/* Recently resolved, just return cached value */
ACacheHits += 1;
THost& host = p->second;
@@ -199,9 +199,9 @@ TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) {
ACacheMisses += 1;
}
- if (cacheOnly)
- return NullHost;
-
+ if (cacheOnly)
+ return NullHost;
+
TAtomic& inprogress = (family == AF_INET ? p->second.InProgressV4 : p->second.InProgressV6);
{
@@ -219,7 +219,7 @@ TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) {
ctx->Hostname = hostname;
ctx->Family = family;
- AtomicSet(inprogress, 1);
+ AtomicSet(inprogress, 1);
ares_gethostbyname(chan, hostname.c_str(), family,
&TDnsCache::GHBNCallback, ctx);
}
@@ -269,7 +269,7 @@ const TDnsCache::TAddr& TDnsCache::ResolveAddr(const in6_addr& addr, int family)
ctx->Owner = this;
ctx->Addr = addr;
- AtomicSet(p->second.InProgress, 1);
+ AtomicSet(p->second.InProgress, 1);
ares_gethostbyaddr(chan, &addr,
family == AF_INET ? sizeof(in_addr) : sizeof(in6_addr),
family, &TDnsCache::GHBACallback, ctx);
@@ -284,7 +284,7 @@ const TDnsCache::TAddr& TDnsCache::ResolveAddr(const in6_addr& addr, int family)
void TDnsCache::WaitTask(TAtomic& flag) {
const TInstant start = TInstant(TTimeKeeper::GetTimeval());
- while (AtomicGet(flag)) {
+ while (AtomicGet(flag)) {
ares_channel chan = static_cast<ares_channel>(Channel);
struct pollfd pfd[ARES_GETSOCK_MAXNUM];
@@ -380,7 +380,7 @@ void TDnsCache::GHBNCallback(void* arg, int status, int, struct hostent* info) {
*/
p->second.ResolvedV4 = TTimeKeeper::GetTime();
p->second.ResolvedV4 = 0;
- AtomicSet(p->second.InProgressV4, 0);
+ AtomicSet(p->second.InProgressV4, 0);
} else if (info->h_addrtype == AF_INET6) {
p->second.AddrsV6.clear();
for (int i = 0; info->h_addr_list[i] != nullptr; i++) {
@@ -395,7 +395,7 @@ void TDnsCache::GHBNCallback(void* arg, int status, int, struct hostent* info) {
notfound = TTimeKeeper::GetTime();
resolved = 0;
}
- AtomicSet(inprogress, 0);
+ AtomicSet(inprogress, 0);
}
void TDnsCache::GHBACallback(void* arg, int status, int, struct hostent* info) {
@@ -413,7 +413,7 @@ void TDnsCache::GHBACallback(void* arg, int status, int, struct hostent* info) {
p->second.NotFound = TTimeKeeper::GetTime();
p->second.Resolved = 0;
}
- AtomicSet(p->second.InProgress, 0);
+ AtomicSet(p->second.InProgress, 0);
}
TString TDnsCache::THost::AddrsV4ToString() const {
@@ -441,5 +441,5 @@ TString TDnsCache::THost::AddrsV6ToString() const {
}
return ss.Str();
}
-
-TDnsCache::TAresLibInit TDnsCache::InitAresLib;
+
+TDnsCache::TAresLibInit TDnsCache::InitAresLib;
diff --git a/library/cpp/actors/dnscachelib/dnscache.h b/library/cpp/actors/dnscachelib/dnscache.h
index 3313a251a1..586957b9a0 100644
--- a/library/cpp/actors/dnscachelib/dnscache.h
+++ b/library/cpp/actors/dnscachelib/dnscache.h
@@ -1,6 +1,6 @@
#pragma once
-#include <contrib/libs/c-ares/ares.h>
+#include <contrib/libs/c-ares/ares.h>
#include <util/generic/map.h>
#include <util/generic/vector.h>
#include <util/network/address.h>
@@ -28,9 +28,9 @@ public:
/* use with AF_INET, AF_INET6 or AF_UNSPEC */
NAddr::IRemoteAddrPtr GetAddr(const TString& host,
- int family,
- TIpPort port = 0,
- bool cacheOnly = false);
+ int family,
+ TIpPort port = 0,
+ bool cacheOnly = false);
void GetAllAddresses(const TString& host, TVector<NAddr::IRemoteAddrPtr>&);
@@ -68,8 +68,8 @@ private:
TString AddrsV4ToString() const;
TString AddrsV6ToString() const;
-
- bool IsStale(int family, const TDnsCache* ctx) const noexcept;
+
+ bool IsStale(int family, const TDnsCache* ctx) const noexcept;
};
typedef TMap<TString, THost> THostCache;
@@ -99,9 +99,9 @@ private:
typedef TMap<in6_addr, TAddr, TAddrCmp> TAddrCache;
const THost& Resolve(const TString&, int family, bool cacheOnly = false);
-
+
const TAddr& ResolveAddr(const in6_addr&, int family);
-
+
void WaitTask(TAtomic&);
static void GHBNCallback(void* arg, int status, int timeouts,
@@ -128,21 +128,21 @@ private:
TMutex AresMtx;
void* Channel;
-
- struct TAresLibInit {
- TAresLibInit() {
+
+ struct TAresLibInit {
+ TAresLibInit() {
#ifdef _win_
- const auto res = ares_library_init(ARES_LIB_INIT_ALL);
- Y_VERIFY(res == 0);
+ const auto res = ares_library_init(ARES_LIB_INIT_ALL);
+ Y_VERIFY(res == 0);
#endif
- }
-
- ~TAresLibInit() {
+ }
+
+ ~TAresLibInit() {
#ifdef _win_
- ares_library_cleanup();
+ ares_library_cleanup();
#endif
- }
- };
-
- static TAresLibInit InitAresLib;
+ }
+ };
+
+ static TAresLibInit InitAresLib;
};
diff --git a/library/cpp/actors/memory_log/memlog.cpp b/library/cpp/actors/memory_log/memlog.cpp
index 8e6b46727d..f20162db70 100644
--- a/library/cpp/actors/memory_log/memlog.cpp
+++ b/library/cpp/actors/memory_log/memlog.cpp
@@ -1,28 +1,28 @@
-#include "memlog.h"
-
+#include "memlog.h"
+
#include <library/cpp/actors/util/datetime.h>
-#include <util/system/info.h>
-#include <util/system/atomic.h>
-#include <util/system/align.h>
-
-#include <contrib/libs/linuxvdso/interface.h>
-
-#if (defined(_i386_) || defined(_x86_64_)) && defined(_linux_)
-#define HAVE_VDSO_GETCPU 1
-#include <contrib/libs/linuxvdso/interface.h>
-static int (*FastGetCpu)(unsigned* cpu, unsigned* node, void* unused);
-#endif
-
-#if defined(_unix_)
+#include <util/system/info.h>
+#include <util/system/atomic.h>
+#include <util/system/align.h>
+
+#include <contrib/libs/linuxvdso/interface.h>
+
+#if (defined(_i386_) || defined(_x86_64_)) && defined(_linux_)
+#define HAVE_VDSO_GETCPU 1
+#include <contrib/libs/linuxvdso/interface.h>
+static int (*FastGetCpu)(unsigned* cpu, unsigned* node, void* unused);
+#endif
+
+#if defined(_unix_)
#include <sched.h>
-#elif defined(_win_)
+#elif defined(_win_)
#include <WinBase.h>
-#else
+#else
#error NO IMPLEMENTATION FOR THE PLATFORM
-#endif
-
-const char TMemoryLog::DEFAULT_LAST_MARK[16] = {
+#endif
+
+const char TMemoryLog::DEFAULT_LAST_MARK[16] = {
'c',
'b',
'7',
@@ -39,9 +39,9 @@ const char TMemoryLog::DEFAULT_LAST_MARK[16] = {
'4',
'5',
'\n',
-};
-
-const char TMemoryLog::CLEAR_MARK[16] = {
+};
+
+const char TMemoryLog::CLEAR_MARK[16] = {
' ',
' ',
' ',
@@ -58,146 +58,146 @@ const char TMemoryLog::CLEAR_MARK[16] = {
' ',
' ',
'\n',
-};
-
-unsigned TMemoryLog::GetSelfCpu() noexcept {
-#if defined(_unix_)
+};
+
+unsigned TMemoryLog::GetSelfCpu() noexcept {
+#if defined(_unix_)
#if HAVE_VDSO_GETCPU
- unsigned cpu;
- if (Y_LIKELY(FastGetCpu != nullptr)) {
- auto result = FastGetCpu(&cpu, nullptr, nullptr);
- Y_VERIFY(result == 0);
+ unsigned cpu;
+ if (Y_LIKELY(FastGetCpu != nullptr)) {
+ auto result = FastGetCpu(&cpu, nullptr, nullptr);
+ Y_VERIFY(result == 0);
return cpu;
- } else {
- return 0;
- }
-
+ } else {
+ return 0;
+ }
+
#elif defined(_x86_64_) || defined(_i386_)
-
+
#define CPUID(func, eax, ebx, ecx, edx) \
__asm__ __volatile__( \
"cpuid" \
: "=a"(eax), "=b"(ebx), "=c"(ecx), "=d"(edx) \
: "a"(func));
-
- int a = 0, b = 0, c = 0, d = 0;
- CPUID(0x1, a, b, c, d);
- int acpiID = (b >> 24);
- return acpiID;
-
+
+ int a = 0, b = 0, c = 0, d = 0;
+ CPUID(0x1, a, b, c, d);
+ int acpiID = (b >> 24);
+ return acpiID;
+
#elif defined(__CNUC__)
- return sched_getcpu();
+ return sched_getcpu();
#else
- return 0;
+ return 0;
#endif
-
-#elif defined(_win_)
- return GetCurrentProcessorNumber();
-#else
- return 0;
-#endif
-}
-
-TMemoryLog* TMemoryLog::MemLogBuffer = nullptr;
+
+#elif defined(_win_)
+ return GetCurrentProcessorNumber();
+#else
+ return 0;
+#endif
+}
+
+TMemoryLog* TMemoryLog::MemLogBuffer = nullptr;
Y_POD_THREAD(TThread::TId)
TMemoryLog::LogThreadId;
-char* TMemoryLog::LastMarkIsHere = nullptr;
-
-std::atomic<bool> TMemoryLog::PrintLastMark(true);
-
+char* TMemoryLog::LastMarkIsHere = nullptr;
+
+std::atomic<bool> TMemoryLog::PrintLastMark(true);
+
TMemoryLog::TMemoryLog(size_t totalSize, size_t grainSize)
: GrainSize(grainSize)
, FreeGrains(DEFAULT_TOTAL_SIZE / DEFAULT_GRAIN_SIZE * 2)
, Buf(totalSize)
-{
- Y_VERIFY(DEFAULT_TOTAL_SIZE % DEFAULT_GRAIN_SIZE == 0);
- NumberOfGrains = DEFAULT_TOTAL_SIZE / DEFAULT_GRAIN_SIZE;
-
- for (size_t i = 0; i < NumberOfGrains; ++i) {
- new (GetGrain(i)) TGrain;
- }
-
- NumberOfCpus = NSystemInfo::NumberOfCpus();
- Y_VERIFY(NumberOfGrains > NumberOfCpus);
- ActiveGrains.Reset(new TGrain*[NumberOfCpus]);
- for (size_t i = 0; i < NumberOfCpus; ++i) {
- ActiveGrains[i] = GetGrain(i);
- }
-
- for (size_t i = NumberOfCpus; i < NumberOfGrains; ++i) {
- FreeGrains.StubbornPush(GetGrain(i));
- }
-
-#if HAVE_VDSO_GETCPU
- auto vdsoFunc = (decltype(FastGetCpu))
- NVdso::Function("__vdso_getcpu", "LINUX_2.6");
- AtomicSet(FastGetCpu, vdsoFunc);
-#endif
-}
-
-void* TMemoryLog::GetWriteBuffer(size_t amount) noexcept {
- // alignment required by NoCacheMemcpy
- amount = AlignUp<size_t>(amount, MemcpyAlignment);
-
- for (ui16 tries = MAX_GET_BUFFER_TRIES; tries-- > 0;) {
- auto myCpu = GetSelfCpu();
-
- TGrain* grain = AtomicGet(ActiveGrains[myCpu]);
-
- if (grain != nullptr) {
- auto mine = AtomicGetAndAdd(grain->WritePointer, amount);
- if (mine + amount <= GrainSize - sizeof(TGrain)) {
- return &grain->Data[mine];
- }
-
- if (!AtomicCas(&ActiveGrains[myCpu], 0, grain)) {
- continue;
- }
-
- FreeGrains.StubbornPush(grain);
- }
-
- grain = (TGrain*)FreeGrains.Pop();
-
- if (grain == nullptr) {
- return nullptr;
- }
-
- grain->WritePointer = 0;
-
- if (!AtomicCas(&ActiveGrains[myCpu], grain, 0)) {
- FreeGrains.StubbornPush(grain);
- continue;
- }
- }
-
- return nullptr;
-}
-
-void ClearAlignedTail(char* tail) noexcept {
- auto aligned = AlignUp(tail, TMemoryLog::MemcpyAlignment);
- if (aligned > tail) {
- memset(tail, 0, aligned - tail);
- }
-}
-
-#if defined(_x86_64_) || defined(_i386_)
-#include <xmmintrin.h>
-// the main motivation is not poluting CPU cache
-NO_SANITIZE_THREAD
-void NoCacheMemcpy(char* dst, const char* src, size_t size) noexcept {
- while (size >= sizeof(__m128) * 2) {
- __m128 a = _mm_load_ps((float*)(src + 0 * sizeof(__m128)));
- __m128 b = _mm_load_ps((float*)(src + 1 * sizeof(__m128)));
- _mm_stream_ps((float*)(dst + 0 * sizeof(__m128)), a);
- _mm_stream_ps((float*)(dst + 1 * sizeof(__m128)), b);
-
- size -= sizeof(__m128) * 2;
- src += sizeof(__m128) * 2;
- dst += sizeof(__m128) * 2;
- }
- memcpy(dst, src, size);
-}
+{
+ Y_VERIFY(DEFAULT_TOTAL_SIZE % DEFAULT_GRAIN_SIZE == 0);
+ NumberOfGrains = DEFAULT_TOTAL_SIZE / DEFAULT_GRAIN_SIZE;
+
+ for (size_t i = 0; i < NumberOfGrains; ++i) {
+ new (GetGrain(i)) TGrain;
+ }
+
+ NumberOfCpus = NSystemInfo::NumberOfCpus();
+ Y_VERIFY(NumberOfGrains > NumberOfCpus);
+ ActiveGrains.Reset(new TGrain*[NumberOfCpus]);
+ for (size_t i = 0; i < NumberOfCpus; ++i) {
+ ActiveGrains[i] = GetGrain(i);
+ }
+
+ for (size_t i = NumberOfCpus; i < NumberOfGrains; ++i) {
+ FreeGrains.StubbornPush(GetGrain(i));
+ }
+
+#if HAVE_VDSO_GETCPU
+ auto vdsoFunc = (decltype(FastGetCpu))
+ NVdso::Function("__vdso_getcpu", "LINUX_2.6");
+ AtomicSet(FastGetCpu, vdsoFunc);
+#endif
+}
+
+void* TMemoryLog::GetWriteBuffer(size_t amount) noexcept {
+ // alignment required by NoCacheMemcpy
+ amount = AlignUp<size_t>(amount, MemcpyAlignment);
+
+ for (ui16 tries = MAX_GET_BUFFER_TRIES; tries-- > 0;) {
+ auto myCpu = GetSelfCpu();
+
+ TGrain* grain = AtomicGet(ActiveGrains[myCpu]);
+
+ if (grain != nullptr) {
+ auto mine = AtomicGetAndAdd(grain->WritePointer, amount);
+ if (mine + amount <= GrainSize - sizeof(TGrain)) {
+ return &grain->Data[mine];
+ }
+
+ if (!AtomicCas(&ActiveGrains[myCpu], 0, grain)) {
+ continue;
+ }
+
+ FreeGrains.StubbornPush(grain);
+ }
+
+ grain = (TGrain*)FreeGrains.Pop();
+
+ if (grain == nullptr) {
+ return nullptr;
+ }
+
+ grain->WritePointer = 0;
+
+ if (!AtomicCas(&ActiveGrains[myCpu], grain, 0)) {
+ FreeGrains.StubbornPush(grain);
+ continue;
+ }
+ }
+
+ return nullptr;
+}
+
+void ClearAlignedTail(char* tail) noexcept {
+ auto aligned = AlignUp(tail, TMemoryLog::MemcpyAlignment);
+ if (aligned > tail) {
+ memset(tail, 0, aligned - tail);
+ }
+}
+
+#if defined(_x86_64_) || defined(_i386_)
+#include <xmmintrin.h>
+// the main motivation is not poluting CPU cache
+NO_SANITIZE_THREAD
+void NoCacheMemcpy(char* dst, const char* src, size_t size) noexcept {
+ while (size >= sizeof(__m128) * 2) {
+ __m128 a = _mm_load_ps((float*)(src + 0 * sizeof(__m128)));
+ __m128 b = _mm_load_ps((float*)(src + 1 * sizeof(__m128)));
+ _mm_stream_ps((float*)(dst + 0 * sizeof(__m128)), a);
+ _mm_stream_ps((float*)(dst + 1 * sizeof(__m128)), b);
+
+ size -= sizeof(__m128) * 2;
+ src += sizeof(__m128) * 2;
+ dst += sizeof(__m128) * 2;
+ }
+ memcpy(dst, src, size);
+}
NO_SANITIZE_THREAD
void NoWCacheMemcpy(char* dst, const char* src, size_t size) noexcept {
@@ -224,144 +224,144 @@ void NoWCacheMemcpy(char* dst, const char* src, size_t size) noexcept {
}
}
-#endif
-
-NO_SANITIZE_THREAD
-char* BareMemLogWrite(const char* begin, size_t msgSize, bool isLast) noexcept {
- bool lastMark =
- isLast && TMemoryLog::PrintLastMark.load(std::memory_order_acquire);
- size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize;
-
- char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount);
- if (buffer == nullptr) {
- return nullptr;
- }
-
-#if defined(_x86_64_) || defined(_i386_)
- if (AlignDown(begin, TMemoryLog::MemcpyAlignment) == begin) {
- NoCacheMemcpy(buffer, begin, msgSize);
+#endif
+
+NO_SANITIZE_THREAD
+char* BareMemLogWrite(const char* begin, size_t msgSize, bool isLast) noexcept {
+ bool lastMark =
+ isLast && TMemoryLog::PrintLastMark.load(std::memory_order_acquire);
+ size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize;
+
+ char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount);
+ if (buffer == nullptr) {
+ return nullptr;
+ }
+
+#if defined(_x86_64_) || defined(_i386_)
+ if (AlignDown(begin, TMemoryLog::MemcpyAlignment) == begin) {
+ NoCacheMemcpy(buffer, begin, msgSize);
} else {
NoWCacheMemcpy(buffer, begin, msgSize);
}
#else
memcpy(buffer, begin, msgSize);
#endif
-
- if (lastMark) {
- TMemoryLog::ChangeLastMark(buffer + msgSize);
- }
-
- ClearAlignedTail(buffer + amount);
- return buffer;
-}
-
-NO_SANITIZE_THREAD
-bool MemLogWrite(const char* begin, size_t msgSize, bool addLF) noexcept {
- bool lastMark = TMemoryLog::PrintLastMark.load(std::memory_order_acquire);
- size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize;
-
- // Let's construct prolog with timestamp and thread id
- auto threadId = TMemoryLog::GetTheadId();
-
- // alignment required by NoCacheMemcpy
- // check for format for snprintf
- constexpr size_t prologSize = 48;
+
+ if (lastMark) {
+ TMemoryLog::ChangeLastMark(buffer + msgSize);
+ }
+
+ ClearAlignedTail(buffer + amount);
+ return buffer;
+}
+
+NO_SANITIZE_THREAD
+bool MemLogWrite(const char* begin, size_t msgSize, bool addLF) noexcept {
+ bool lastMark = TMemoryLog::PrintLastMark.load(std::memory_order_acquire);
+ size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize;
+
+ // Let's construct prolog with timestamp and thread id
+ auto threadId = TMemoryLog::GetTheadId();
+
+ // alignment required by NoCacheMemcpy
+ // check for format for snprintf
+ constexpr size_t prologSize = 48;
alignas(TMemoryLog::MemcpyAlignment) char prolog[prologSize + 1];
Y_VERIFY(AlignDown(&prolog, TMemoryLog::MemcpyAlignment) == &prolog);
-
- int snprintfResult = snprintf(prolog, prologSize + 1,
+
+ int snprintfResult = snprintf(prolog, prologSize + 1,
"TS %020" PRIu64 " TI %020" PRIu64 " ", GetCycleCountFast(), threadId);
-
- if (snprintfResult < 0) {
- return false;
- }
- Y_VERIFY(snprintfResult == prologSize);
-
- amount += prologSize;
- if (addLF) {
- ++amount; // add 1 byte for \n at the end of the message
- }
-
- char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount);
- if (buffer == nullptr) {
- return false;
- }
-
-#if defined(_x86_64_) || defined(_i386_)
+
+ if (snprintfResult < 0) {
+ return false;
+ }
+ Y_VERIFY(snprintfResult == prologSize);
+
+ amount += prologSize;
+ if (addLF) {
+ ++amount; // add 1 byte for \n at the end of the message
+ }
+
+ char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount);
+ if (buffer == nullptr) {
+ return false;
+ }
+
+#if defined(_x86_64_) || defined(_i386_)
// warning: copy prolog first to avoid corruption of the message
// by prolog tail
NoCacheMemcpy(buffer, prolog, prologSize);
if (AlignDown(begin + prologSize, TMemoryLog::MemcpyAlignment) == begin + prologSize) {
NoCacheMemcpy(buffer + prologSize, begin, msgSize);
- } else {
+ } else {
NoWCacheMemcpy(buffer + prologSize, begin, msgSize);
}
#else
memcpy(buffer, prolog, prologSize);
memcpy(buffer + prologSize, begin, msgSize);
#endif
-
- if (addLF) {
- buffer[prologSize + msgSize] = '\n';
- }
-
- if (lastMark) {
- TMemoryLog::ChangeLastMark(buffer + prologSize + msgSize + (int)addLF);
- }
-
- ClearAlignedTail(buffer + amount);
- return true;
-}
-
-NO_SANITIZE_THREAD
-void TMemoryLog::ChangeLastMark(char* buffer) noexcept {
- memcpy(buffer, DEFAULT_LAST_MARK, LAST_MARK_SIZE);
- auto oldMark = AtomicSwap(&LastMarkIsHere, buffer);
- if (Y_LIKELY(oldMark != nullptr)) {
- memcpy(oldMark, CLEAR_MARK, LAST_MARK_SIZE);
- }
- if (AtomicGet(LastMarkIsHere) != buffer) {
- memcpy(buffer, CLEAR_MARK, LAST_MARK_SIZE);
- AtomicBarrier();
- }
-}
-
-bool MemLogVPrintF(const char* format, va_list params) noexcept {
- auto logger = TMemoryLog::GetMemoryLogger();
- if (logger == nullptr) {
- return false;
- }
-
- auto threadId = TMemoryLog::GetTheadId();
-
- // alignment required by NoCacheMemcpy
+
+ if (addLF) {
+ buffer[prologSize + msgSize] = '\n';
+ }
+
+ if (lastMark) {
+ TMemoryLog::ChangeLastMark(buffer + prologSize + msgSize + (int)addLF);
+ }
+
+ ClearAlignedTail(buffer + amount);
+ return true;
+}
+
+NO_SANITIZE_THREAD
+void TMemoryLog::ChangeLastMark(char* buffer) noexcept {
+ memcpy(buffer, DEFAULT_LAST_MARK, LAST_MARK_SIZE);
+ auto oldMark = AtomicSwap(&LastMarkIsHere, buffer);
+ if (Y_LIKELY(oldMark != nullptr)) {
+ memcpy(oldMark, CLEAR_MARK, LAST_MARK_SIZE);
+ }
+ if (AtomicGet(LastMarkIsHere) != buffer) {
+ memcpy(buffer, CLEAR_MARK, LAST_MARK_SIZE);
+ AtomicBarrier();
+ }
+}
+
+bool MemLogVPrintF(const char* format, va_list params) noexcept {
+ auto logger = TMemoryLog::GetMemoryLogger();
+ if (logger == nullptr) {
+ return false;
+ }
+
+ auto threadId = TMemoryLog::GetTheadId();
+
+ // alignment required by NoCacheMemcpy
alignas(TMemoryLog::MemcpyAlignment) char buf[TMemoryLog::MAX_MESSAGE_SIZE];
Y_VERIFY(AlignDown(&buf, TMemoryLog::MemcpyAlignment) == &buf);
-
+
int prologSize = snprintf(buf,
TMemoryLog::MAX_MESSAGE_SIZE - 2,
"TS %020" PRIu64 " TI %020" PRIu64 " ",
GetCycleCountFast(),
threadId);
-
- if (Y_UNLIKELY(prologSize < 0)) {
- return false;
- }
- Y_VERIFY((ui32)prologSize <= TMemoryLog::MAX_MESSAGE_SIZE);
-
- int add = vsnprintf(
+
+ if (Y_UNLIKELY(prologSize < 0)) {
+ return false;
+ }
+ Y_VERIFY((ui32)prologSize <= TMemoryLog::MAX_MESSAGE_SIZE);
+
+ int add = vsnprintf(
&buf[prologSize],
- TMemoryLog::MAX_MESSAGE_SIZE - prologSize - 2,
- format, params);
-
- if (Y_UNLIKELY(add < 0)) {
- return false;
- }
- Y_VERIFY(add >= 0);
- auto totalSize = prologSize + add;
-
+ TMemoryLog::MAX_MESSAGE_SIZE - prologSize - 2,
+ format, params);
+
+ if (Y_UNLIKELY(add < 0)) {
+ return false;
+ }
+ Y_VERIFY(add >= 0);
+ auto totalSize = prologSize + add;
+
buf[totalSize++] = '\n';
- Y_VERIFY((ui32)totalSize <= TMemoryLog::MAX_MESSAGE_SIZE);
-
+ Y_VERIFY((ui32)totalSize <= TMemoryLog::MAX_MESSAGE_SIZE);
+
return BareMemLogWrite(buf, totalSize) != nullptr;
-}
+}
diff --git a/library/cpp/actors/memory_log/memlog.h b/library/cpp/actors/memory_log/memlog.h
index 2aa27272a6..fe66efc4fb 100644
--- a/library/cpp/actors/memory_log/memlog.h
+++ b/library/cpp/actors/memory_log/memlog.h
@@ -1,211 +1,211 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/threading/queue/mpmc_unordered_ring.h>
#include <util/generic/string.h>
-#include <util/string/printf.h>
-#include <util/system/datetime.h>
-#include <util/system/thread.h>
-#include <util/system/types.h>
-#include <util/system/atomic.h>
-#include <util/system/align.h>
-#include <util/system/tls.h>
-
-#include <atomic>
-#include <cstdio>
-
-#ifdef _win_
-#include <util/system/winint.h>
-#endif
-
-#ifndef NO_SANITIZE_THREAD
+#include <util/string/printf.h>
+#include <util/system/datetime.h>
+#include <util/system/thread.h>
+#include <util/system/types.h>
+#include <util/system/atomic.h>
+#include <util/system/align.h>
+#include <util/system/tls.h>
+
+#include <atomic>
+#include <cstdio>
+
+#ifdef _win_
+#include <util/system/winint.h>
+#endif
+
+#ifndef NO_SANITIZE_THREAD
#define NO_SANITIZE_THREAD
#if defined(__has_feature)
#if __has_feature(thread_sanitizer)
#undef NO_SANITIZE_THREAD
#define NO_SANITIZE_THREAD __attribute__((no_sanitize_thread))
+#endif
#endif
#endif
-#endif
-
-class TMemoryLog {
-public:
- static constexpr size_t DEFAULT_TOTAL_SIZE = 10 * 1024 * 1024;
- static constexpr size_t DEFAULT_GRAIN_SIZE = 1024 * 64;
- static constexpr size_t MAX_MESSAGE_SIZE = 1024;
- static constexpr ui16 MAX_GET_BUFFER_TRIES = 4;
- static constexpr ui16 MemcpyAlignment = 16;
-
- // search for cb7B68a8A561645
- static const char DEFAULT_LAST_MARK[16];
- static const char CLEAR_MARK[16];
-
- static constexpr size_t LAST_MARK_SIZE = sizeof(DEFAULT_LAST_MARK);
-
- inline static TMemoryLog* GetMemoryLogger() noexcept {
- return AtomicGet(MemLogBuffer);
- }
-
+
+class TMemoryLog {
+public:
+ static constexpr size_t DEFAULT_TOTAL_SIZE = 10 * 1024 * 1024;
+ static constexpr size_t DEFAULT_GRAIN_SIZE = 1024 * 64;
+ static constexpr size_t MAX_MESSAGE_SIZE = 1024;
+ static constexpr ui16 MAX_GET_BUFFER_TRIES = 4;
+ static constexpr ui16 MemcpyAlignment = 16;
+
+ // search for cb7B68a8A561645
+ static const char DEFAULT_LAST_MARK[16];
+ static const char CLEAR_MARK[16];
+
+ static constexpr size_t LAST_MARK_SIZE = sizeof(DEFAULT_LAST_MARK);
+
+ inline static TMemoryLog* GetMemoryLogger() noexcept {
+ return AtomicGet(MemLogBuffer);
+ }
+
void* GetWriteBuffer(size_t amount) noexcept;
-
- inline static void* GetWriteBufferStatic(size_t amount) noexcept {
- auto logger = GetMemoryLogger();
- if (logger == nullptr) {
- return nullptr;
- }
- return logger->GetWriteBuffer(amount);
- }
-
- size_t GetGlobalBufferSize() const noexcept {
- return Buf.GetSize();
- }
-
- inline static void CreateMemoryLogBuffer(
+
+ inline static void* GetWriteBufferStatic(size_t amount) noexcept {
+ auto logger = GetMemoryLogger();
+ if (logger == nullptr) {
+ return nullptr;
+ }
+ return logger->GetWriteBuffer(amount);
+ }
+
+ size_t GetGlobalBufferSize() const noexcept {
+ return Buf.GetSize();
+ }
+
+ inline static void CreateMemoryLogBuffer(
size_t totalSize = DEFAULT_TOTAL_SIZE,
size_t grainSize = DEFAULT_GRAIN_SIZE)
Y_COLD {
- if (AtomicGet(MemLogBuffer) != nullptr) {
- return;
- }
-
- AtomicSet(MemLogBuffer, new TMemoryLog(totalSize, grainSize));
- }
-
- static std::atomic<bool> PrintLastMark;
-
- // buffer must be at least 16 bytes
+ if (AtomicGet(MemLogBuffer) != nullptr) {
+ return;
+ }
+
+ AtomicSet(MemLogBuffer, new TMemoryLog(totalSize, grainSize));
+ }
+
+ static std::atomic<bool> PrintLastMark;
+
+ // buffer must be at least 16 bytes
static void ChangeLastMark(char* buffer) noexcept;
-
- inline static TThread::TId GetTheadId() noexcept {
- if (LogThreadId == 0) {
- LogThreadId = TThread::CurrentThreadId();
- }
- return LogThreadId;
- }
-
-private:
+
+ inline static TThread::TId GetTheadId() noexcept {
+ if (LogThreadId == 0) {
+ LogThreadId = TThread::CurrentThreadId();
+ }
+ return LogThreadId;
+ }
+
+private:
TMemoryLog(size_t totalSize, size_t grainSize) Y_COLD;
-
- struct TGrain {
- TAtomic WritePointer = 0;
- char Padding[MemcpyAlignment - sizeof(TAtomic)];
- char Data[];
- };
-
- size_t NumberOfCpus;
- size_t GrainSize;
- size_t NumberOfGrains;
- TArrayPtr<TGrain*> ActiveGrains;
- NThreading::TMPMCUnorderedRing FreeGrains;
-
- TGrain* GetGrain(size_t grainIndex) const noexcept {
- return (TGrain*)((char*)GetGlobalBuffer() + GrainSize * grainIndex);
- }
-
- class TMMapArea {
- public:
- TMMapArea(size_t amount) Y_COLD {
- MMap(amount);
- }
-
- TMMapArea(const TMMapArea&) = delete;
- TMMapArea& operator=(const TMMapArea& copy) = delete;
-
- TMMapArea(TMMapArea&& move) Y_COLD {
- BufPtr = move.BufPtr;
- Size = move.Size;
-
- move.BufPtr = nullptr;
- move.Size = 0;
- }
-
- TMMapArea& operator=(TMMapArea&& move) Y_COLD {
- BufPtr = move.BufPtr;
- Size = move.Size;
-
- move.BufPtr = nullptr;
- move.Size = 0;
- return *this;
- }
-
- void Reset(size_t amount) Y_COLD {
- MUnmap();
- MMap(amount);
- }
-
- ~TMMapArea() noexcept Y_COLD {
- MUnmap();
- }
-
- size_t GetSize() const noexcept {
- return Size;
- }
-
- void* GetPtr() const noexcept {
- return BufPtr;
- }
-
- private:
- void* BufPtr;
- size_t Size;
-#ifdef _win_
- HANDLE Mapping;
-#endif
-
- void MMap(size_t amount);
- void MUnmap();
- };
-
- TMMapArea Buf;
-
- void* GetGlobalBuffer() const noexcept {
- return Buf.GetPtr();
- }
-
- static unsigned GetSelfCpu() noexcept;
-
- static TMemoryLog* MemLogBuffer;
- static Y_POD_THREAD(TThread::TId) LogThreadId;
- static char* LastMarkIsHere;
-};
-
-// it's no use of sanitizing this function
-NO_SANITIZE_THREAD
+
+ struct TGrain {
+ TAtomic WritePointer = 0;
+ char Padding[MemcpyAlignment - sizeof(TAtomic)];
+ char Data[];
+ };
+
+ size_t NumberOfCpus;
+ size_t GrainSize;
+ size_t NumberOfGrains;
+ TArrayPtr<TGrain*> ActiveGrains;
+ NThreading::TMPMCUnorderedRing FreeGrains;
+
+ TGrain* GetGrain(size_t grainIndex) const noexcept {
+ return (TGrain*)((char*)GetGlobalBuffer() + GrainSize * grainIndex);
+ }
+
+ class TMMapArea {
+ public:
+ TMMapArea(size_t amount) Y_COLD {
+ MMap(amount);
+ }
+
+ TMMapArea(const TMMapArea&) = delete;
+ TMMapArea& operator=(const TMMapArea& copy) = delete;
+
+ TMMapArea(TMMapArea&& move) Y_COLD {
+ BufPtr = move.BufPtr;
+ Size = move.Size;
+
+ move.BufPtr = nullptr;
+ move.Size = 0;
+ }
+
+ TMMapArea& operator=(TMMapArea&& move) Y_COLD {
+ BufPtr = move.BufPtr;
+ Size = move.Size;
+
+ move.BufPtr = nullptr;
+ move.Size = 0;
+ return *this;
+ }
+
+ void Reset(size_t amount) Y_COLD {
+ MUnmap();
+ MMap(amount);
+ }
+
+ ~TMMapArea() noexcept Y_COLD {
+ MUnmap();
+ }
+
+ size_t GetSize() const noexcept {
+ return Size;
+ }
+
+ void* GetPtr() const noexcept {
+ return BufPtr;
+ }
+
+ private:
+ void* BufPtr;
+ size_t Size;
+#ifdef _win_
+ HANDLE Mapping;
+#endif
+
+ void MMap(size_t amount);
+ void MUnmap();
+ };
+
+ TMMapArea Buf;
+
+ void* GetGlobalBuffer() const noexcept {
+ return Buf.GetPtr();
+ }
+
+ static unsigned GetSelfCpu() noexcept;
+
+ static TMemoryLog* MemLogBuffer;
+ static Y_POD_THREAD(TThread::TId) LogThreadId;
+ static char* LastMarkIsHere;
+};
+
+// it's no use of sanitizing this function
+NO_SANITIZE_THREAD
char* BareMemLogWrite(
- const char* begin, size_t msgSize, bool isLast = true) noexcept;
-
-// it's no use of sanitizing this function
-NO_SANITIZE_THREAD
+ const char* begin, size_t msgSize, bool isLast = true) noexcept;
+
+// it's no use of sanitizing this function
+NO_SANITIZE_THREAD
bool MemLogWrite(
- const char* begin, size_t msgSize, bool addLF = false) noexcept;
-
-Y_WRAPPER inline bool MemLogWrite(const char* begin, const char* end) noexcept {
- if (end <= begin) {
- return false;
- }
-
- size_t msgSize = end - begin;
- return MemLogWrite(begin, msgSize);
-}
-
-template <typename TObj>
-bool MemLogWriteStruct(const TObj* obj) noexcept {
- auto begin = (const char*)(const void*)obj;
- return MemLogWrite(begin, begin + sizeof(TObj));
-}
-
+ const char* begin, size_t msgSize, bool addLF = false) noexcept;
+
+Y_WRAPPER inline bool MemLogWrite(const char* begin, const char* end) noexcept {
+ if (end <= begin) {
+ return false;
+ }
+
+ size_t msgSize = end - begin;
+ return MemLogWrite(begin, msgSize);
+}
+
+template <typename TObj>
+bool MemLogWriteStruct(const TObj* obj) noexcept {
+ auto begin = (const char*)(const void*)obj;
+ return MemLogWrite(begin, begin + sizeof(TObj));
+}
+
Y_PRINTF_FORMAT(1, 0)
-bool MemLogVPrintF(const char* format, va_list params) noexcept;
-
+bool MemLogVPrintF(const char* format, va_list params) noexcept;
+
Y_PRINTF_FORMAT(1, 2)
Y_WRAPPER
-inline bool MemLogPrintF(const char* format, ...) noexcept {
- va_list params;
- va_start(params, format);
- auto result = MemLogVPrintF(format, params);
- va_end(params);
- return result;
-}
-
-Y_WRAPPER inline bool MemLogWriteNullTerm(const char* str) noexcept {
- return MemLogWrite(str, strlen(str));
-}
+inline bool MemLogPrintF(const char* format, ...) noexcept {
+ va_list params;
+ va_start(params, format);
+ auto result = MemLogVPrintF(format, params);
+ va_end(params);
+ return result;
+}
+
+Y_WRAPPER inline bool MemLogWriteNullTerm(const char* str) noexcept {
+ return MemLogWrite(str, strlen(str));
+}
diff --git a/library/cpp/actors/memory_log/mmap.cpp b/library/cpp/actors/memory_log/mmap.cpp
index 201998d343..b72feb1112 100644
--- a/library/cpp/actors/memory_log/mmap.cpp
+++ b/library/cpp/actors/memory_log/mmap.cpp
@@ -1,63 +1,63 @@
-#include "memlog.h"
-
+#include "memlog.h"
+
#if defined(_unix_)
#include <sys/mman.h>
#elif defined(_win_)
#include <util/system/winint.h>
-#else
+#else
#error NO IMPLEMENTATION FOR THE PLATFORM
-#endif
-
-void TMemoryLog::TMMapArea::MMap(size_t amount) {
- Y_VERIFY(amount > 0);
-
+#endif
+
+void TMemoryLog::TMMapArea::MMap(size_t amount) {
+ Y_VERIFY(amount > 0);
+
+#if defined(_unix_)
+ constexpr int mmapProt = PROT_READ | PROT_WRITE;
+#if defined(_linux_)
+ constexpr int mmapFlags = MAP_PRIVATE | MAP_ANON | MAP_POPULATE;
+#else
+ constexpr int mmapFlags = MAP_PRIVATE | MAP_ANON;
+#endif
+
+ BufPtr = ::mmap(nullptr, amount, mmapProt, mmapFlags, -1, 0);
+ if (BufPtr == MAP_FAILED) {
+ throw std::bad_alloc();
+ }
+
+#elif defined(_win_)
+ Mapping = ::CreateFileMapping(
+ (HANDLE)-1, nullptr, PAGE_READWRITE, 0, amount, nullptr);
+ if (Mapping == NULL) {
+ throw std::bad_alloc();
+ }
+ BufPtr = ::MapViewOfFile(Mapping, FILE_MAP_WRITE, 0, 0, amount);
+ if (BufPtr == NULL) {
+ throw std::bad_alloc();
+ }
+#endif
+
+ Size = amount;
+}
+
+void TMemoryLog::TMMapArea::MUnmap() {
+ if (BufPtr == nullptr) {
+ return;
+ }
+
#if defined(_unix_)
- constexpr int mmapProt = PROT_READ | PROT_WRITE;
-#if defined(_linux_)
- constexpr int mmapFlags = MAP_PRIVATE | MAP_ANON | MAP_POPULATE;
-#else
- constexpr int mmapFlags = MAP_PRIVATE | MAP_ANON;
-#endif
-
- BufPtr = ::mmap(nullptr, amount, mmapProt, mmapFlags, -1, 0);
- if (BufPtr == MAP_FAILED) {
- throw std::bad_alloc();
- }
-
-#elif defined(_win_)
- Mapping = ::CreateFileMapping(
- (HANDLE)-1, nullptr, PAGE_READWRITE, 0, amount, nullptr);
- if (Mapping == NULL) {
- throw std::bad_alloc();
- }
- BufPtr = ::MapViewOfFile(Mapping, FILE_MAP_WRITE, 0, 0, amount);
- if (BufPtr == NULL) {
- throw std::bad_alloc();
- }
-#endif
-
- Size = amount;
-}
-
-void TMemoryLog::TMMapArea::MUnmap() {
- if (BufPtr == nullptr) {
- return;
- }
-
-#if defined(_unix_)
- int result = ::munmap(BufPtr, Size);
- Y_VERIFY(result == 0);
-
-#elif defined(_win_)
- BOOL result = ::UnmapViewOfFile(BufPtr);
- Y_VERIFY(result != 0);
-
- result = ::CloseHandle(Mapping);
- Y_VERIFY(result != 0);
-
- Mapping = 0;
-#endif
-
- BufPtr = nullptr;
- Size = 0;
-}
+ int result = ::munmap(BufPtr, Size);
+ Y_VERIFY(result == 0);
+
+#elif defined(_win_)
+ BOOL result = ::UnmapViewOfFile(BufPtr);
+ Y_VERIFY(result != 0);
+
+ result = ::CloseHandle(Mapping);
+ Y_VERIFY(result != 0);
+
+ Mapping = 0;
+#endif
+
+ BufPtr = nullptr;
+ Size = 0;
+}
diff --git a/library/cpp/actors/memory_log/ya.make b/library/cpp/actors/memory_log/ya.make
index d89d5db4d7..441b51b3c7 100644
--- a/library/cpp/actors/memory_log/ya.make
+++ b/library/cpp/actors/memory_log/ya.make
@@ -1,19 +1,19 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(
agri
g:kikimr
)
-
-SRCS(
- memlog.cpp
- memlog.h
- mmap.cpp
-)
-
-PEERDIR(
+
+SRCS(
+ memlog.cpp
+ memlog.h
+ mmap.cpp
+)
+
+PEERDIR(
library/cpp/threading/queue
- contrib/libs/linuxvdso
-)
-
-END()
+ contrib/libs/linuxvdso
+)
+
+END()
diff --git a/library/cpp/actors/prof/tag.cpp b/library/cpp/actors/prof/tag.cpp
index 9ccf03e1a9..46b53d804f 100644
--- a/library/cpp/actors/prof/tag.cpp
+++ b/library/cpp/actors/prof/tag.cpp
@@ -1,6 +1,6 @@
-#include "tag.h"
+#include "tag.h"
#include "tcmalloc.h"
-
+
#include <library/cpp/charset/ci_string.h>
#include <library/cpp/containers/atomizer/atomizer.h>
#include <library/cpp/malloc/api/malloc.h>
@@ -13,9 +13,9 @@
#include <util/generic/singleton.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
-#include <util/system/mutex.h>
-
-namespace NProfiling {
+#include <util/system/mutex.h>
+
+namespace NProfiling {
class TStringAtoms {
private:
TMutex Mutex;
@@ -59,19 +59,19 @@ namespace NProfiling {
}
}
};
-
+
ui32 MakeTag(const char* s) {
return TStringAtoms::Instance().MakeTag(s);
}
-
+
ui32 MakeTags(const TVector<const char*>& ss) {
return TStringAtoms::Instance().MakeTags(ss);
}
-
+
const char* GetTag(ui32 tag) {
return TStringAtoms::Instance().GetTag(tag);
- }
-
+ }
+
size_t GetTagsCount() {
return TStringAtoms::Instance().GetTagsCount();
}
diff --git a/library/cpp/actors/prof/tag.h b/library/cpp/actors/prof/tag.h
index 357e264a22..ec4bed5b08 100644
--- a/library/cpp/actors/prof/tag.h
+++ b/library/cpp/actors/prof/tag.h
@@ -1,22 +1,22 @@
-#pragma once
-
+#pragma once
+
#include <util/generic/fwd.h>
-
-/*
- Common registry for tagging memory profiler.
- Register a new tag with MakeTag using a unique string.
+
+/*
+ Common registry for tagging memory profiler.
+ Register a new tag with MakeTag using a unique string.
Use registered tags with SetThreadAllocTag function in allocator API.
-*/
-
-namespace NProfiling {
+*/
+
+namespace NProfiling {
ui32 MakeTag(const char* s);
-
+
// Make only unique tags. Y_VERIFY inside.
ui32 MakeTags(const TVector<const char*>& ss);
-
+
const char* GetTag(ui32 tag);
size_t GetTagsCount();
-
+
using TSetThreadAllocTag = ui32(ui32 tag);
extern TSetThreadAllocTag* SetThreadAllocTag;
@@ -31,32 +31,32 @@ namespace NProfiling {
ui32 newTag = MakeTag(tagName);
RestoreTag = SetThreadAllocTag(newTag);
}
-
+
TMemoryTagScope(TMemoryTagScope&& move)
: RestoreTag(move.RestoreTag)
, Released(move.Released)
{
move.Released = true;
}
-
+
TMemoryTagScope& operator=(TMemoryTagScope&& move) {
RestoreTag = move.RestoreTag;
Released = move.Released;
move.Released = true;
return *this;
}
-
+
static void Reset(ui32 tag) {
SetThreadAllocTag(tag);
- }
-
+ }
+
void Release() {
if (!Released) {
SetThreadAllocTag(RestoreTag);
Released = true;
}
}
-
+
~TMemoryTagScope() {
if (!Released) {
SetThreadAllocTag(RestoreTag);
diff --git a/library/cpp/actors/prof/ut/tag_ut.cpp b/library/cpp/actors/prof/ut/tag_ut.cpp
index accf3921ab..43c56ecddc 100644
--- a/library/cpp/actors/prof/ut/tag_ut.cpp
+++ b/library/cpp/actors/prof/ut/tag_ut.cpp
@@ -1,68 +1,68 @@
-#include "tag.h"
-
+#include "tag.h"
+
#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NProfiling;
+
+class TAtomTagsTest: public TTestBase {
+private:
+ UNIT_TEST_SUITE(TAtomTagsTest);
+ UNIT_TEST(Test_MakeTag);
+ UNIT_TEST(Test_Make2Tags);
+ UNIT_TEST(Test_MakeTagTwice);
+
+ UNIT_TEST(Test_MakeAndGetTag);
+
+ UNIT_TEST(Test_MakeVector);
+ UNIT_TEST_SUITE_END();
-using namespace NProfiling;
-
-class TAtomTagsTest: public TTestBase {
-private:
- UNIT_TEST_SUITE(TAtomTagsTest);
- UNIT_TEST(Test_MakeTag);
- UNIT_TEST(Test_Make2Tags);
- UNIT_TEST(Test_MakeTagTwice);
-
- UNIT_TEST(Test_MakeAndGetTag);
-
- UNIT_TEST(Test_MakeVector);
- UNIT_TEST_SUITE_END();
-
-public:
- void Test_MakeTag();
- void Test_Make2Tags();
- void Test_MakeTagTwice();
- void Test_MakeAndGetTag();
- void Test_MakeVector();
-};
-
-UNIT_TEST_SUITE_REGISTRATION(TAtomTagsTest);
-
-void TAtomTagsTest::Test_MakeTag() {
- ui32 tag = MakeTag("a tag");
- UNIT_ASSERT(tag != 0);
-}
-
-void TAtomTagsTest::Test_Make2Tags() {
- ui32 tag1 = MakeTag("a tag 1");
- ui32 tag2 = MakeTag("a tag 2");
- UNIT_ASSERT(tag1 != 0);
- UNIT_ASSERT(tag2 != 0);
- UNIT_ASSERT(tag1 != tag2);
-}
-
-void TAtomTagsTest::Test_MakeTagTwice() {
- ui32 tag1 = MakeTag("a tag twice");
- ui32 tag2 = MakeTag("a tag twice");
- UNIT_ASSERT(tag1 != 0);
- UNIT_ASSERT(tag1 == tag2);
-}
-
-void TAtomTagsTest::Test_MakeAndGetTag() {
- const char* makeStr = "tag to get";
- ui32 tag = MakeTag(makeStr);
- const char* tagStr = GetTag(tag);
- UNIT_ASSERT_STRINGS_EQUAL(makeStr, tagStr);
-}
-
-void TAtomTagsTest::Test_MakeVector() {
+public:
+ void Test_MakeTag();
+ void Test_Make2Tags();
+ void Test_MakeTagTwice();
+ void Test_MakeAndGetTag();
+ void Test_MakeVector();
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TAtomTagsTest);
+
+void TAtomTagsTest::Test_MakeTag() {
+ ui32 tag = MakeTag("a tag");
+ UNIT_ASSERT(tag != 0);
+}
+
+void TAtomTagsTest::Test_Make2Tags() {
+ ui32 tag1 = MakeTag("a tag 1");
+ ui32 tag2 = MakeTag("a tag 2");
+ UNIT_ASSERT(tag1 != 0);
+ UNIT_ASSERT(tag2 != 0);
+ UNIT_ASSERT(tag1 != tag2);
+}
+
+void TAtomTagsTest::Test_MakeTagTwice() {
+ ui32 tag1 = MakeTag("a tag twice");
+ ui32 tag2 = MakeTag("a tag twice");
+ UNIT_ASSERT(tag1 != 0);
+ UNIT_ASSERT(tag1 == tag2);
+}
+
+void TAtomTagsTest::Test_MakeAndGetTag() {
+ const char* makeStr = "tag to get";
+ ui32 tag = MakeTag(makeStr);
+ const char* tagStr = GetTag(tag);
+ UNIT_ASSERT_STRINGS_EQUAL(makeStr, tagStr);
+}
+
+void TAtomTagsTest::Test_MakeVector() {
TVector<const char*> strs = {
- "vector tag 0",
- "vector tag 1",
- "vector tag 3",
+ "vector tag 0",
+ "vector tag 1",
+ "vector tag 3",
"vector tag 4"};
- ui32 baseTag = MakeTags(strs);
- UNIT_ASSERT(baseTag != 0);
- for (ui32 i = 0; i < strs.size(); ++i) {
- const char* str = GetTag(baseTag + i);
- UNIT_ASSERT_STRINGS_EQUAL(str, strs[i]);
- }
-}
+ ui32 baseTag = MakeTags(strs);
+ UNIT_ASSERT(baseTag != 0);
+ for (ui32 i = 0; i < strs.size(); ++i) {
+ const char* str = GetTag(baseTag + i);
+ UNIT_ASSERT_STRINGS_EQUAL(str, strs[i]);
+ }
+}
diff --git a/library/cpp/actors/prof/ut/ya.make b/library/cpp/actors/prof/ut/ya.make
index 47c58a8fb7..d177fbdd22 100644
--- a/library/cpp/actors/prof/ut/ya.make
+++ b/library/cpp/actors/prof/ut/ya.make
@@ -1,12 +1,12 @@
UNITTEST_FOR(library/cpp/actors/prof)
-
+
OWNER(
agri
g:kikimr
)
-
-SRCS(
- tag_ut.cpp
-)
-
-END()
+
+SRCS(
+ tag_ut.cpp
+)
+
+END()
diff --git a/library/cpp/actors/prof/ya.make b/library/cpp/actors/prof/ya.make
index b5e2497563..cdd3e57d1f 100644
--- a/library/cpp/actors/prof/ya.make
+++ b/library/cpp/actors/prof/ya.make
@@ -1,19 +1,19 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(
agri
g:kikimr
)
-
-SRCS(
- tag.cpp
-)
-
-PEERDIR(
+
+SRCS(
+ tag.cpp
+)
+
+PEERDIR(
library/cpp/charset
library/cpp/containers/atomizer
-)
-
+)
+
IF (PROFILE_MEMORY_ALLOCATIONS)
CFLAGS(-DPROFILE_MEMORY_ALLOCATIONS)
PEERDIR(
@@ -30,4 +30,4 @@ ELSE()
SRCS(tcmalloc_null.cpp)
ENDIF()
-END()
+END()
diff --git a/library/cpp/actors/protos/actors.proto b/library/cpp/actors/protos/actors.proto
index 5fbd6d44ee..5e40cbf6c2 100644
--- a/library/cpp/actors/protos/actors.proto
+++ b/library/cpp/actors/protos/actors.proto
@@ -6,8 +6,8 @@ message TActorId {
required fixed64 RawX1 = 1;
required fixed64 RawX2 = 2;
}
-
-message TCallbackException {
+
+message TCallbackException {
required TActorId ActorId = 1;
- required string ExceptionMessage = 2;
-}
+ required string ExceptionMessage = 2;
+}
diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto
index 2e3b0d0d15..30a5c1bb74 100644
--- a/library/cpp/actors/protos/interconnect.proto
+++ b/library/cpp/actors/protos/interconnect.proto
@@ -14,7 +14,7 @@ message TEvNodeInfo {
optional string Address = 2;
optional uint32 Port = 3;
}
-
+
extend google.protobuf.FieldOptions {
optional string PrintName = 50376;
}
@@ -43,19 +43,19 @@ message TScopeId {
optional fixed64 X2 = 2;
}
-message THandshakeRequest {
- required uint64 Protocol = 1;
-
- required uint64 ProgramPID = 2;
- required uint64 ProgramStartTime = 3;
- required uint64 Serial = 4;
-
- required uint32 ReceiverNodeId = 5;
+message THandshakeRequest {
+ required uint64 Protocol = 1;
+
+ required uint64 ProgramPID = 2;
+ required uint64 ProgramStartTime = 3;
+ required uint64 Serial = 4;
+
+ required uint32 ReceiverNodeId = 5;
required string SenderActorId = 6;
-
- optional string SenderHostName = 7;
- optional string ReceiverHostName = 8;
- optional string UUID = 9;
+
+ optional string SenderHostName = 7;
+ optional string ReceiverHostName = 8;
+ optional string UUID = 9;
optional TClusterUUIDs ClusterUUIDs = 13;
optional bytes Ballast = 10;
@@ -72,15 +72,15 @@ message THandshakeRequest {
optional bool RequestModernFrame = 18;
optional bool RequestAuthOnly = 19;
-}
-
-message THandshakeSuccess {
- required uint64 Protocol = 1;
-
- required uint64 ProgramPID = 2;
- required uint64 ProgramStartTime = 3;
- required uint64 Serial = 4;
-
+}
+
+message THandshakeSuccess {
+ required uint64 Protocol = 1;
+
+ required uint64 ProgramPID = 2;
+ required uint64 ProgramStartTime = 3;
+ required uint64 Serial = 4;
+
required string SenderActorId = 5;
optional string VersionTag = 6;
@@ -94,13 +94,13 @@ message THandshakeSuccess {
optional bool UseModernFrame = 11;
optional bool AuthOnly = 12;
-}
-
-message THandshakeReply {
- optional THandshakeSuccess Success = 1;
- optional string ErrorExplaination = 2;
+}
+
+message THandshakeReply {
+ optional THandshakeSuccess Success = 1;
+ optional string ErrorExplaination = 2;
optional bool CookieCheckResult = 3;
-}
+}
message TEvLoadMessage {
message THop {
diff --git a/library/cpp/actors/protos/services_common.proto b/library/cpp/actors/protos/services_common.proto
index afa0ec0073..99347ad37e 100644
--- a/library/cpp/actors/protos/services_common.proto
+++ b/library/cpp/actors/protos/services_common.proto
@@ -7,8 +7,8 @@ enum EServiceCommon {
GLOBAL = 0;
INTERCONNECT = 1;
- TEST = 2;
- PROTOCOLS = 3;
+ TEST = 2;
+ PROTOCOLS = 3;
INTERCONNECT_SPEED_TEST = 4;
INTERCONNECT_STATUS = 5;
INTERCONNECT_NETWORK = 6;
diff --git a/library/cpp/actors/protos/unittests.proto b/library/cpp/actors/protos/unittests.proto
index a856b0942a..68b662b9b3 100644
--- a/library/cpp/actors/protos/unittests.proto
+++ b/library/cpp/actors/protos/unittests.proto
@@ -1,17 +1,17 @@
option cc_enable_arenas = true;
-message TSimple {
- required string Str1 = 1;
- optional string Str2 = 2;
- optional uint64 Number1 = 3;
-}
-
-message TBigMessage {
- repeated TSimple Simples = 1;
- repeated string ManyStr = 2;
- optional string OneMoreStr = 3;
- optional uint64 YANumber = 4;
-}
+message TSimple {
+ required string Str1 = 1;
+ optional string Str2 = 2;
+ optional uint64 Number1 = 3;
+}
+
+message TBigMessage {
+ repeated TSimple Simples = 1;
+ repeated string ManyStr = 2;
+ optional string OneMoreStr = 3;
+ optional uint64 YANumber = 4;
+}
message TMessageWithPayload {
optional string Meta = 1;
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 6fa25b9965..0459f76386 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -74,7 +74,7 @@ namespace NActors {
ActorSystem->Stop();
ActorSystem.Destroy();
- Poller.Reset();
+ Poller.Reset();
}
TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() {
@@ -909,17 +909,17 @@ namespace NActors {
case TMailboxType::Revolving:
UnlockFromExecution((TMailboxTable::TRevolvingMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
break;
- case TMailboxType::HTSwap:
+ case TMailboxType::HTSwap:
UnlockFromExecution((TMailboxTable::THTSwapMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
- break;
- case TMailboxType::ReadAsFilled:
+ break;
+ case TMailboxType::ReadAsFilled:
UnlockFromExecution((TMailboxTable::TReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
- break;
- case TMailboxType::TinyReadAsFilled:
+ break;
+ case TMailboxType::TinyReadAsFilled:
UnlockFromExecution((TMailboxTable::TTinyReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
- break;
+ break;
default:
- Y_FAIL("Unsupported mailbox type");
+ Y_FAIL("Unsupported mailbox type");
}
return actorId;
@@ -1645,13 +1645,13 @@ namespace NActors {
setup->LocalServices = node->LocalServices;
setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount);
const TActorId nameserviceId = GetNameserviceActorId();
-
- TIntrusivePtr<TInterconnectProxyCommon> common;
- common.Reset(new TInterconnectProxyCommon);
- common->NameserviceId = nameserviceId;
- common->MonCounters = interconnectCounters;
+
+ TIntrusivePtr<TInterconnectProxyCommon> common;
+ common.Reset(new TInterconnectProxyCommon);
+ common->NameserviceId = nameserviceId;
+ common->MonCounters = interconnectCounters;
common->TechnicalSelfHostName = "::1";
-
+
if (!UseRealThreads) {
common->Settings.DeadPeer = TDuration::Max();
common->Settings.CloseOnIdle = TDuration::Max();
@@ -1668,7 +1668,7 @@ namespace NActors {
continue;
const ui32 peerNodeId = FirstNodeId + proxyNodeIndex;
-
+
IActor *proxyActor = UseRealInterconnect
? new TInterconnectProxyTCP(peerNodeId, common)
: InterconnectMock.CreateProxyMock(setup->NodeId, peerNodeId, common);
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h
index 26e3b45c98..cca5876645 100644
--- a/library/cpp/actors/testlib/test_runtime.h
+++ b/library/cpp/actors/testlib/test_runtime.h
@@ -556,7 +556,7 @@ namespace NActors {
TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters;
TIntrusivePtr<NActors::NLog::TSettings> LogSettings;
- TIntrusivePtr<NInterconnect::TPollerThreads> Poller;
+ TIntrusivePtr<NInterconnect::TPollerThreads> Poller;
volatile ui64* ActorSystemTimestamp;
volatile ui64* ActorSystemMonotonic;
TVector<std::pair<TActorId, TActorSetupCmd> > LocalServices;
diff --git a/library/cpp/actors/util/funnel_queue.h b/library/cpp/actors/util/funnel_queue.h
index 0e21e2617c..d760252054 100644
--- a/library/cpp/actors/util/funnel_queue.h
+++ b/library/cpp/actors/util/funnel_queue.h
@@ -91,62 +91,62 @@ protected:
delete entry;
return next;
}
-
-protected:
- struct TEntryIter {
- TEntry* ptr;
-
- ElementType& operator*() {
- return ptr->Data;
- }
-
- ElementType* operator->() {
- return &ptr->Data;
- }
-
- TEntryIter& operator++() {
- ptr = AtomicGet(ptr->Next);
- return *this;
- }
-
- bool operator!=(const TEntryIter& other) const {
- return ptr != other.ptr;
- }
-
- bool operator==(const TEntryIter& other) const {
- return ptr == other.ptr;
- }
- };
-
- struct TConstEntryIter {
- const TEntry* ptr;
-
- const ElementType& operator*() {
- return ptr->Data;
- }
-
- const ElementType* operator->() {
- return &ptr->Data;
- }
-
- TEntryIter& operator++() {
- ptr = AtomicGet(ptr->Next);
- return *this;
- }
-
- bool operator!=(const TConstEntryIter& other) const {
- return ptr != other.ptr;
- }
-
- bool operator==(const TConstEntryIter& other) const {
- return ptr == other.ptr;
- }
- };
-
-public:
- using const_iterator = TConstEntryIter;
- using iterator = TEntryIter;
-
+
+protected:
+ struct TEntryIter {
+ TEntry* ptr;
+
+ ElementType& operator*() {
+ return ptr->Data;
+ }
+
+ ElementType* operator->() {
+ return &ptr->Data;
+ }
+
+ TEntryIter& operator++() {
+ ptr = AtomicGet(ptr->Next);
+ return *this;
+ }
+
+ bool operator!=(const TEntryIter& other) const {
+ return ptr != other.ptr;
+ }
+
+ bool operator==(const TEntryIter& other) const {
+ return ptr == other.ptr;
+ }
+ };
+
+ struct TConstEntryIter {
+ const TEntry* ptr;
+
+ const ElementType& operator*() {
+ return ptr->Data;
+ }
+
+ const ElementType* operator->() {
+ return &ptr->Data;
+ }
+
+ TEntryIter& operator++() {
+ ptr = AtomicGet(ptr->Next);
+ return *this;
+ }
+
+ bool operator!=(const TConstEntryIter& other) const {
+ return ptr != other.ptr;
+ }
+
+ bool operator==(const TConstEntryIter& other) const {
+ return ptr == other.ptr;
+ }
+ };
+
+public:
+ using const_iterator = TConstEntryIter;
+ using iterator = TEntryIter;
+
iterator begin() {
return {AtomicGet(Front)};
}
@@ -156,7 +156,7 @@ public:
const_iterator begin() const {
return {AtomicGet(Front)};
}
-
+
iterator end() {
return {nullptr};
}
diff --git a/library/cpp/actors/util/recentwnd.h b/library/cpp/actors/util/recentwnd.h
index ba1ede6f29..29425301e4 100644
--- a/library/cpp/actors/util/recentwnd.h
+++ b/library/cpp/actors/util/recentwnd.h
@@ -1,28 +1,28 @@
-#pragma once
+#pragma once
-#include <util/generic/deque.h>
-
-template <typename TElem,
+#include <util/generic/deque.h>
+
+template <typename TElem,
template <typename, typename...> class TContainer = TDeque>
-class TRecentWnd {
-public:
+class TRecentWnd {
+public:
TRecentWnd(ui32 wndSize)
: MaxWndSize_(wndSize)
{
}
-
- void Push(const TElem& elem) {
- if (Window_.size() == MaxWndSize_)
- Window_.erase(Window_.begin());
- Window_.emplace_back(elem);
- }
-
- void Push(TElem&& elem) {
- if (Window_.size() == MaxWndSize_)
- Window_.erase(Window_.begin());
- Window_.emplace_back(std::move(elem));
- }
-
+
+ void Push(const TElem& elem) {
+ if (Window_.size() == MaxWndSize_)
+ Window_.erase(Window_.begin());
+ Window_.emplace_back(elem);
+ }
+
+ void Push(TElem&& elem) {
+ if (Window_.size() == MaxWndSize_)
+ Window_.erase(Window_.begin());
+ Window_.emplace_back(std::move(elem));
+ }
+
TElem& Last() {
return Window_.back();
}
@@ -35,33 +35,33 @@ public:
ui64 Size() const {
return Window_.size();
}
-
- using const_iterator = typename TContainer<TElem>::const_iterator;
-
+
+ using const_iterator = typename TContainer<TElem>::const_iterator;
+
const_iterator begin() {
return Window_.begin();
}
const_iterator end() {
return Window_.end();
}
+
+ void Reset(ui32 wndSize = 0) {
+ Window_.clear();
+ if (wndSize != 0) {
+ MaxWndSize_ = wndSize;
+ }
+ }
+
+ void ResetWnd(ui32 wndSize) {
+ Y_VERIFY(wndSize != 0);
+ MaxWndSize_ = wndSize;
+ if (Window_.size() > MaxWndSize_) {
+ Window_.erase(Window_.begin(),
+ Window_.begin() + Window_.size() - MaxWndSize_);
+ }
+ }
- void Reset(ui32 wndSize = 0) {
- Window_.clear();
- if (wndSize != 0) {
- MaxWndSize_ = wndSize;
- }
- }
-
- void ResetWnd(ui32 wndSize) {
- Y_VERIFY(wndSize != 0);
- MaxWndSize_ = wndSize;
- if (Window_.size() > MaxWndSize_) {
- Window_.erase(Window_.begin(),
- Window_.begin() + Window_.size() - MaxWndSize_);
- }
- }
-
-private:
- TContainer<TElem> Window_;
+private:
+ TContainer<TElem> Window_;
ui32 MaxWndSize_;
-};
+};
diff --git a/library/cpp/actors/util/thread.h b/library/cpp/actors/util/thread.h
index d742c8c585..d90ab745fe 100644
--- a/library/cpp/actors/util/thread.h
+++ b/library/cpp/actors/util/thread.h
@@ -10,17 +10,17 @@
inline void SetCurrentThreadName(const TString& name,
const ui32 maxCharsFromProcessName = 8) {
#if defined(_linux_)
- // linux limits threadname by 15 + \0
-
- TStringBuf procName(GetExecPath());
- procName = procName.RNextTok('/');
- procName = procName.SubStr(0, maxCharsFromProcessName);
-
+ // linux limits threadname by 15 + \0
+
+ TStringBuf procName(GetExecPath());
+ procName = procName.RNextTok('/');
+ procName = procName.SubStr(0, maxCharsFromProcessName);
+
TStringStream linuxName;
- linuxName << procName << "." << name;
+ linuxName << procName << "." << name;
TThread::SetCurrentThreadName(linuxName.Str().data());
#else
- Y_UNUSED(maxCharsFromProcessName);
+ Y_UNUSED(maxCharsFromProcessName);
TThread::SetCurrentThreadName(name.data());
#endif
}