aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.ru>2022-02-10 16:46:34 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:34 +0300
commitad94e93a059747f4fc3d7add88d1a83daf40b733 (patch)
tree731d57e580bd143e1136e7747f13b26e6bac95d0 /library
parent298c6da79f1d8f35089a67f463f0b541bec36d9b (diff)
downloadydb-ad94e93a059747f4fc3d7add88d1a83daf40b733.tar.gz
Restoring authorship annotation for <vvvv@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library')
-rw-r--r--library/cpp/actors/core/actor.h26
-rw-r--r--library/cpp/actors/core/actor_ut.cpp10
-rw-r--r--library/cpp/actors/core/actorid.h12
-rw-r--r--library/cpp/actors/core/event.h4
-rw-r--r--library/cpp/actors/core/executor_pool_base.cpp26
-rw-r--r--library/cpp/actors/core/executor_thread.h4
-rw-r--r--library/cpp/actors/core/log.cpp92
-rw-r--r--library/cpp/actors/core/log.h2
-rw-r--r--library/cpp/actors/core/log_settings.cpp26
-rw-r--r--library/cpp/actors/core/log_settings.h2
-rw-r--r--library/cpp/actors/core/mon.h66
-rw-r--r--library/cpp/actors/core/scheduler_cookie.h12
-rw-r--r--library/cpp/actors/protos/actors.proto14
-rw-r--r--library/cpp/actors/protos/interconnect.proto24
-rw-r--r--library/cpp/actors/protos/services_common.proto16
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp1954
-rw-r--r--library/cpp/actors/testlib/test_runtime.h568
-rw-r--r--library/cpp/actors/testlib/ya.make4
-rw-r--r--library/cpp/actors/util/local_process_key.h112
-rw-r--r--library/cpp/actors/util/threadparkpad.cpp2
-rw-r--r--library/cpp/actors/util/ya.make2
-rw-r--r--library/cpp/monlib/dynamic_counters/counters.h20
-rw-r--r--library/cpp/random_provider/random_provider.cpp134
-rw-r--r--library/cpp/random_provider/random_provider.h28
-rw-r--r--library/cpp/random_provider/ya.make18
-rw-r--r--library/cpp/regex/pire/regexp.h8
-rw-r--r--library/cpp/testing/unittest/utmain.cpp6
-rw-r--r--library/cpp/time_provider/time_provider.cpp60
-rw-r--r--library/cpp/time_provider/time_provider.h22
-rw-r--r--library/cpp/time_provider/ya.make18
30 files changed, 1646 insertions, 1646 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index ed29bd14b9..7c91642087 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -3,7 +3,7 @@
#include "event.h"
#include "monotonic.h"
#include <util/system/tls.h>
-#include <library/cpp/actors/util/local_process_key.h>
+#include <library/cpp/actors/util/local_process_key.h>
namespace NActors {
class TActorSystem;
@@ -383,16 +383,16 @@ namespace NActors {
}
};
- struct TActorActivityTag {};
-
- inline size_t GetActivityTypeCount() {
- return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetCount();
- }
-
- inline TStringBuf GetActivityTypeName(size_t index) {
- return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(index);
- }
-
+ struct TActorActivityTag {};
+
+ inline size_t GetActivityTypeCount() {
+ return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetCount();
+ }
+
+ inline TStringBuf GetActivityTypeName(size_t index) {
+ return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(index);
+ }
+
template <typename TDerived>
class TActor: public IActor {
private:
@@ -415,8 +415,8 @@ namespace NActors {
// static_cast<IActor::EActorActivity>(TDerived::ActorActivityType()));
//}
}
- }
-
+ }
+
protected:
//* Comment this function to find unmarked activities
static constexpr IActor::EActivityType ActorActivityType() {
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index e1b765ec72..ca0f4e5c00 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -513,7 +513,7 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
, Counter(counter)
{
}
-
+
bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext&) override {
*Counter += 1;
if (ev->Type == TEvents::THelloWorld::Ping) {
@@ -527,16 +527,16 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
struct TTestActor : TActorBootstrapped<TTestActor> {
static constexpr char ActorName[] = "TestActor";
-
+
void Bootstrap()
{
const auto& activityTypeIndex = GetActivityType();
- Y_ENSURE(activityTypeIndex < GetActivityTypeCount());
- Y_ENSURE(GetActivityTypeName(activityTypeIndex) == "TestActor");
+ Y_ENSURE(activityTypeIndex < GetActivityTypeCount());
+ Y_ENSURE(GetActivityTypeName(activityTypeIndex) == "TestActor");
PassAway();
}
};
-
+
Y_UNIT_TEST(Basic) {
THolder<TActorSystemSetup> setup = MakeHolder<TActorSystemSetup>();
setup->NodeId = 0;
diff --git a/library/cpp/actors/core/actorid.h b/library/cpp/actors/core/actorid.h
index d972b1a0ff..4c9b04c181 100644
--- a/library/cpp/actors/core/actorid.h
+++ b/library/cpp/actors/core/actorid.h
@@ -2,7 +2,7 @@
#include "defs.h"
#include <util/stream/output.h> // for IOutputStream
-#include <util/generic/hash.h>
+#include <util/generic/hash.h>
namespace NActors {
// used as global uniq address of actor
@@ -187,10 +187,10 @@ template <>
inline void Out<NActors::TActorId>(IOutputStream& o, const NActors::TActorId& x) {
return x.Out(o);
}
-
-template <>
+
+template <>
struct THash<NActors::TActorId> {
inline ui64 operator()(const NActors::TActorId& x) const {
- return x.Hash();
- }
-};
+ return x.Hash();
+ }
+};
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 6ff02aaf94..582582cfb0 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -268,14 +268,14 @@ namespace NActors {
return Event.Get();
}
-
+
TAutoPtr<IEventBase> ReleaseBase() {
TAutoPtr<IEventBase> x = GetBase();
Y_UNUSED(Event.Release());
Buffer.Reset();
return x;
}
-
+
TAutoPtr<IEventHandle> Forward(const TActorId& dest) {
if (Event)
return new IEventHandle(dest, Sender, Event.Release(), Flags, Cookie, nullptr, std::move(TraceId));
diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp
index c3b9999168..62c8376d5e 100644
--- a/library/cpp/actors/core/executor_pool_base.cpp
+++ b/library/cpp/actors/core/executor_pool_base.cpp
@@ -70,39 +70,39 @@ namespace NActors {
// first step - find good enough mailbox
ui32 hint = 0;
TMailboxHeader* mailbox = nullptr;
-
+
if (revolvingWriteCounter == 0)
revolvingWriteCounter = AtomicIncrement(RegisterRevolvingCounter);
{
ui32 hintBackoff = 0;
-
+
while (hint == 0) {
hint = MailboxTable->AllocateMailbox(mailboxType, ++revolvingWriteCounter);
mailbox = MailboxTable->Get(hint);
-
+
if (!mailbox->LockFromFree()) {
MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingWriteCounter);
hintBackoff = hint;
hint = 0;
}
- }
+ }
MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingWriteCounter);
- }
-
+ }
+
const ui64 localActorId = AllocateID();
-
+
// ok, got mailbox
mailbox->AttachActor(localActorId, actor);
-
+
// do init
const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint);
DoActorInit(ActorSystem, actor, actorId, parentId);
-
+
// Once we unlock the mailbox the actor starts running and we cannot use the pointer any more
actor = nullptr;
-
+
switch (mailboxType) {
case TMailboxType::Simple:
UnlockFromExecution((TMailboxTable::TSimpleMailbox*)mailbox, this, false, hint, MaxWorkers, ++revolvingWriteCounter);
@@ -127,7 +127,7 @@ namespace NActors {
if (elapsed > 1000000) {
LWPROBE(SlowRegisterNew, PoolId, NHPTimer::GetSeconds(elapsed) * 1000.0);
}
-
+
return actorId;
}
@@ -143,7 +143,7 @@ namespace NActors {
const ui64 localActorId = AllocateID();
mailbox->AttachActor(localActorId, actor);
-
+
const TActorId actorId(ActorSystem->NodeId, PoolId, localActorId, hint);
DoActorInit(ActorSystem, actor, actorId, parentId);
NHPTimer::STime elapsed = GetCycleCountFast() - hpstart;
@@ -157,7 +157,7 @@ namespace NActors {
TAffinity* TExecutorPoolBase::Affinity() const {
return ThreadsAffinity.Get();
}
-
+
bool TExecutorPoolBaseMailboxed::Cleanup() {
return MailboxTable->Cleanup();
}
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index 9d3c573f0d..e5e2cff39a 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -108,5 +108,5 @@ namespace NActors {
}
executorPool->ReclaimMailbox(TMailbox::MailboxType, hint, workerId, ++revolvingWriteCounter);
}
- }
-}
+ }
+}
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index 5f63b5af58..cf80598fe8 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -213,13 +213,13 @@ namespace NActors {
Metrics->IncDirectMsgs();
if (Settings && Settings->Satisfies(priority, component, 0ull)) {
va_list params;
- va_start(params, c);
+ va_start(params, c);
TString formatted;
vsprintf(formatted, c, params);
auto ok = OutputRecord(time, NLog::EPrio(priority), component, formatted);
Y_UNUSED(ok);
- va_end(params);
+ va_end(params);
}
}
@@ -334,11 +334,11 @@ namespace NActors {
}
TABLEBODY() {
for (EComponent i = Settings->MinVal; i < Settings->MaxVal; i++) {
- auto name = Settings->ComponentName(i);
- if (!*name)
- continue;
+ auto name = Settings->ComponentName(i);
+ if (!*name)
+ continue;
NLog::TComponentSettings componentSettings = Settings->GetComponentSettings(i);
-
+
TABLER() {
TABLED() {
str << "<a href='logger?c=" << i << "'>" << name << "</a>";
@@ -368,26 +368,26 @@ namespace NActors {
*/
void TLoggerActor::HandleMonInfo(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) {
const auto& params = ev->Get()->Request.GetParams();
- NLog::EComponent component = NLog::InvalidComponent;
- NLog::EPriority priority = NLog::PRI_DEBUG;
+ NLog::EComponent component = NLog::InvalidComponent;
+ NLog::EPriority priority = NLog::PRI_DEBUG;
NLog::EPriority samplingPriority = NLog::PRI_DEBUG;
ui32 samplingRate = 0;
- bool hasComponent = false;
- bool hasPriority = false;
+ bool hasComponent = false;
+ bool hasPriority = false;
bool hasSamplingPriority = false;
bool hasSamplingRate = false;
bool hasAllowDrop = false;
int allowDrop = 0;
- if (params.Has("c")) {
- if (TryFromString(params.Get("c"), component) && (component == NLog::InvalidComponent || Settings->IsValidComponent(component))) {
- hasComponent = true;
- if (params.Has("p")) {
- int rawPriority;
- if (TryFromString(params.Get("p"), rawPriority) && NLog::TSettings::IsValidPriority((NLog::EPriority)rawPriority)) {
- priority = (NLog::EPriority)rawPriority;
- hasPriority = true;
- }
- }
+ if (params.Has("c")) {
+ if (TryFromString(params.Get("c"), component) && (component == NLog::InvalidComponent || Settings->IsValidComponent(component))) {
+ hasComponent = true;
+ if (params.Has("p")) {
+ int rawPriority;
+ if (TryFromString(params.Get("p"), rawPriority) && NLog::TSettings::IsValidPriority((NLog::EPriority)rawPriority)) {
+ priority = (NLog::EPriority)rawPriority;
+ hasPriority = true;
+ }
+ }
if (params.Has("sp")) {
int rawPriority;
if (TryFromString(params.Get("sp"), rawPriority) && NLog::TSettings::IsValidPriority((NLog::EPriority)rawPriority)) {
@@ -400,14 +400,14 @@ namespace NActors {
hasSamplingRate = true;
}
}
- }
- }
+ }
+ }
if (params.Has("allowdrop")) {
if (TryFromString(params.Get("allowdrop"), allowDrop)) {
hasAllowDrop = true;
}
}
-
+
TStringStream str;
if (hasComponent && !hasPriority && !hasSamplingPriority && !hasSamplingRate) {
NLog::TComponentSettings componentSettings = Settings->GetComponentSettings(component);
@@ -434,7 +434,7 @@ namespace NActors {
}
}
}
-
+
DIV_CLASS("row") {
DIV_CLASS("col-md-12") {
H4() {
@@ -473,12 +473,12 @@ namespace NActors {
}
}
}
-
- } else {
+
+ } else {
TString explanation;
- if (hasComponent && hasPriority) {
+ if (hasComponent && hasPriority) {
Settings->SetLevel(priority, component, explanation);
- }
+ }
if (hasComponent && hasSamplingPriority) {
Settings->SetSamplingLevel(samplingPriority, component, explanation);
}
@@ -488,7 +488,7 @@ namespace NActors {
if (hasAllowDrop) {
Settings->SetAllowDrop(allowDrop);
}
-
+
HTML(str) {
if (!explanation.empty()) {
DIV_CLASS("row") {
@@ -496,8 +496,8 @@ namespace NActors {
str << explanation;
}
}
- }
-
+ }
+
DIV_CLASS("row") {
DIV_CLASS("col-md-6") {
RenderComponentPriorities(str);
@@ -522,7 +522,7 @@ namespace NActors {
<< NLog::PriorityToString(NLog::EPrio(p)) << "</a>";
}
}
- }
+ }
}
}
H4() {
@@ -567,7 +567,7 @@ namespace NActors {
}
Metrics->GetOutputHtml(str);
}
- }
+ }
ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str()));
}
@@ -669,10 +669,10 @@ namespace NActors {
}
class TStderrBackend: public TLogBackend {
- public:
+ public:
TStderrBackend() {
}
- void WriteData(const TLogRecord& rec) override {
+ void WriteData(const TLogRecord& rec) override {
#ifdef _MSC_VER
if (IsDebuggerPresent()) {
TString x;
@@ -693,15 +693,15 @@ namespace NActors {
Y_UNUSED(err);
}
} while (!isOk);
- }
-
+ }
+
void ReopenLog() override {
- }
-
- private:
+ }
+
+ private:
const TString Indent;
- };
-
+ };
+
class TLineFileLogBackend: public TFileLogBackend {
public:
TLineFileLogBackend(const TString& path)
@@ -735,10 +735,10 @@ namespace NActors {
TVector<TAutoPtr<TLogBackend>> UnderlyingBackends;
};
- TAutoPtr<TLogBackend> CreateStderrBackend() {
- return new TStderrBackend();
- }
-
+ TAutoPtr<TLogBackend> CreateStderrBackend() {
+ return new TStderrBackend();
+ }
+
TAutoPtr<TLogBackend> CreateFileBackend(const TString& fileName) {
return new TLineFileLogBackend(fileName);
}
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index c11a7cf3c1..da518a2517 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -296,7 +296,7 @@ namespace NActors {
////////////////////////////////////////////////////////////////////////////////
TAutoPtr<TLogBackend> CreateSysLogBackend(const TString& ident,
bool logPError, bool logCons);
- TAutoPtr<TLogBackend> CreateStderrBackend();
+ TAutoPtr<TLogBackend> CreateStderrBackend();
TAutoPtr<TLogBackend> CreateFileBackend(const TString& fileName);
TAutoPtr<TLogBackend> CreateNullBackend();
TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends);
diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp
index f52f2fc5d2..58344b6a72 100644
--- a/library/cpp/actors/core/log_settings.cpp
+++ b/library/cpp/actors/core/log_settings.cpp
@@ -112,12 +112,12 @@ namespace NActors {
AtomicSet(ComponentInfo[i], settings.Raw.Data);
}
- TStringStream str;
+ TStringStream str;
str << titleName
<< " for all components has been changed to "
<< PriorityToString(EPrio(priority));
- explanation = str.Str();
+ explanation = str.Str();
return 0;
} else {
if (!IsValidComponent(component)) {
@@ -213,18 +213,18 @@ namespace NActors {
UseLocalTimestamps = value;
}
- EComponent TSettings::FindComponent(const TStringBuf& componentName) const {
+ EComponent TSettings::FindComponent(const TStringBuf& componentName) const {
if (componentName.empty())
- return InvalidComponent;
-
- for (EComponent component = MinVal; component <= MaxVal; ++component) {
- if (ComponentNames[component] == componentName)
- return component;
- }
-
- return InvalidComponent;
- }
-
+ return InvalidComponent;
+
+ for (EComponent component = MinVal; component <= MaxVal; ++component) {
+ if (ComponentNames[component] == componentName)
+ return component;
+ }
+
+ return InvalidComponent;
+ }
+
}
}
diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h
index 7fe4504edd..94ed257cd2 100644
--- a/library/cpp/actors/core/log_settings.h
+++ b/library/cpp/actors/core/log_settings.h
@@ -157,7 +157,7 @@ namespace NActors {
int SetLevel(EPriority priority, EComponent component, TString& explanation);
int SetSamplingLevel(EPriority priority, EComponent component, TString& explanation);
int SetSamplingRate(ui32 sampling, EComponent component, TString& explanation);
- EComponent FindComponent(const TStringBuf& componentName) const;
+ EComponent FindComponent(const TStringBuf& componentName) const;
static int PowerOf2Mask(int val);
static bool IsValidPriority(EPriority priority);
bool IsValidComponent(EComponent component);
diff --git a/library/cpp/actors/core/mon.h b/library/cpp/actors/core/mon.h
index c450f2338e..4987b46edf 100644
--- a/library/cpp/actors/core/mon.h
+++ b/library/cpp/actors/core/mon.h
@@ -10,8 +10,8 @@ namespace NActors {
enum {
HttpInfo = EventSpaceBegin(NActors::TEvents::ES_MON),
HttpInfoRes,
- RemoteHttpInfo,
- RemoteHttpInfoRes,
+ RemoteHttpInfo,
+ RemoteHttpInfoRes,
RemoteJsonInfoRes,
RemoteBinaryInfoRes,
End
@@ -82,12 +82,12 @@ namespace NActors {
struct TEvRemoteHttpInfo: public NActors::TEventBase<TEvRemoteHttpInfo, RemoteHttpInfo> {
TEvRemoteHttpInfo() {
}
-
+
TEvRemoteHttpInfo(const TString& query)
- : Query(query)
+ : Query(query)
{
}
-
+
TEvRemoteHttpInfo(const TString& query, HTTP_METHOD method)
: Query(query)
, Method(method)
@@ -96,25 +96,25 @@ namespace NActors {
TString Query;
HTTP_METHOD Method;
-
+
TString PathInfo() const {
- const size_t pos = Query.find('?');
+ const size_t pos = Query.find('?');
return (pos == TString::npos) ? TString() : Query.substr(0, pos);
- }
-
- TCgiParameters Cgi() const {
- const size_t pos = Query.find('?');
+ }
+
+ TCgiParameters Cgi() const {
+ const size_t pos = Query.find('?');
return TCgiParameters((pos == TString::npos) ? TString() : Query.substr(pos + 1));
- }
-
+ }
+
TString ToStringHeader() const override {
- return "TEvRemoteHttpInfo";
- }
-
+ return "TEvRemoteHttpInfo";
+ }
+
bool SerializeToArcadiaStream(TChunkSerializer *serializer) const override {
return serializer->WriteString(&Query);
- }
-
+ }
+
ui32 CalculateSerializedSize() const override {
return Query.size();
}
@@ -125,33 +125,33 @@ namespace NActors {
static IEventBase* Load(TEventSerializedData* bufs) {
return new TEvRemoteHttpInfo(bufs->GetString());
- }
+ }
HTTP_METHOD GetMethod() const
{
return Method;
}
- };
-
+ };
+
struct TEvRemoteHttpInfoRes: public NActors::TEventBase<TEvRemoteHttpInfoRes, RemoteHttpInfoRes> {
TEvRemoteHttpInfoRes() {
}
-
+
TEvRemoteHttpInfoRes(const TString& html)
- : Html(html)
+ : Html(html)
{
}
-
+
TString Html;
-
+
TString ToStringHeader() const override {
- return "TEvRemoteHttpInfoRes";
- }
-
+ return "TEvRemoteHttpInfoRes";
+ }
+
bool SerializeToArcadiaStream(TChunkSerializer *serializer) const override {
return serializer->WriteString(&Html);
- }
-
+ }
+
ui32 CalculateSerializedSize() const override {
return Html.size();
}
@@ -162,9 +162,9 @@ namespace NActors {
static IEventBase* Load(TEventSerializedData* bufs) {
return new TEvRemoteHttpInfoRes(bufs->GetString());
- }
- };
-
+ }
+ };
+
struct TEvRemoteJsonInfoRes: public NActors::TEventBase<TEvRemoteJsonInfoRes, RemoteJsonInfoRes> {
TEvRemoteJsonInfoRes() {
}
diff --git a/library/cpp/actors/core/scheduler_cookie.h b/library/cpp/actors/core/scheduler_cookie.h
index 2c20ca67f3..dc6933c7a2 100644
--- a/library/cpp/actors/core/scheduler_cookie.h
+++ b/library/cpp/actors/core/scheduler_cookie.h
@@ -44,12 +44,12 @@ namespace NActors {
return Cookie;
}
- ISchedulerCookie* Release() {
- ISchedulerCookie* result = Cookie;
- Cookie = nullptr;
- return result;
- }
-
+ ISchedulerCookie* Release() {
+ ISchedulerCookie* result = Cookie;
+ Cookie = nullptr;
+ return result;
+ }
+
void Reset(ISchedulerCookie* cookie) {
Detach();
Cookie = cookie;
diff --git a/library/cpp/actors/protos/actors.proto b/library/cpp/actors/protos/actors.proto
index 5fbd6d44ee..5b18e8a53f 100644
--- a/library/cpp/actors/protos/actors.proto
+++ b/library/cpp/actors/protos/actors.proto
@@ -1,11 +1,11 @@
-package NActorsProto;
-option java_package = "ru.yandex.kikimr.proto";
-option java_outer_classname = "NActorsBaseProto";
-
+package NActorsProto;
+option java_package = "ru.yandex.kikimr.proto";
+option java_outer_classname = "NActorsBaseProto";
+
message TActorId {
- required fixed64 RawX1 = 1;
- required fixed64 RawX2 = 2;
-}
+ required fixed64 RawX1 = 1;
+ required fixed64 RawX2 = 2;
+}
message TCallbackException {
required TActorId ActorId = 1;
diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto
index 2e3b0d0d15..6513a59259 100644
--- a/library/cpp/actors/protos/interconnect.proto
+++ b/library/cpp/actors/protos/interconnect.proto
@@ -1,19 +1,19 @@
import "library/cpp/actors/protos/actors.proto";
import "google/protobuf/descriptor.proto";
-package NActorsInterconnect;
-option java_package = "ru.yandex.kikimr.proto";
-
-message TEvResolveNode {
- optional uint32 NodeId = 1;
+package NActorsInterconnect;
+option java_package = "ru.yandex.kikimr.proto";
+
+message TEvResolveNode {
+ optional uint32 NodeId = 1;
optional uint64 Deadline = 2;
-}
-
-message TEvNodeInfo {
- optional uint32 NodeId = 1;
- optional string Address = 2;
- optional uint32 Port = 3;
-}
+}
+
+message TEvNodeInfo {
+ optional uint32 NodeId = 1;
+ optional string Address = 2;
+ optional uint32 Port = 3;
+}
extend google.protobuf.FieldOptions {
optional string PrintName = 50376;
diff --git a/library/cpp/actors/protos/services_common.proto b/library/cpp/actors/protos/services_common.proto
index afa0ec0073..6b822a5e22 100644
--- a/library/cpp/actors/protos/services_common.proto
+++ b/library/cpp/actors/protos/services_common.proto
@@ -1,12 +1,12 @@
-package NActorsServices;
-option java_package = "ru.yandex.kikimr.proto";
-
-// 0-255 range
-enum EServiceCommon {
+package NActorsServices;
+option java_package = "ru.yandex.kikimr.proto";
+
+// 0-255 range
+enum EServiceCommon {
// WARN: This must be the smallest value in the enumeration
- GLOBAL = 0;
- INTERCONNECT = 1;
+ GLOBAL = 0;
+ INTERCONNECT = 1;
TEST = 2;
PROTOCOLS = 3;
INTERCONNECT_SPEED_TEST = 4;
@@ -18,4 +18,4 @@ enum EServiceCommon {
// This value is reserved boundary. Is must not be aliased with any values
// TODO: use reseved values upon protobuf update
// COMMON_END = 256;
-};
+};
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 6fa25b9965..e39b7bc59b 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -8,20 +8,20 @@
#include <library/cpp/actors/core/scheduler_basic.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/protos/services_common.pb.h>
-#include <library/cpp/random_provider/random_provider.h>
+#include <library/cpp/random_provider/random_provider.h>
#include <library/cpp/actors/interconnect/interconnect.h>
#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
#include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>
-#include <util/generic/maybe.h>
+#include <util/generic/maybe.h>
#include <util/generic/bt_exception.h>
-#include <util/random/mersenne.h>
-#include <util/string/printf.h>
-#include <typeinfo>
-
+#include <util/random/mersenne.h>
+#include <util/string/printf.h>
+#include <typeinfo>
+
bool VERBOSE = false;
-const bool PRINT_EVENT_BODY = false;
-
+const bool PRINT_EVENT_BODY = false;
+
namespace {
TString MakeClusterId() {
@@ -32,9 +32,9 @@ namespace {
}
}
-namespace NActors {
- ui64 TScheduledEventQueueItem::NextUniqueId = 0;
-
+namespace NActors {
+ ui64 TScheduledEventQueueItem::NextUniqueId = 0;
+
void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) {
Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite())
<< ", from " << ev->Sender.LocalId();
@@ -46,21 +46,21 @@ namespace NActors {
if (!name.empty())
Cerr << " \"" << name << "\"";
Cerr << ", ";
- if (ev->HasEvent())
+ if (ev->HasEvent())
Cerr << " : " << (PRINT_EVENT_BODY ? ev->GetBase()->ToString() : ev->GetBase()->ToStringHeader());
- else if (ev->HasBuffer())
+ else if (ev->HasBuffer())
Cerr << " : BUFFER";
- else
+ else
Cerr << " : EMPTY";
-
+
Cerr << "\n";
- }
-
+ }
+
TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() {
- ActorSystemTimestamp = nullptr;
+ ActorSystemTimestamp = nullptr;
ActorSystemMonotonic = nullptr;
- }
-
+ }
+
void TTestActorRuntimeBase::TNodeDataBase::Stop() {
if (Poller)
Poller->Stop();
@@ -70,146 +70,146 @@ namespace NActors {
Y_VERIFY(round < 10, "cyclic event/actor spawn while trying to shutdown actorsystem stub");
}
- if (ActorSystem)
- ActorSystem->Stop();
+ if (ActorSystem)
+ ActorSystem->Stop();
- ActorSystem.Destroy();
+ ActorSystem.Destroy();
Poller.Reset();
- }
-
+ }
+
TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() {
Stop();
}
class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> {
- public:
+ public:
static constexpr EActivityType ActorActivityType() {
return TEST_ACTOR_RUNTIME;
}
TEdgeActor(TTestActorRuntimeBase* runtime)
- : TActor(&TEdgeActor::StateFunc)
- , Runtime(runtime)
- {
- }
-
- STFUNC(StateFunc) {
+ : TActor(&TEdgeActor::StateFunc)
+ , Runtime(runtime)
+ {
+ }
+
+ STFUNC(StateFunc) {
Y_UNUSED(ctx);
- TGuard<TMutex> guard(Runtime->Mutex);
- bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
- if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
- verbose = false;
- }
-
- if (verbose) {
+ TGuard<TMutex> guard(Runtime->Mutex);
+ bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
+ if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
+ verbose = false;
+ }
+
+ if (verbose) {
Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
PrintEvent(ev, Runtime);
- }
-
- if (!Runtime->EventFilterFunc(*Runtime, ev)) {
- ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ }
+
+ if (!Runtime->EventFilterFunc(*Runtime, ev)) {
+ ui32 nodeId = ev->GetRecipientRewrite().NodeId();
Y_VERIFY(nodeId != 0);
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
- Runtime->MailboxesHasEvents.Signal();
- if (verbose)
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
+ Runtime->MailboxesHasEvents.Signal();
+ if (verbose)
Cerr << "Event was added to sent queue\n";
- }
- else {
- if (verbose)
+ }
+ else {
+ if (verbose)
Cerr << "Event was dropped\n";
- }
- }
-
- private:
+ }
+ }
+
+ private:
TTestActorRuntimeBase* Runtime;
- };
-
- void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) {
- IEventHandle* ptr = ev.Get();
+ };
+
+ void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) {
+ IEventHandle* ptr = ev.Get();
Y_VERIFY(ptr);
-#ifdef DEBUG_ORDER_EVENTS
- ui64 counter = NextToSend++;
- TrackSent[ptr] = counter;
-#endif
- Sent.push_back(ev);
- }
-
- TAutoPtr<IEventHandle> TEventMailBox::Pop() {
- TAutoPtr<IEventHandle> result = Sent.front();
- Sent.pop_front();
-#ifdef DEBUG_ORDER_EVENTS
- auto it = TrackSent.find(result.Get());
- if (it != TrackSent.end()) {
+#ifdef DEBUG_ORDER_EVENTS
+ ui64 counter = NextToSend++;
+ TrackSent[ptr] = counter;
+#endif
+ Sent.push_back(ev);
+ }
+
+ TAutoPtr<IEventHandle> TEventMailBox::Pop() {
+ TAutoPtr<IEventHandle> result = Sent.front();
+ Sent.pop_front();
+#ifdef DEBUG_ORDER_EVENTS
+ auto it = TrackSent.find(result.Get());
+ if (it != TrackSent.end()) {
Y_VERIFY(ExpectedReceive == it->second);
- TrackSent.erase(result.Get());
- ++ExpectedReceive;
- }
-#endif
- return result;
- }
-
- bool TEventMailBox::IsEmpty() const {
- return Sent.empty();
- }
-
- void TEventMailBox::Capture(TEventsList& evList) {
- evList.insert(evList.end(), Sent.begin(), Sent.end());
- Sent.clear();
- }
-
- void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) {
- Sent.push_front(ev);
- }
-
- void TEventMailBox::PushFront(TEventsList& evList) {
- for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) {
- if (*rit) {
- Sent.push_front(*rit);
- }
- }
- }
-
- void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) {
- for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) {
- evList.insert(*it);
- }
-
- Scheduled.clear();
- }
-
- void TEventMailBox::PushScheduled(TScheduledEventsList& evList) {
- for (auto it = evList.begin(); it != evList.end(); ++it) {
- if (it->Event) {
- Scheduled.insert(*it);
- }
- }
-
- evList.clear();
- }
-
- bool TEventMailBox::IsActive(const TInstant& currentTime) const {
- return currentTime >= InactiveUntil;
- }
-
- void TEventMailBox::Freeze(const TInstant& deadline) {
- if (deadline > InactiveUntil)
- InactiveUntil = deadline;
- }
-
- TInstant TEventMailBox::GetInactiveUntil() const {
- return InactiveUntil;
- }
-
- void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) {
- Scheduled.insert(item);
- }
-
- bool TEventMailBox::IsScheduledEmpty() const {
- return Scheduled.empty();
- }
-
+ TrackSent.erase(result.Get());
+ ++ExpectedReceive;
+ }
+#endif
+ return result;
+ }
+
+ bool TEventMailBox::IsEmpty() const {
+ return Sent.empty();
+ }
+
+ void TEventMailBox::Capture(TEventsList& evList) {
+ evList.insert(evList.end(), Sent.begin(), Sent.end());
+ Sent.clear();
+ }
+
+ void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) {
+ Sent.push_front(ev);
+ }
+
+ void TEventMailBox::PushFront(TEventsList& evList) {
+ for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) {
+ if (*rit) {
+ Sent.push_front(*rit);
+ }
+ }
+ }
+
+ void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) {
+ for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) {
+ evList.insert(*it);
+ }
+
+ Scheduled.clear();
+ }
+
+ void TEventMailBox::PushScheduled(TScheduledEventsList& evList) {
+ for (auto it = evList.begin(); it != evList.end(); ++it) {
+ if (it->Event) {
+ Scheduled.insert(*it);
+ }
+ }
+
+ evList.clear();
+ }
+
+ bool TEventMailBox::IsActive(const TInstant& currentTime) const {
+ return currentTime >= InactiveUntil;
+ }
+
+ void TEventMailBox::Freeze(const TInstant& deadline) {
+ if (deadline > InactiveUntil)
+ InactiveUntil = deadline;
+ }
+
+ TInstant TEventMailBox::GetInactiveUntil() const {
+ return InactiveUntil;
+ }
+
+ void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) {
+ Scheduled.insert(item);
+ }
+
+ bool TEventMailBox::IsScheduledEmpty() const {
+ return Scheduled.empty();
+ }
+
TInstant TEventMailBox::GetFirstScheduleDeadline() const {
return Scheduled.begin()->Deadline;
}
@@ -219,80 +219,80 @@ namespace NActors {
}
class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider {
- public:
+ public:
TTimeProvider(TTestActorRuntimeBase& runtime)
- : Runtime(runtime)
- {
- }
-
- TInstant Now() override {
- return Runtime.GetCurrentTime();
- }
-
- private:
+ : Runtime(runtime)
+ {
+ }
+
+ TInstant Now() override {
+ return Runtime.GetCurrentTime();
+ }
+
+ private:
TTestActorRuntimeBase& Runtime;
- };
-
+ };
+
class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread {
- public:
+ public:
TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node)
- : Runtime(runtime)
- , Node(node)
+ : Runtime(runtime)
+ , Node(node)
{
Y_UNUSED(Runtime);
}
-
+
void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override {
Y_UNUSED(actorSystem);
- Node->ActorSystemTimestamp = currentTimestamp;
+ Node->ActorSystemTimestamp = currentTimestamp;
Node->ActorSystemMonotonic = currentMonotonic;
- }
-
+ }
+
void PrepareSchedules(NSchedulerQueue::TReader **readers, ui32 scheduleReadersCount) override {
Y_UNUSED(readers);
Y_UNUSED(scheduleReadersCount);
- }
-
+ }
+
void Start() override {
- }
-
+ }
+
void PrepareStop() override {
- }
-
+ }
+
void Stop() override {
- }
-
- private:
+ }
+
+ private:
TTestActorRuntimeBase* Runtime;
TTestActorRuntimeBase::TNodeDataBase* Node;
- };
-
+ };
+
class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool {
- public:
+ public:
TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId)
- : IExecutorPool(poolId)
- , Runtime(runtime)
- , NodeIndex(nodeIndex)
- , Node(node)
- {
- }
-
+ : IExecutorPool(poolId)
+ , Runtime(runtime)
+ , NodeIndex(nodeIndex)
+ , Node(node)
+ {
+ }
+
TTestActorRuntimeBase* GetRuntime() {
- return Runtime;
- }
-
- // for threads
+ return Runtime;
+ }
+
+ // for threads
ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override {
Y_UNUSED(wctx);
Y_UNUSED(revolvingCounter);
Y_FAIL();
- }
-
+ }
+
void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) override {
Y_UNUSED(workerId);
- Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter);
- }
-
+ Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter);
+ }
+
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
DoSchedule(deadline, ev, cookie, workerId);
}
@@ -309,16 +309,16 @@ namespace NActors {
void DoSchedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) {
Y_UNUSED(workerId);
- TGuard<TMutex> guard(Runtime->Mutex);
- bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
- if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
- verbose = false;
- }
-
- if (verbose) {
+ TGuard<TMutex> guard(Runtime->Mutex);
+ bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
+ if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
+ verbose = false;
+ }
+
+ if (verbose) {
Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
PrintEvent(ev, Runtime);
- }
+ }
auto now = Runtime->GetTimeProvider()->Now();
if (deadline < now) {
@@ -327,36 +327,36 @@ namespace NActors {
TDuration delay = (deadline - now);
if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) {
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie));
- Runtime->MailboxesHasEvents.Signal();
- if (verbose)
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie));
+ Runtime->MailboxesHasEvents.Signal();
+ if (verbose)
Cerr << "Event was added to scheduled queue\n";
- } else {
+ } else {
if (cookie) {
cookie->Detach();
}
if (verbose) {
Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n";
}
- }
- }
-
- // for actorsystem
+ }
+ }
+
+ // for actorsystem
bool Send(TAutoPtr<IEventHandle>& ev) override {
- TGuard<TMutex> guard(Runtime->Mutex);
- bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
- if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
- verbose = false;
- }
-
- if (verbose) {
+ TGuard<TMutex> guard(Runtime->Mutex);
+ bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
+ if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
+ verbose = false;
+ }
+
+ if (verbose) {
Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
PrintEvent(ev, Runtime);
- }
-
- if (!Runtime->EventFilterFunc(*Runtime, ev)) {
- ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ }
+
+ if (!Runtime->EventFilterFunc(*Runtime, ev)) {
+ ui32 nodeId = ev->GetRecipientRewrite().NodeId();
Y_VERIFY(nodeId != 0);
TNodeDataBase* node = Runtime->Nodes[nodeId].Get();
@@ -364,7 +364,7 @@ namespace NActors {
return true;
}
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId);
@@ -384,68 +384,68 @@ namespace NActors {
Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
Runtime->MailboxesHasEvents.Signal();
}
- if (verbose)
+ if (verbose)
Cerr << "Event was added to sent queue\n";
- } else {
- if (verbose)
+ } else {
+ if (verbose)
Cerr << "Event was dropped\n";
- }
- return true;
- }
-
+ }
+ return true;
+ }
+
void ScheduleActivation(ui32 activation) override {
Y_UNUSED(activation);
- }
-
+ }
+
void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) override {
Y_UNUSED(activation);
Y_UNUSED(revolvingCounter);
- }
-
+ }
+
TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter,
const TActorId& parentId) override {
- return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId);
- }
-
+ return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId);
+ }
+
TActorId Register(IActor *actor, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) override {
- return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId);
- }
-
- // lifecycle stuff
+ return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId);
+ }
+
+ // lifecycle stuff
void Prepare(TActorSystem *actorSystem, NSchedulerQueue::TReader **scheduleReaders, ui32 *scheduleSz) override {
Y_UNUSED(actorSystem);
Y_UNUSED(scheduleReaders);
Y_UNUSED(scheduleSz);
- }
-
+ }
+
void Start() override {
- }
-
+ }
+
void PrepareStop() override {
- }
-
+ }
+
void Shutdown() override {
- }
-
+ }
+
bool Cleanup() override {
return true;
}
- // generic
+ // generic
TAffinity* Affinity() const override {
Y_FAIL();
- }
-
- private:
+ }
+
+ private:
TTestActorRuntimeBase* const Runtime;
- const ui32 NodeIndex;
+ const ui32 NodeIndex;
TTestActorRuntimeBase::TNodeDataBase* const Node;
- };
-
+ };
+
IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) {
return new TExecutorPoolStub{runtime, nodeIndex, node, poolId};
}
-
+
ui32 TTestActorRuntimeBase::NextNodeId = 1;
@@ -460,31 +460,31 @@ namespace NActors {
, ScheduledLimit(100000)
, MainThreadId(TThread::CurrentThreadId())
, ClusterUUID(MakeClusterId())
- , FirstNodeId(NextNodeId)
- , NodeCount(nodeCount)
+ , FirstNodeId(NextNodeId)
+ , NodeCount(nodeCount)
, DataCenterCount(dataCenterCount)
- , UseRealThreads(useRealThreads)
- , LocalId(0)
- , DispatchCyclesCount(0)
- , DispatchedEventsCount(0)
+ , UseRealThreads(useRealThreads)
+ , LocalId(0)
+ , DispatchCyclesCount(0)
+ , DispatchedEventsCount(0)
, NeedMonitoring(false)
, RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed))
, TimeProvider(new TTimeProvider(*this))
, ShouldContinue()
- , CurrentTimestamp(0)
+ , CurrentTimestamp(0)
, DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT)
, ReschedulingDelay(TDuration::MicroSeconds(0))
, ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc)
- , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector)
+ , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector)
, EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc)
, ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc)
, RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver)
, CurrentDispatchContext(nullptr)
- {
+ {
SetDispatcherRandomSeed(TInstant::Now(), 0);
EnableActorCallstack();
}
-
+
void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) {
const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger");
node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */,
@@ -492,10 +492,10 @@ namespace NActors {
node->LogSettings->SetAllowDrop(false);
node->LogSettings->SetThrottleDelay(TDuration::Zero());
node->DynamicCounters = new NMonitoring::TDynamicCounters;
-
+
InitNodeImpl(node, nodeIndex);
- }
-
+ }
+
void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) {
node->LogSettings->Append(
NActorsServices::EServiceCommon_MIN,
@@ -537,55 +537,55 @@ namespace NActors {
TTestActorRuntimeBase::~TTestActorRuntimeBase() {
CleanupNodes();
+ Cerr.Flush();
Cerr.Flush();
- Cerr.Flush();
- Clog.Flush();
-
+ Clog.Flush();
+
DisableActorCallstack();
- }
-
+ }
+
void TTestActorRuntimeBase::CleanupNodes() {
Nodes.clear();
}
bool TTestActorRuntimeBase::IsRealThreads() const {
- return UseRealThreads;
- }
-
+ return UseRealThreads;
+ }
+
TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
Y_UNUSED(runtime);
Y_UNUSED(event);
- return EEventAction::PROCESS;
- }
-
+ return EEventAction::PROCESS;
+ }
+
void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
Y_UNUSED(runtime);
Y_UNUSED(queue);
- scheduledEvents.clear();
- }
-
+ scheduledEvents.clear();
+ }
+
bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
Y_UNUSED(runtime);
Y_UNUSED(event);
- return false;
- }
-
+ return false;
+ }
+
bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) {
Y_UNUSED(runtime);
Y_UNUSED(delay);
Y_UNUSED(event);
Y_UNUSED(deadline);
- return true;
- }
-
+ return true;
+ }
+
void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) {
runtime.ScheduleWhiteList.insert(actorId);
runtime.ScheduleWhiteListParent[actorId] = parentId;
}
- }
-
+ }
+
class TScheduledTreeItem {
public:
TString Name;
@@ -635,11 +635,11 @@ namespace NActors {
};
void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
- if (scheduledEvents.empty())
- return;
-
- TInstant time = scheduledEvents.begin()->Deadline;
- while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) {
+ if (scheduledEvents.empty())
+ return;
+
+ TInstant time = scheduledEvents.begin()->Deadline;
+ while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) {
static THashMap<std::pair<TActorId, TString>, ui64> eventTypes;
auto& item = *scheduledEvents.begin();
TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type);
@@ -669,58 +669,58 @@ namespace NActors {
ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit);
}
if (item.Cookie->Get()) {
- if (item.Cookie->Detach()) {
- queue.push_back(item.Event);
- }
- } else {
- queue.push_back(item.Event);
- }
-
- scheduledEvents.erase(scheduledEvents.begin());
- }
-
- runtime.UpdateCurrentTime(time);
- }
-
+ if (item.Cookie->Detach()) {
+ queue.push_back(item.Event);
+ }
+ } else {
+ queue.push_back(item.Event);
+ }
+
+ scheduledEvents.erase(scheduledEvents.begin());
+ }
+
+ runtime.UpdateCurrentTime(time);
+ }
+
TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
auto result = ObserverFunc;
ObserverFunc = observerFunc;
return result;
- }
-
+ }
+
TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
auto result = ScheduledEventsSelectorFunc;
ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc;
return result;
- }
-
+ }
+
TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
auto result = EventFilterFunc;
EventFilterFunc = filterFunc;
return result;
- }
-
+ }
+
TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
auto result = ScheduledEventFilterFunc;
ScheduledEventFilterFunc = filterFunc;
return result;
- }
-
+ }
+
TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
auto result = RegistrationObserver;
RegistrationObserver = observerFunc;
return result;
- }
-
+ }
+
bool TTestActorRuntimeBase::IsVerbose() {
- return VERBOSE;
- }
-
+ return VERBOSE;
+ }
+
void TTestActorRuntimeBase::SetVerbose(bool verbose) {
VERBOSE = verbose;
}
@@ -728,16 +728,16 @@ namespace NActors {
void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) {
Y_VERIFY(!IsInitialized);
Y_VERIFY(nodeIndex < NodeCount);
- auto node = Nodes[nodeIndex + FirstNodeId];
- if (!node) {
+ auto node = Nodes[nodeIndex + FirstNodeId];
+ if (!node) {
node = GetNodeFactory().CreateNode();
- Nodes[nodeIndex + FirstNodeId] = node;
- }
-
- node->LocalServicesActors[actorId] = cmd.Actor;
+ Nodes[nodeIndex + FirstNodeId] = node;
+ }
+
+ node->LocalServicesActors[actorId] = cmd.Actor;
node->LocalServices.push_back(std::make_pair(actorId, cmd));
- }
-
+ }
+
void TTestActorRuntimeBase::InitNodes() {
NextNodeId += NodeCount;
Y_VERIFY(NodeCount > 0);
@@ -746,33 +746,33 @@ namespace NActors {
auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first;
TNodeDataBase* node = nodeIt->second.Get();
InitNode(node, nodeIndex);
- }
-
+ }
+
}
-
+
void TTestActorRuntimeBase::Initialize() {
InitNodes();
IsInitialized = true;
- }
-
+ }
+
void SetupCrossDC() {
}
TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) {
- TGuard<TMutex> guard(Mutex);
- TDuration oldTimeout = DispatchTimeout;
- DispatchTimeout = timeout;
- return oldTimeout;
- }
-
+ TGuard<TMutex> guard(Mutex);
+ TDuration oldTimeout = DispatchTimeout;
+ DispatchTimeout = timeout;
+ return oldTimeout;
+ }
+
TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) {
- TGuard<TMutex> guard(Mutex);
- TDuration oldDelay = ReschedulingDelay;
- ReschedulingDelay = delay;
- return oldDelay;
- }
-
+ TGuard<TMutex> guard(Mutex);
+ TDuration oldDelay = ReschedulingDelay;
+ ReschedulingDelay = delay;
+ return oldDelay;
+ }
+
void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) {
Y_VERIFY(!IsInitialized);
TGuard<TMutex> guard(Mutex);
@@ -780,68 +780,68 @@ namespace NActors {
}
void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) {
- TGuard<TMutex> guard(Mutex);
- for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
+ TGuard<TMutex> guard(Mutex);
+ for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
TString explanation;
auto status = node->LogSettings->SetLevel(priority, component, explanation);
if (status) {
Y_FAIL("SetLogPriority failed: %s", explanation.c_str());
}
- }
- }
-
+ }
+ }
+
TInstant TTestActorRuntimeBase::GetCurrentTime() const {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(!UseRealThreads);
- return TInstant::MicroSeconds(CurrentTimestamp);
- }
-
+ return TInstant::MicroSeconds(CurrentTimestamp);
+ }
+
void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) {
static int counter = 0;
++counter;
if (VERBOSE) {
Cerr << "UpdateCurrentTime(" << counter << "," << newTime << ")\n";
}
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(!UseRealThreads);
- if (newTime.MicroSeconds() > CurrentTimestamp) {
- CurrentTimestamp = newTime.MicroSeconds();
- for (auto& kv : Nodes) {
+ if (newTime.MicroSeconds() > CurrentTimestamp) {
+ CurrentTimestamp = newTime.MicroSeconds();
+ for (auto& kv : Nodes) {
AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp);
AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp);
- }
- }
- }
-
+ }
+ }
+ }
+
void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) {
UpdateCurrentTime(GetCurrentTime() + duration);
}
TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() {
Y_VERIFY(!UseRealThreads);
- return TimeProvider;
- }
-
+ return TimeProvider;
+ }
+
ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const {
Y_VERIFY(index < NodeCount);
- return FirstNodeId + index;
- }
-
+ return FirstNodeId + index;
+ }
+
ui32 TTestActorRuntimeBase::GetNodeCount() const {
- return NodeCount;
- }
-
+ return NodeCount;
+ }
+
ui64 TTestActorRuntimeBase::AllocateLocalId() {
- TGuard<TMutex> guard(Mutex);
- ui64 nextId = ++LocalId;
- if (VERBOSE) {
+ TGuard<TMutex> guard(Mutex);
+ ui64 nextId = ++LocalId;
+ if (VERBOSE) {
Cerr << "Allocated id: " << nextId << "\n";
- }
-
- return nextId;
- }
-
+ }
+
+ return nextId;
+ }
+
ui32 TTestActorRuntimeBase::InterconnectPoolId() const {
if (UseRealThreads && NSan::TSanIsOn()) {
// Interconnect coroutines may move across threads
@@ -852,63 +852,63 @@ namespace NActors {
}
TString TTestActorRuntimeBase::GetTempDir() {
- if (!TmpDir)
- TmpDir.Reset(new TTempDir());
- return (*TmpDir)();
- }
-
+ if (!TmpDir)
+ TmpDir.Reset(new TTempDir());
+ return (*TmpDir)();
+ }
+
TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType,
ui64 revolvingCounter, const TActorId& parentId) {
Y_VERIFY(nodeIndex < NodeCount);
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- if (UseRealThreads) {
+ if (UseRealThreads) {
Y_VERIFY(poolId < node->ExecutorPools.size());
- return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId);
- }
-
- // first step - find good enough mailbox
- ui32 hint = 0;
+ return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId);
+ }
+
+ // first step - find good enough mailbox
+ ui32 hint = 0;
TMailboxHeader *mailbox = nullptr;
-
- {
- ui32 hintBackoff = 0;
-
- while (hint == 0) {
- hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter);
- mailbox = node->MailboxTable->Get(hint);
-
- if (!mailbox->LockFromFree()) {
- node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
- hintBackoff = hint;
- hint = 0;
- }
- }
-
- node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
- }
-
- const ui64 localActorId = AllocateLocalId();
- if (VERBOSE) {
+
+ {
+ ui32 hintBackoff = 0;
+
+ while (hint == 0) {
+ hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter);
+ mailbox = node->MailboxTable->Get(hint);
+
+ if (!mailbox->LockFromFree()) {
+ node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
+ hintBackoff = hint;
+ hint = 0;
+ }
+ }
+
+ node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
+ }
+
+ const ui64 localActorId = AllocateLocalId();
+ if (VERBOSE) {
Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << ", mailbox: " << hint << "\n";
- }
-
- // ok, got mailbox
- mailbox->AttachActor(localActorId, actor);
-
- // do init
+ }
+
+ // ok, got mailbox
+ mailbox->AttachActor(localActorId, actor);
+
+ // do init
const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
ActorNames[actorId] = TypeName(*actor);
- RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
+ RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
-
- switch (mailboxType) {
- case TMailboxType::Simple:
+
+ switch (mailboxType) {
+ case TMailboxType::Simple:
UnlockFromExecution((TMailboxTable::TSimpleMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
- break;
- case TMailboxType::Revolving:
+ break;
+ case TMailboxType::Revolving:
UnlockFromExecution((TMailboxTable::TRevolvingMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
- break;
+ break;
case TMailboxType::HTSwap:
UnlockFromExecution((TMailboxTable::THTSwapMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
break;
@@ -918,116 +918,116 @@ namespace NActors {
case TMailboxType::TinyReadAsFilled:
UnlockFromExecution((TMailboxTable::TTinyReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
break;
- default:
+ default:
Y_FAIL("Unsupported mailbox type");
- }
-
- return actorId;
- }
-
+ }
+
+ return actorId;
+ }
+
TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
const TActorId& parentId) {
Y_VERIFY(nodeIndex < NodeCount);
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- if (UseRealThreads) {
+ if (UseRealThreads) {
Y_VERIFY(poolId < node->ExecutorPools.size());
- return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId);
- }
-
- const ui64 localActorId = AllocateLocalId();
- if (VERBOSE) {
+ return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId);
+ }
+
+ const ui64 localActorId = AllocateLocalId();
+ if (VERBOSE) {
Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << "\n";
- }
-
- mailbox->AttachActor(localActorId, actor);
+ }
+
+ mailbox->AttachActor(localActorId, actor);
const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
ActorNames[actorId] = TypeName(*actor);
- RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
+ RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
-
- return actorId;
- }
-
+
+ return actorId;
+ }
+
TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- if (!UseRealThreads) {
- IActor* actor = FindActor(actorId, node);
- node->LocalServicesActors[serviceId] = actor;
- node->ActorToActorId[actor] = actorId;
- }
-
+ if (!UseRealThreads) {
+ IActor* actor = FindActor(actorId, node);
+ node->LocalServicesActors[serviceId] = actor;
+ node->ActorToActorId[actor] = actorId;
+ }
+
return node->ActorSystem->RegisterLocalService(serviceId, actorId);
- }
-
+ }
+
TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex);
- EdgeActors.insert(edgeActor);
+ EdgeActors.insert(edgeActor);
EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor;
- return edgeActor;
- }
-
+ return edgeActor;
+ }
+
TEventsList TTestActorRuntimeBase::CaptureEvents() {
- TGuard<TMutex> guard(Mutex);
- TEventsList result;
- for (auto& mbox : Mailboxes) {
- mbox.second->Capture(result);
- }
-
- return result;
- }
-
+ TGuard<TMutex> guard(Mutex);
+ TEventsList result;
+ for (auto& mbox : Mailboxes) {
+ mbox.second->Capture(result);
+ }
+
+ return result;
+ }
+
TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
- TEventsList result;
- GetMailbox(nodeId, hint).Capture(result);
- return result;
- }
-
+ TEventsList result;
+ GetMailbox(nodeId, hint).Capture(result);
+ return result;
+ }
+
void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) {
- TGuard<TMutex> guard(Mutex);
- ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ TGuard<TMutex> guard(Mutex);
+ ui32 nodeId = ev->GetRecipientRewrite().NodeId();
Y_VERIFY(nodeId != 0);
- GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
- }
-
+ GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
+ }
+
void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) {
- TGuard<TMutex> guard(Mutex);
- for (auto rit = events.rbegin(); rit != events.rend(); ++rit) {
- if (*rit) {
- auto& ev = *rit;
- ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ TGuard<TMutex> guard(Mutex);
+ for (auto rit = events.rbegin(); rit != events.rend(); ++rit) {
+ if (*rit) {
+ auto& ev = *rit;
+ ui32 nodeId = ev->GetRecipientRewrite().NodeId();
Y_VERIFY(nodeId != 0);
- GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
- }
- }
-
- events.clear();
- }
-
+ GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
+ }
+ }
+
+ events.clear();
+ }
+
void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
- TEventsList result;
- GetMailbox(nodeId, hint).PushFront(events);
- events.clear();
- }
-
+ TEventsList result;
+ GetMailbox(nodeId, hint).PushFront(events);
+ events.clear();
+ }
+
TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() {
- TGuard<TMutex> guard(Mutex);
- TScheduledEventsList result;
- for (auto& mbox : Mailboxes) {
- mbox.second->CaptureScheduled(result);
- }
-
- return result;
- }
-
+ TGuard<TMutex> guard(Mutex);
+ TScheduledEventsList result;
+ for (auto& mbox : Mailboxes) {
+ mbox.second->CaptureScheduled(result);
+ }
+
+ return result;
+ }
+
bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) {
return DispatchEvents(options, TInstant::Max());
}
@@ -1037,100 +1037,100 @@ namespace NActors {
}
bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
return DispatchEventsInternal(options, simDeadline);
}
// Mutex must be locked by caller!
bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) {
- TDispatchContext localContext;
- localContext.Options = &options;
- localContext.PrevContext = nullptr;
- bool verbose = !options.Quiet && VERBOSE;
-
- struct TDispatchContextSetter {
+ TDispatchContext localContext;
+ localContext.Options = &options;
+ localContext.PrevContext = nullptr;
+ bool verbose = !options.Quiet && VERBOSE;
+
+ struct TDispatchContextSetter {
TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext)
: Runtime(runtime)
- {
+ {
lastContext.PrevContext = Runtime.CurrentDispatchContext;
Runtime.CurrentDispatchContext = &lastContext;
- }
-
- ~TDispatchContextSetter() {
+ }
+
+ ~TDispatchContextSetter() {
Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext;
- }
-
+ }
+
TTestActorRuntimeBase& Runtime;
} DispatchContextSetter(*this, localContext);
TInstant dispatchTime = TInstant::MicroSeconds(0);
- TInstant deadline = dispatchTime + DispatchTimeout;
- const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10);
- TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
- if (verbose) {
+ TInstant deadline = dispatchTime + DispatchTimeout;
+ const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10);
+ TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
+ if (verbose) {
Cerr << "Start dispatch at " << TInstant::MicroSeconds(CurrentTimestamp) << ", deadline is " << deadline << "\n";
- }
-
- struct TTempEdgeEventsCaptor {
+ }
+
+ struct TTempEdgeEventsCaptor {
TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime)
- : Runtime(runtime)
- , HasEvents(false)
- {
- for (auto edgeActor : Runtime.EdgeActors) {
- TEventsList events;
- Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events);
- auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
- auto storeIt = Store.find(mboxId);
+ : Runtime(runtime)
+ , HasEvents(false)
+ {
+ for (auto edgeActor : Runtime.EdgeActors) {
+ TEventsList events;
+ Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events);
+ auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
+ auto storeIt = Store.find(mboxId);
Y_VERIFY(storeIt == Store.end());
storeIt = Store.insert(std::make_pair(mboxId, new TEventMailBox)).first;
- storeIt->second->PushFront(events);
- if (!events.empty())
- HasEvents = true;
- }
- }
-
- ~TTempEdgeEventsCaptor() {
- for (auto edgeActor : Runtime.EdgeActors) {
- auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
- auto storeIt = Store.find(mboxId);
- if (storeIt == Store.end()) {
- continue;
- }
-
- TEventsList events;
- storeIt->second->Capture(events);
- Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events);
- }
- }
-
+ storeIt->second->PushFront(events);
+ if (!events.empty())
+ HasEvents = true;
+ }
+ }
+
+ ~TTempEdgeEventsCaptor() {
+ for (auto edgeActor : Runtime.EdgeActors) {
+ auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
+ auto storeIt = Store.find(mboxId);
+ if (storeIt == Store.end()) {
+ continue;
+ }
+
+ TEventsList events;
+ storeIt->second->Capture(events);
+ Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events);
+ }
+ }
+
TTestActorRuntimeBase& Runtime;
- TEventMailBoxList Store;
- bool HasEvents;
- };
-
- TEventMailBoxList restrictedMailboxes;
- const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty();
- for (auto mailboxId : options.OnlyMailboxes) {
- auto it = Mailboxes.find(mailboxId);
- if (it == Mailboxes.end()) {
+ TEventMailBoxList Store;
+ bool HasEvents;
+ };
+
+ TEventMailBoxList restrictedMailboxes;
+ const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty();
+ for (auto mailboxId : options.OnlyMailboxes) {
+ auto it = Mailboxes.find(mailboxId);
+ if (it == Mailboxes.end()) {
it = Mailboxes.insert(std::make_pair(mailboxId, new TEventMailBox())).first;
- }
-
+ }
+
restrictedMailboxes.insert(std::make_pair(mailboxId, it->second));
- }
-
- TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor;
- if (!restrictedMailboxes) {
- tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this));
- }
-
- TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes;
+ }
+
+ TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor;
+ if (!restrictedMailboxes) {
+ tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this));
+ }
+
+ TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes;
while (!currentMailboxes.empty()) {
- bool hasProgress = true;
- while (hasProgress) {
- ++DispatchCyclesCount;
- hasProgress = false;
-
+ bool hasProgress = true;
+ while (hasProgress) {
+ ++DispatchCyclesCount;
+ hasProgress = false;
+
ui64 eventsToDispatch = 0;
for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) {
if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
@@ -1143,7 +1143,7 @@ namespace NActors {
bool isEmpty = false;
while (!isEmpty && eventsDispatched < eventsToDispatch) {
ui64 mailboxCount = currentMailboxes.size();
- ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull;
+ ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull;
auto startWithMboxIt = currentMailboxes.begin();
for (ui64 i = 0; i < startWith; ++i) {
++startWithMboxIt;
@@ -1170,7 +1170,7 @@ namespace NActors {
ObserverFunc(*this, ev);
}
mbox.second->PushFront(events);
- }
+ }
if (!isEdgeMailbox) {
isEmpty = false;
@@ -1216,8 +1216,8 @@ namespace NActors {
Y_FAIL("Unknown action");
}
}
- }
-
+ }
+
}
Y_VERIFY(mboxIt != currentMailboxes.end());
if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes &&
@@ -1225,82 +1225,82 @@ namespace NActors {
mboxIt->second->IsScheduledEmpty() &&
mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
suspectedBoxes.push_back(mboxIt->first);
- }
+ }
++mboxIt;
if (mboxIt == currentMailboxes.end()) {
mboxIt = currentMailboxes.begin();
- }
+ }
Y_VERIFY(endWithMboxIt != currentMailboxes.end());
if (mboxIt == endWithMboxIt) {
- break;
- }
- }
-
+ break;
+ }
+ }
+
for (auto id : suspectedBoxes) {
auto it = currentMailboxes.find(id);
if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() &&
it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
currentMailboxes.erase(it);
}
- }
- }
- }
-
+ }
+ }
+ }
+
if (localContext.FinalEventFound) {
return true;
- }
-
- if (!localContext.FoundNonEmptyMailboxes.empty())
+ }
+
+ if (!localContext.FoundNonEmptyMailboxes.empty())
return true;
-
+
if (options.CustomFinalCondition && options.CustomFinalCondition())
return true;
- if (options.FinalEvents.empty()) {
- for (auto& mbox : currentMailboxes) {
- if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp)))
- continue;
-
- if (!mbox.second->IsEmpty()) {
- if (verbose) {
+ if (options.FinalEvents.empty()) {
+ for (auto& mbox : currentMailboxes) {
+ if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp)))
+ continue;
+
+ if (!mbox.second->IsEmpty()) {
+ if (verbose) {
Cerr << "Dispatch complete with non-empty queue at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
- }
-
+ }
+
return true;
- }
- }
- }
-
+ }
+ }
+ }
+
if (TInstant::MicroSeconds(CurrentTimestamp) > simDeadline) {
return false;
}
- if (dispatchTime >= deadline) {
- if (verbose) {
+ if (dispatchTime >= deadline) {
+ if (verbose) {
Cerr << "Reach deadline at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
- }
-
+ }
+
ythrow TWithBackTrace<TEmptyEventQueueException>();
- }
-
+ }
+
if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) {
- inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
- bool isEmpty = true;
- TMaybe<TInstant> nearestMailboxDeadline;
+ inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
+ bool isEmpty = true;
+ TMaybe<TInstant> nearestMailboxDeadline;
TVector<TIntrusivePtr<TEventMailBox>> nextScheduleMboxes;
TMaybe<TInstant> nextScheduleDeadline;
- for (auto& mbox : currentMailboxes) {
- if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) {
- nearestMailboxDeadline = mbox.second->GetInactiveUntil();
- }
-
- continue;
- }
-
- if (mbox.second->IsScheduledEmpty())
- continue;
-
+ for (auto& mbox : currentMailboxes) {
+ if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) {
+ nearestMailboxDeadline = mbox.second->GetInactiveUntil();
+ }
+
+ continue;
+ }
+
+ if (mbox.second->IsScheduledEmpty())
+ continue;
+
auto firstScheduleDeadline = mbox.second->GetFirstScheduleDeadline();
if (!nextScheduleDeadline || firstScheduleDeadline < *nextScheduleDeadline) {
nextScheduleMboxes.clear();
@@ -1312,118 +1312,118 @@ namespace NActors {
}
for (const auto& nextScheduleMbox : nextScheduleMboxes) {
- TEventsList selectedEvents;
- TScheduledEventsList capturedScheduledEvents;
+ TEventsList selectedEvents;
+ TScheduledEventsList capturedScheduledEvents;
nextScheduleMbox->CaptureScheduled(capturedScheduledEvents);
- ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents);
+ ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents);
nextScheduleMbox->PushScheduled(capturedScheduledEvents);
- for (auto& event : selectedEvents) {
- if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) {
+ for (auto& event : selectedEvents) {
+ if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) {
Cerr << "Selected scheduled event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
PrintEvent(event, this);
- }
-
+ }
+
nextScheduleMbox->Send(event);
- isEmpty = false;
- }
- }
-
- if (!isEmpty) {
- if (verbose) {
+ isEmpty = false;
+ }
+ }
+
+ if (!isEmpty) {
+ if (verbose) {
Cerr << "Process selected events at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
- }
-
+ }
+
deadline = dispatchTime + DispatchTimeout;
- continue;
- }
-
- if (nearestMailboxDeadline.Defined()) {
- if (verbose) {
+ continue;
+ }
+
+ if (nearestMailboxDeadline.Defined()) {
+ if (verbose) {
Cerr << "Forward time to " << *nearestMailboxDeadline.Get() << "\n";
- }
-
- UpdateCurrentTime(*nearestMailboxDeadline.Get());
- continue;
- }
- }
-
- TDuration waitDelay = TDuration::MilliSeconds(10);
- dispatchTime += waitDelay;
- MailboxesHasEvents.WaitT(Mutex, waitDelay);
- }
+ }
+
+ UpdateCurrentTime(*nearestMailboxDeadline.Get());
+ continue;
+ }
+ }
+
+ TDuration waitDelay = TDuration::MilliSeconds(10);
+ dispatchTime += waitDelay;
+ MailboxesHasEvents.WaitT(Mutex, waitDelay);
+ }
return false;
- }
-
+ }
+
void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) {
- TDispatchContext* context = CurrentDispatchContext;
- while (context) {
- const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes;
- if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) {
- context->FoundNonEmptyMailboxes.insert(mboxId);
- }
-
- context = context->PrevContext;
- }
- }
-
+ TDispatchContext* context = CurrentDispatchContext;
+ while (context) {
+ const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes;
+ if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) {
+ context->FoundNonEmptyMailboxes.insert(mboxId);
+ }
+
+ context = context->PrevContext;
+ }
+ }
+
void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) {
- TDispatchContext* context = CurrentDispatchContext;
- while (context) {
+ TDispatchContext* context = CurrentDispatchContext;
+ while (context) {
for (const auto& finalEvent : context->Options->FinalEvents) {
if (finalEvent.EventCheck(ev)) {
auto& freq = context->FinalEventFrequency[&finalEvent];
if (++freq >= finalEvent.RequiredCount) {
context->FinalEventFound = true;
}
- }
- }
-
- context = context->PrevContext;
- }
- }
-
+ }
+ }
+
+ context = context->PrevContext;
+ }
+ }
+
void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32,
senderNodeIndex, NodeCount);
- SendInternal(ev, senderNodeIndex, viaActorSystem);
- }
-
+ SendInternal(ev, senderNodeIndex, viaActorSystem);
+ }
+
void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
- ui32 nodeId = FirstNodeId + nodeIndex;
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration;
- GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr));
- if (VERBOSE)
+ ui32 nodeId = FirstNodeId + nodeIndex;
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration;
+ GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr));
+ if (VERBOSE)
Cerr << "Event was added to scheduled queue\n";
- }
-
+ }
+
void TTestActorRuntimeBase::ClearCounters() {
- TGuard<TMutex> guard(Mutex);
- EvCounters.clear();
- }
-
+ TGuard<TMutex> guard(Mutex);
+ EvCounters.clear();
+ }
+
ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const {
- TGuard<TMutex> guard(Mutex);
- auto it = EvCounters.find(evType);
- if (it == EvCounters.end())
- return 0;
-
- return it->second;
- }
-
+ TGuard<TMutex> guard(Mutex);
+ auto it = EvCounters.find(evType);
+ if (it == EvCounters.end())
+ return 0;
+
+ return it->second;
+ }
+
TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- return node->ActorSystem->LookupLocalService(serviceId);
- }
-
+ return node->ActorSystem->LookupLocalService(serviceId);
+ }
+
void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) {
- TGuard<TMutex> guard(Mutex);
- ui32 dispatchCount = 0;
+ TGuard<TMutex> guard(Mutex);
+ ui32 dispatchCount = 0;
if (!edgeFilter.empty()) {
for (auto edgeActor : edgeFilter) {
Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data());
@@ -1431,202 +1431,202 @@ namespace NActors {
}
const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter;
TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout;
- for (;;) {
+ for (;;) {
for (auto edgeActor : edgeActors) {
- TEventsList events;
- auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint());
- bool foundEvent = false;
- mbox.Capture(events);
- for (auto& ev : events) {
- if (filter(*this, ev)) {
- foundEvent = true;
- break;
- }
- }
-
- mbox.PushFront(events);
- if (foundEvent)
- return;
- }
-
- ++dispatchCount;
- {
+ TEventsList events;
+ auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint());
+ bool foundEvent = false;
+ mbox.Capture(events);
+ for (auto& ev : events) {
+ if (filter(*this, ev)) {
+ foundEvent = true;
+ break;
+ }
+ }
+
+ mbox.PushFront(events);
+ if (foundEvent)
+ return;
+ }
+
+ ++dispatchCount;
+ {
if (!DispatchEventsInternal(TDispatchOptions(), deadline)) {
return; // Timed out; event was not found
}
- }
-
+ }
+
Y_VERIFY(dispatchCount < 1000, "Hard limit to prevent endless loop");
- }
- }
-
+ }
+ }
+
TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndexFrom < NodeCount);
Y_VERIFY(nodeIndexTo < NodeCount);
Y_VERIFY(nodeIndexFrom != nodeIndexTo);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get();
- return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo);
- }
-
+ return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo);
+ }
+
void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) {
- TGuard<TMutex> guard(Mutex);
- BlockedOutput.insert(actorId);
- }
-
+ TGuard<TMutex> guard(Mutex);
+ BlockedOutput.insert(actorId);
+ }
+
void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) {
ui64 days = (time.Hours() / 24);
DispatcherRandomSeed = (days << 32) ^ iteration;
- DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
+ DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
}
IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const {
- TGuard<TMutex> guard(Mutex);
- if (nodeIndex == Max<ui32>()) {
+ TGuard<TMutex> guard(Mutex);
+ if (nodeIndex == Max<ui32>()) {
Y_VERIFY(actorId.NodeId());
- nodeIndex = actorId.NodeId() - FirstNodeId;
- }
-
+ nodeIndex = actorId.NodeId() - FirstNodeId;
+ }
+
Y_VERIFY(nodeIndex < NodeCount);
- auto nodeIt = Nodes.find(FirstNodeId + nodeIndex);
+ auto nodeIt = Nodes.find(FirstNodeId + nodeIndex);
Y_VERIFY(nodeIt != Nodes.end());
TNodeDataBase* node = nodeIt->second.Get();
- return FindActor(actorId, node);
- }
-
+ return FindActor(actorId, node);
+ }
+
void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) {
- TGuard<TMutex> guard(Mutex);
- if (allow) {
+ TGuard<TMutex> guard(Mutex);
+ if (allow) {
if (VERBOSE) {
Cerr << "Actor " << actorId << " added to schedule whitelist";
}
- ScheduleWhiteList.insert(actorId);
- } else {
+ ScheduleWhiteList.insert(actorId);
+ } else {
if (VERBOSE) {
Cerr << "Actor " << actorId << " removed from schedule whitelist";
}
- ScheduleWhiteList.erase(actorId);
- }
- }
-
+ ScheduleWhiteList.erase(actorId);
+ }
+ }
+
bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const {
- TGuard<TMutex> guard(Mutex);
- return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end();
- }
-
+ TGuard<TMutex> guard(Mutex);
+ return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end();
+ }
+
TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
- ui32 nodeId = FirstNodeId + nodeIndex;
+ ui32 nodeId = FirstNodeId + nodeIndex;
TNodeDataBase* node = Nodes[nodeId].Get();
- return node->DynamicCounters;
- }
-
+ return node->DynamicCounters;
+ }
+
void TTestActorRuntimeBase::SetupMonitoring() {
NeedMonitoring = true;
- }
-
+ }
+
void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) {
Y_VERIFY(nodeIndex < NodeCount);
- ui32 nodeId = FirstNodeId + nodeIndex;
+ ui32 nodeId = FirstNodeId + nodeIndex;
TNodeDataBase* node = Nodes[nodeId].Get();
- ui32 targetNode = ev->GetRecipientRewrite().NodeId();
- ui32 targetNodeIndex;
- if (targetNode == 0) {
- targetNodeIndex = nodeIndex;
- } else {
- targetNodeIndex = targetNode - FirstNodeId;
+ ui32 targetNode = ev->GetRecipientRewrite().NodeId();
+ ui32 targetNodeIndex;
+ if (targetNode == 0) {
+ targetNodeIndex = nodeIndex;
+ } else {
+ targetNodeIndex = targetNode - FirstNodeId;
Y_VERIFY(targetNodeIndex < NodeCount);
- }
-
- if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) {
- node->ActorSystem->Send(ev);
- return;
- }
-
+ }
+
+ if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) {
+ node->ActorSystem->Send(ev);
+ return;
+ }
+
Y_VERIFY(!ev->GetRecipientRewrite().IsService() && (targetNodeIndex == nodeIndex));
- TAutoPtr<IEventHandle> evHolder(ev);
-
+ TAutoPtr<IEventHandle> evHolder(ev);
+
if (!AllowSendFrom(node, evHolder)) {
return;
}
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint);
- if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- mbox.PushFront(evHolder);
- return;
- }
-
- ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId();
- if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) {
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint);
+ if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ mbox.PushFront(evHolder);
+ return;
+ }
+
+ ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId();
+ if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) {
Cerr << "Send event, ";
PrintEvent(evHolder, this);
- }
-
- EvCounters[ev->GetTypeRewrite()]++;
-
- TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
- IActor* recipientActor = mailbox->FindActor(recipientLocalId);
- if (recipientActor) {
+ }
+
+ EvCounters[ev->GetTypeRewrite()]++;
+
+ TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
+ IActor* recipientActor = mailbox->FindActor(recipientLocalId);
+ if (recipientActor) {
// Save actorId by value in order to prevent ctx from being invalidated during another Send call.
TActorId actorId = ev->GetRecipientRewrite();
- node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite();
+ node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite();
TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId);
TActivationContext *prevTlsActivationContext = TlsActivationContext;
TlsActivationContext = &ctx;
- CurrentRecipient = actorId;
- {
- TInverseGuard<TMutex> inverseGuard(Mutex);
+ CurrentRecipient = actorId;
+ {
+ TInverseGuard<TMutex> inverseGuard(Mutex);
#ifdef USE_ACTOR_CALLSTACK
TCallstack::GetTlsCallstack() = ev->Callstack;
TCallstack::GetTlsCallstack().SetLinesToSkip();
#endif
- recipientActor->Receive(evHolder, ctx);
+ recipientActor->Receive(evHolder, ctx);
node->ExecutorThread->DropUnregistered();
- }
+ }
CurrentRecipient = TActorId();
TlsActivationContext = prevTlsActivationContext;
- } else {
- if (VERBOSE) {
+ } else {
+ if (VERBOSE) {
Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n";
- }
-
- auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown);
- if (!!forwardedEv) {
- node->ActorSystem->Send(forwardedEv);
- }
- }
- }
-
+ }
+
+ auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown);
+ if (!!forwardedEv) {
+ node->ActorSystem->Send(forwardedEv);
+ }
+ }
+ }
+
IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const {
- ui32 mailboxHint = actorId.Hint();
- ui64 localId = actorId.LocalId();
- TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
- IActor* actor = mailbox->FindActor(localId);
- return actor;
- }
-
+ ui32 mailboxHint = actorId.Hint();
+ ui64 localId = actorId.LocalId();
+ TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
+ IActor* actor = mailbox->FindActor(localId);
+ return actor;
+ }
+
THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) {
- THolder<TActorSystemSetup> setup(new TActorSystemSetup);
- setup->NodeId = FirstNodeId + nodeIndex;
+ THolder<TActorSystemSetup> setup(new TActorSystemSetup);
+ setup->NodeId = FirstNodeId + nodeIndex;
- if (UseRealThreads) {
+ if (UseRealThreads) {
setup->ExecutorsCount = 5;
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]);
- setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20));
- setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20));
- setup->Executors[2].Reset(new TIOExecutorPool(2, 1));
- setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20));
+ setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20));
+ setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20));
+ setup->Executors[2].Reset(new TIOExecutorPool(2, 1));
+ setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20));
setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20));
setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(512, 100)));
- } else {
- setup->ExecutorsCount = 1;
- setup->Scheduler.Reset(new TSchedulerThreadStub(this, node));
- setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]);
- setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0));
- }
-
+ } else {
+ setup->ExecutorsCount = 1;
+ setup->Scheduler.Reset(new TSchedulerThreadStub(this, node));
+ setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]);
+ setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0));
+ }
+
InitActorSystemSetup(*setup);
return setup;
@@ -1635,15 +1635,15 @@ namespace NActors {
THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) {
auto setup = MakeActorSystemSetup(nodeIndex, node);
- node->ExecutorPools.resize(setup->ExecutorsCount);
- for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
- node->ExecutorPools[i] = setup->Executors[i].Get();
- }
-
+ node->ExecutorPools.resize(setup->ExecutorsCount);
+ for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
+ node->ExecutorPools[i] = setup->Executors[i].Get();
+ }
+
const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect");
- setup->LocalServices = node->LocalServices;
- setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount);
+ setup->LocalServices = node->LocalServices;
+ setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount);
const TActorId nameserviceId = GetNameserviceActorId();
TIntrusivePtr<TInterconnectProxyCommon> common;
@@ -1663,10 +1663,10 @@ namespace NActors {
common->ClusterUUID = ClusterUUID;
common->AcceptUUID = {ClusterUUID};
- for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) {
- if (proxyNodeIndex == nodeIndex)
- continue;
-
+ for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) {
+ if (proxyNodeIndex == nodeIndex)
+ continue;
+
const ui32 peerNodeId = FirstNodeId + proxyNodeIndex;
IActor *proxyActor = UseRealInterconnect
@@ -1674,8 +1674,8 @@ namespace NActors {
: InterconnectMock.CreateProxyMock(setup->NodeId, peerNodeId, common);
setup->Interconnect.ProxyActors[peerNodeId] = {proxyActor, TMailboxType::ReadAsFilled, InterconnectPoolId()};
- }
-
+ }
+
setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, InterconnectPoolId(), &InterconnectMock);
if (UseRealInterconnect) {
@@ -1691,10 +1691,10 @@ namespace NActors {
std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd);
setup->LocalServices.push_back(loggerActorPair);
}
-
+
return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings));
- }
-
+ }
+
TActorSystem* TTestActorRuntimeBase::SingleSys() const {
Y_VERIFY(Nodes.size() == 1, "Works only for single system env");
@@ -1716,16 +1716,16 @@ namespace NActors {
TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) {
- TGuard<TMutex> guard(Mutex);
- auto mboxId = TEventMailboxId(nodeId, hint);
- auto it = Mailboxes.find(mboxId);
- if (it == Mailboxes.end()) {
+ TGuard<TMutex> guard(Mutex);
+ auto mboxId = TEventMailboxId(nodeId, hint);
+ auto it = Mailboxes.find(mboxId);
+ if (it == Mailboxes.end()) {
it = Mailboxes.insert(std::make_pair(mboxId, new TEventMailBox())).first;
- }
-
- return *it->second;
- }
-
+ }
+
+ return *it->second;
+ }
+
void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) {
TGuard<TMutex> guard(Mutex);
auto mboxId = TEventMailboxId(nodeId, hint);
@@ -1739,36 +1739,36 @@ namespace NActors {
return actorId.ToString();
}
- struct TStrandingActorDecoratorContext : public TThrRefBase {
- TStrandingActorDecoratorContext()
+ struct TStrandingActorDecoratorContext : public TThrRefBase {
+ TStrandingActorDecoratorContext()
: Queue(new TQueueType)
- {
- }
-
+ {
+ }
+
typedef TOneOneQueueInplace<IEventHandle*, 32> TQueueType;
TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue;
- };
-
- class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> {
- public:
- class TReplyActor : public TActor<TReplyActor> {
- public:
+ };
+
+ class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> {
+ public:
+ class TReplyActor : public TActor<TReplyActor> {
+ public:
static constexpr EActivityType ActorActivityType() {
return TEST_ACTOR_RUNTIME;
}
- TReplyActor(TStrandingActorDecorator* owner)
- : TActor(&TReplyActor::StateFunc)
- , Owner(owner)
- {
- }
-
- STFUNC(StateFunc);
-
- private:
- TStrandingActorDecorator* const Owner;
- };
-
+ TReplyActor(TStrandingActorDecorator* owner)
+ : TActor(&TReplyActor::StateFunc)
+ , Owner(owner)
+ {
+ }
+
+ STFUNC(StateFunc);
+
+ private:
+ TStrandingActorDecorator* const Owner;
+ };
+
static constexpr EActivityType ActorActivityType() {
return TEST_ACTOR_RUNTIME;
}
@@ -1776,41 +1776,41 @@ namespace NActors {
TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors,
TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime,
TReplyCheckerCreator createReplyChecker)
- : Delegatee(delegatee)
- , IsSync(isSync)
- , AdditionalActors(additionalActors)
- , Context(context)
- , HasReply(false)
+ : Delegatee(delegatee)
+ , IsSync(isSync)
+ , AdditionalActors(additionalActors)
+ , Context(context)
+ , HasReply(false)
, Runtime(runtime)
, ReplyChecker(createReplyChecker())
- {
- if (IsSync) {
+ {
+ if (IsSync) {
Y_VERIFY(!runtime->IsRealThreads());
- }
- }
-
- void Bootstrap(const TActorContext& ctx) {
- Become(&TStrandingActorDecorator::StateFunc);
+ }
+ }
+
+ void Bootstrap(const TActorContext& ctx) {
+ Become(&TStrandingActorDecorator::StateFunc);
ReplyId = ctx.RegisterWithSameMailbox(new TReplyActor(this));
- DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint()));
- for (const auto& actor : AdditionalActors) {
- DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint()));
- }
-
- DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
- DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
- DelegateeOptions.Quiet = true;
- }
-
- STFUNC(StateFunc) {
- bool wasEmpty = !Context->Queue->Head();
- Context->Queue->Push(ev.Release());
- if (wasEmpty) {
- SendHead(ctx);
- }
- }
-
- STFUNC(Reply) {
+ DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint()));
+ for (const auto& actor : AdditionalActors) {
+ DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint()));
+ }
+
+ DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
+ DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
+ DelegateeOptions.Quiet = true;
+ }
+
+ STFUNC(StateFunc) {
+ bool wasEmpty = !Context->Queue->Head();
+ Context->Queue->Push(ev.Release());
+ if (wasEmpty) {
+ SendHead(ctx);
+ }
+ }
+
+ STFUNC(Reply) {
Y_VERIFY(!HasReply);
IEventHandle *requestEv = Context->Queue->Head();
TActorId originalSender = requestEv->Sender;
@@ -1818,85 +1818,85 @@ namespace NActors {
if (HasReply) {
delete Context->Queue->Pop();
}
- ctx.ExecutorThread.Send(ev->Forward(originalSender));
- if (!IsSync && Context->Queue->Head()) {
- SendHead(ctx);
- }
- }
-
- private:
- void SendHead(const TActorContext& ctx) {
- if (!IsSync) {
- ctx.ExecutorThread.Send(GetForwardedEvent().Release());
- } else {
- while (Context->Queue->Head()) {
- HasReply = false;
+ ctx.ExecutorThread.Send(ev->Forward(originalSender));
+ if (!IsSync && Context->Queue->Head()) {
+ SendHead(ctx);
+ }
+ }
+
+ private:
+ void SendHead(const TActorContext& ctx) {
+ if (!IsSync) {
+ ctx.ExecutorThread.Send(GetForwardedEvent().Release());
+ } else {
+ while (Context->Queue->Head()) {
+ HasReply = false;
ctx.ExecutorThread.Send(GetForwardedEvent().Release());
int count = 100;
while (!HasReply && count > 0) {
- try {
+ try {
Runtime->DispatchEvents(DelegateeOptions);
- } catch (TEmptyEventQueueException&) {
+ } catch (TEmptyEventQueueException&) {
count--;
- Cerr << "No reply" << Endl;
- }
- }
-
+ Cerr << "No reply" << Endl;
+ }
+ }
+
Runtime->UpdateCurrentTime(Runtime->GetCurrentTime() + TDuration::MicroSeconds(1000));
- }
- }
- }
-
- TAutoPtr<IEventHandle> GetForwardedEvent() {
- IEventHandle* ev = Context->Queue->Head();
+ }
+ }
+ }
+
+ TAutoPtr<IEventHandle> GetForwardedEvent() {
+ IEventHandle* ev = Context->Queue->Head();
ReplyChecker->OnRequest(ev);
- TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent()
- ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie)
+ TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent()
+ ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie)
: new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie);
-
- return forwardedEv;
- }
- private:
+
+ return forwardedEv;
+ }
+ private:
const TActorId Delegatee;
- const bool IsSync;
+ const bool IsSync;
const TVector<TActorId> AdditionalActors;
TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
TActorId ReplyId;
- bool HasReply;
- TDispatchOptions DelegateeOptions;
+ bool HasReply;
+ TDispatchOptions DelegateeOptions;
TTestActorRuntimeBase* Runtime;
THolder<IReplyChecker> ReplyChecker;
- };
-
- void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) {
- Owner->Reply(ev, ctx);
- }
-
- class TStrandingDecoratorFactory : public IStrandingDecoratorFactory {
- public:
+ };
+
+ void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) {
+ Owner->Reply(ev, ctx);
+ }
+
+ class TStrandingDecoratorFactory : public IStrandingDecoratorFactory {
+ public:
TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
TReplyCheckerCreator createReplyChecker)
- : Context(new TStrandingActorDecoratorContext())
+ : Context(new TStrandingActorDecoratorContext())
, Runtime(runtime)
, CreateReplyChecker(createReplyChecker)
- {
- }
-
+ {
+ }
+
IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override {
return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime,
CreateReplyChecker);
- }
-
- private:
+ }
+
+ private:
TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
TTestActorRuntimeBase* Runtime;
TReplyCheckerCreator CreateReplyChecker;
- };
-
+ };
+
TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
TReplyCheckerCreator createReplyChecker) {
return TAutoPtr<IStrandingDecoratorFactory>(new TStrandingDecoratorFactory(runtime, createReplyChecker));
- }
-
- ui64 DefaultRandomSeed = 9999;
-}
+ }
+
+ ui64 DefaultRandomSeed = 9999;
+}
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h
index 26e3b45c98..06eadb15de 100644
--- a/library/cpp/actors/testlib/test_runtime.h
+++ b/library/cpp/actors/testlib/test_runtime.h
@@ -1,4 +1,4 @@
-#pragma once
+#pragma once
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/actorsystem.h>
@@ -9,27 +9,27 @@
#include <library/cpp/actors/util/should_continue.h>
#include <library/cpp/actors/interconnect/poller_tcp.h>
#include <library/cpp/actors/interconnect/mock/ic_mock.h>
-#include <library/cpp/random_provider/random_provider.h>
-#include <library/cpp/time_provider/time_provider.h>
+#include <library/cpp/random_provider/random_provider.h>
+#include <library/cpp/time_provider/time_provider.h>
#include <library/cpp/testing/unittest/tests_data.h>
-
-#include <util/datetime/base.h>
-#include <util/folder/tempdir.h>
-#include <util/generic/deque.h>
+
+#include <util/datetime/base.h>
+#include <util/folder/tempdir.h>
+#include <util/generic/deque.h>
#include <util/generic/hash.h>
-#include <util/generic/noncopyable.h>
-#include <util/generic/ptr.h>
-#include <util/generic/queue.h>
-#include <util/generic/set.h>
-#include <util/generic/vector.h>
-#include <util/system/defaults.h>
-#include <util/system/mutex.h>
-#include <util/system/condvar.h>
-#include <util/system/thread.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/generic/queue.h>
+#include <util/generic/set.h>
+#include <util/generic/vector.h>
+#include <util/system/defaults.h>
+#include <util/system/mutex.h>
+#include <util/system/condvar.h>
+#include <util/system/thread.h>
#include <util/system/sanitizers.h>
#include <util/system/valgrind.h>
#include <utility>
-
+
#include <functional>
const TDuration DEFAULT_DISPATCH_TIMEOUT = NSan::PlainOrUnderSanitizer(
@@ -38,141 +38,141 @@ const TDuration DEFAULT_DISPATCH_TIMEOUT = NSan::PlainOrUnderSanitizer(
);
-namespace NActors {
+namespace NActors {
struct THeSingleSystemEnv { };
- struct TEventMailboxId {
- TEventMailboxId()
- : NodeId(0)
- , Hint(0)
- {
- }
-
- TEventMailboxId(ui32 nodeId, ui32 hint)
- : NodeId(nodeId)
- , Hint(hint)
- {
- }
-
- bool operator<(const TEventMailboxId& other) const {
- return (NodeId < other.NodeId) || (NodeId == other.NodeId) && (Hint < other.Hint);
- }
-
- bool operator==(const TEventMailboxId& other) const {
- return (NodeId == other.NodeId) && (Hint == other.Hint);
- }
-
+ struct TEventMailboxId {
+ TEventMailboxId()
+ : NodeId(0)
+ , Hint(0)
+ {
+ }
+
+ TEventMailboxId(ui32 nodeId, ui32 hint)
+ : NodeId(nodeId)
+ , Hint(hint)
+ {
+ }
+
+ bool operator<(const TEventMailboxId& other) const {
+ return (NodeId < other.NodeId) || (NodeId == other.NodeId) && (Hint < other.Hint);
+ }
+
+ bool operator==(const TEventMailboxId& other) const {
+ return (NodeId == other.NodeId) && (Hint == other.Hint);
+ }
+
struct THash {
ui64 operator()(const TEventMailboxId& mboxId) const noexcept {
return mboxId.NodeId * 31ULL + mboxId.Hint;
}
};
- ui32 NodeId;
- ui32 Hint;
- };
-
- struct TDispatchOptions {
- struct TFinalEventCondition {
+ ui32 NodeId;
+ ui32 Hint;
+ };
+
+ struct TDispatchOptions {
+ struct TFinalEventCondition {
std::function<bool(IEventHandle& ev)> EventCheck;
- ui32 RequiredCount;
-
- TFinalEventCondition(ui32 eventType, ui32 requiredCount = 1)
+ ui32 RequiredCount;
+
+ TFinalEventCondition(ui32 eventType, ui32 requiredCount = 1)
: EventCheck([eventType](IEventHandle& ev) -> bool { return ev.GetTypeRewrite() == eventType; })
- , RequiredCount(requiredCount)
- {
- }
+ , RequiredCount(requiredCount)
+ {
+ }
TFinalEventCondition(std::function<bool(IEventHandle& ev)> eventCheck, ui32 requiredCount = 1)
: EventCheck(eventCheck)
, RequiredCount(requiredCount)
{
}
- };
-
+ };
+
TVector<TFinalEventCondition> FinalEvents;
TVector<TEventMailboxId> NonEmptyMailboxes;
TVector<TEventMailboxId> OnlyMailboxes;
std::function<bool()> CustomFinalCondition;
bool Quiet = false;
- };
-
- struct TScheduledEventQueueItem {
- TInstant Deadline;
- TAutoPtr<IEventHandle> Event;
+ };
+
+ struct TScheduledEventQueueItem {
+ TInstant Deadline;
+ TAutoPtr<IEventHandle> Event;
TAutoPtr<TSchedulerCookieHolder> Cookie;
- ui64 UniqueId;
-
- TScheduledEventQueueItem(TInstant deadline, TAutoPtr<IEventHandle> event, ISchedulerCookie* cookie)
- : Deadline(deadline)
- , Event(event)
+ ui64 UniqueId;
+
+ TScheduledEventQueueItem(TInstant deadline, TAutoPtr<IEventHandle> event, ISchedulerCookie* cookie)
+ : Deadline(deadline)
+ , Event(event)
, Cookie(new TSchedulerCookieHolder(cookie))
- , UniqueId(++NextUniqueId)
- {}
-
- bool operator<(const TScheduledEventQueueItem& other) const {
- if (Deadline < other.Deadline)
- return true;
-
- if (Deadline > other.Deadline)
- return false;
-
- return UniqueId < other.UniqueId;
- }
-
- static ui64 NextUniqueId;
- };
-
+ , UniqueId(++NextUniqueId)
+ {}
+
+ bool operator<(const TScheduledEventQueueItem& other) const {
+ if (Deadline < other.Deadline)
+ return true;
+
+ if (Deadline > other.Deadline)
+ return false;
+
+ return UniqueId < other.UniqueId;
+ }
+
+ static ui64 NextUniqueId;
+ };
+
typedef TDeque<TAutoPtr<IEventHandle>> TEventsList;
typedef TSet<TScheduledEventQueueItem> TScheduledEventsList;
-
- class TEventMailBox : public TThrRefBase {
- public:
- TEventMailBox()
+
+ class TEventMailBox : public TThrRefBase {
+ public:
+ TEventMailBox()
: InactiveUntil(TInstant::MicroSeconds(0))
-#ifdef DEBUG_ORDER_EVENTS
- , ExpectedReceive(0)
- , NextToSend(0)
-#endif
- {
- }
-
- void Send(TAutoPtr<IEventHandle> ev);
- bool IsEmpty() const;
- TAutoPtr<IEventHandle> Pop();
- void Capture(TEventsList& evList);
- void PushFront(TAutoPtr<IEventHandle>& ev);
- void PushFront(TEventsList& evList);
- void CaptureScheduled(TScheduledEventsList& evList);
- void PushScheduled(TScheduledEventsList& evList);
- bool IsActive(const TInstant& currentTime) const;
- void Freeze(const TInstant& deadline);
- TInstant GetInactiveUntil() const;
- void Schedule(const TScheduledEventQueueItem& item);
- bool IsScheduledEmpty() const;
+#ifdef DEBUG_ORDER_EVENTS
+ , ExpectedReceive(0)
+ , NextToSend(0)
+#endif
+ {
+ }
+
+ void Send(TAutoPtr<IEventHandle> ev);
+ bool IsEmpty() const;
+ TAutoPtr<IEventHandle> Pop();
+ void Capture(TEventsList& evList);
+ void PushFront(TAutoPtr<IEventHandle>& ev);
+ void PushFront(TEventsList& evList);
+ void CaptureScheduled(TScheduledEventsList& evList);
+ void PushScheduled(TScheduledEventsList& evList);
+ bool IsActive(const TInstant& currentTime) const;
+ void Freeze(const TInstant& deadline);
+ TInstant GetInactiveUntil() const;
+ void Schedule(const TScheduledEventQueueItem& item);
+ bool IsScheduledEmpty() const;
TInstant GetFirstScheduleDeadline() const;
ui64 GetSentEventCount() const;
-
- private:
- TScheduledEventsList Scheduled;
- TInstant InactiveUntil;
- TEventsList Sent;
-#ifdef DEBUG_ORDER_EVENTS
+
+ private:
+ TScheduledEventsList Scheduled;
+ TInstant InactiveUntil;
+ TEventsList Sent;
+#ifdef DEBUG_ORDER_EVENTS
TMap<IEventHandle*, ui64> TrackSent;
- ui64 ExpectedReceive;
- ui64 NextToSend;
-#endif
- };
-
+ ui64 ExpectedReceive;
+ ui64 NextToSend;
+#endif
+ };
+
typedef THashMap<TEventMailboxId, TIntrusivePtr<TEventMailBox>, TEventMailboxId::THash> TEventMailBoxList;
-
- class TEmptyEventQueueException : public yexception {
- public:
- TEmptyEventQueueException() {
- Append("Event queue is still empty.");
- }
- };
-
+
+ class TEmptyEventQueueException : public yexception {
+ public:
+ TEmptyEventQueueException() {
+ Append("Event queue is still empty.");
+ }
+ };
+
class TSchedulingLimitReachedException : public yexception {
public:
TSchedulingLimitReachedException(ui64 limit) {
@@ -183,83 +183,83 @@ namespace NActors {
};
class TTestActorRuntimeBase: public TNonCopyable {
- public:
- class TEdgeActor;
- class TSchedulerThreadStub;
- class TExecutorPoolStub;
- class TTimeProvider;
-
- enum class EEventAction {
- PROCESS,
- DROP,
- RESCHEDULE
- };
-
+ public:
+ class TEdgeActor;
+ class TSchedulerThreadStub;
+ class TExecutorPoolStub;
+ class TTimeProvider;
+
+ enum class EEventAction {
+ PROCESS,
+ DROP,
+ RESCHEDULE
+ };
+
typedef std::function<EEventAction(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventObserver;
typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector;
typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter;
typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter;
typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver;
-
+
TTestActorRuntimeBase(THeSingleSystemEnv);
TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads);
TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount);
TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false);
virtual ~TTestActorRuntimeBase();
- bool IsRealThreads() const;
+ bool IsRealThreads() const;
static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue);
static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue);
static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event);
static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline);
static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId);
- TEventObserver SetObserverFunc(TEventObserver observerFunc);
- TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc);
- TEventFilter SetEventFilter(TEventFilter filterFunc);
- TScheduledEventFilter SetScheduledEventFilter(TScheduledEventFilter filterFunc);
- TRegistrationObserver SetRegistrationObserverFunc(TRegistrationObserver observerFunc);
+ TEventObserver SetObserverFunc(TEventObserver observerFunc);
+ TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc);
+ TEventFilter SetEventFilter(TEventFilter filterFunc);
+ TScheduledEventFilter SetScheduledEventFilter(TScheduledEventFilter filterFunc);
+ TRegistrationObserver SetRegistrationObserverFunc(TRegistrationObserver observerFunc);
static bool IsVerbose();
static void SetVerbose(bool verbose);
- TDuration SetDispatchTimeout(TDuration timeout);
+ TDuration SetDispatchTimeout(TDuration timeout);
void SetDispatchedEventsLimit(ui64 limit) {
DispatchedEventsLimit = limit;
}
- TDuration SetReschedulingDelay(TDuration delay);
+ TDuration SetReschedulingDelay(TDuration delay);
void SetLogBackend(const TAutoPtr<TLogBackend> logBackend);
- void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority);
- TIntrusivePtr<ITimeProvider> GetTimeProvider();
- TInstant GetCurrentTime() const;
- void UpdateCurrentTime(TInstant newTime);
+ void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority);
+ TIntrusivePtr<ITimeProvider> GetTimeProvider();
+ TInstant GetCurrentTime() const;
+ void UpdateCurrentTime(TInstant newTime);
void AdvanceCurrentTime(TDuration duration);
void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0);
virtual void Initialize();
- ui32 GetNodeId(ui32 index = 0) const;
- ui32 GetNodeCount() const;
- ui64 AllocateLocalId();
+ ui32 GetNodeId(ui32 index = 0) const;
+ ui32 GetNodeCount() const;
+ ui64 AllocateLocalId();
ui32 InterconnectPoolId() const;
TString GetTempDir();
TActorId Register(IActor* actor, ui32 nodeIndex = 0, ui32 poolId = 0,
- TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0,
+ TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0,
const TActorId& parentid = TActorId());
TActorId Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
const TActorId& parentid = TActorId());
TActorId RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex = 0);
TActorId AllocateEdgeActor(ui32 nodeIndex = 0);
- TEventsList CaptureEvents();
- TEventsList CaptureMailboxEvents(ui32 hint, ui32 nodeId);
- TScheduledEventsList CaptureScheduledEvents();
- void PushFront(TAutoPtr<IEventHandle>& ev);
- void PushEventsFront(TEventsList& events);
- void PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events);
- // doesn't dispatch events for edge actors
+ TEventsList CaptureEvents();
+ TEventsList CaptureMailboxEvents(ui32 hint, ui32 nodeId);
+ TScheduledEventsList CaptureScheduledEvents();
+ void PushFront(TAutoPtr<IEventHandle>& ev);
+ void PushEventsFront(TEventsList& events);
+ void PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events);
+ // doesn't dispatch events for edge actors
bool DispatchEvents(const TDispatchOptions& options = TDispatchOptions());
bool DispatchEvents(const TDispatchOptions& options, TDuration simTimeout);
bool DispatchEvents(const TDispatchOptions& options, TInstant simDeadline);
- void Send(IEventHandle* ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false);
- void Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex = 0);
- void ClearCounters();
- ui64 GetCounter(ui32 evType) const;
+ void Send(IEventHandle* ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false);
+ void Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex = 0);
+ void ClearCounters();
+ ui64 GetCounter(ui32 evType) const;
TActorId GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex = 0);
void WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter = {}, TDuration simTimeout = TDuration::Max());
TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo);
@@ -267,7 +267,7 @@ namespace NActors {
IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const;
void EnableScheduleForActor(const TActorId& actorId, bool allow = true);
bool IsScheduleForActorEnabled(const TActorId& actorId) const;
- TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0);
+ TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0);
void SetupMonitoring();
template<typename T>
@@ -287,24 +287,24 @@ namespace NActors {
TActorSystem* SingleSys() const;
TActorSystem* GetAnyNodeActorSystem();
TActorSystem* GetActorSystem(ui32 nodeId);
- template <typename TEvent>
+ template <typename TEvent>
TEvent* GrabEdgeEventIf(TAutoPtr<IEventHandle>& handle, std::function<bool(const TEvent&)> predicate, TDuration simTimeout = TDuration::Max()) {
- handle.Destroy();
- const ui32 eventType = TEvent::EventType;
+ handle.Destroy();
+ const ui32 eventType = TEvent::EventType;
WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
Y_UNUSED(runtime);
- if (event->GetTypeRewrite() != eventType)
- return false;
-
- TEvent* typedEvent = reinterpret_cast<TAutoPtr<TEventHandle<TEvent>>&>(event)->Get();
- if (predicate(*typedEvent)) {
- handle = event;
- return true;
- }
-
- return false;
+ if (event->GetTypeRewrite() != eventType)
+ return false;
+
+ TEvent* typedEvent = reinterpret_cast<TAutoPtr<TEventHandle<TEvent>>&>(event)->Get();
+ if (predicate(*typedEvent)) {
+ handle = event;
+ return true;
+ }
+
+ return false;
}, {}, simTimeout);
-
+
if (simTimeout == TDuration::Max())
Y_VERIFY(handle);
@@ -313,8 +313,8 @@ namespace NActors {
} else {
return nullptr;
}
- }
-
+ }
+
template<class TEvent>
typename TEvent::TPtr GrabEdgeEventIf(
const TSet<TActorId>& edgeFilter,
@@ -353,12 +353,12 @@ namespace NActors {
return GrabEdgeEventIf<TEvent>(edgeFilter, predicate, simTimeout);
}
- template <typename TEvent>
+ template <typename TEvent>
TEvent* GrabEdgeEvent(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) {
std::function<bool(const TEvent&)> truth = [](const TEvent&) { return true; };
return GrabEdgeEventIf(handle, truth, simTimeout);
- }
-
+ }
+
template <typename TEvent>
THolder<TEvent> GrabEdgeEvent(TDuration simTimeout = TDuration::Max()) {
TAutoPtr<IEventHandle> handle;
@@ -494,18 +494,18 @@ namespace NActors {
private:
IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const;
- void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem);
- TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint);
+ void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem);
+ TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint);
void ClearMailbox(ui32 nodeId, ui32 hint);
- void HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId);
- void UpdateFinalEventsStatsForEachContext(IEventHandle& ev);
+ void HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId);
+ void UpdateFinalEventsStatsForEachContext(IEventHandle& ev);
bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline);
-
- private:
+
+ private:
ui64 ScheduledCount;
ui64 ScheduledLimit;
- THolder<TTempDir> TmpDir;
- const TThread::TId MainThreadId;
+ THolder<TTempDir> TmpDir;
+ const TThread::TId MainThreadId;
protected:
bool UseRealInterconnect = false;
@@ -513,25 +513,25 @@ namespace NActors {
bool IsInitialized = false;
bool SingleSysEnv = false;
const TString ClusterUUID;
- const ui32 FirstNodeId;
- const ui32 NodeCount;
+ const ui32 FirstNodeId;
+ const ui32 NodeCount;
const ui32 DataCenterCount;
- const bool UseRealThreads;
+ const bool UseRealThreads;
- ui64 LocalId;
- TMutex Mutex;
- TCondVar MailboxesHasEvents;
- TEventMailBoxList Mailboxes;
+ ui64 LocalId;
+ TMutex Mutex;
+ TCondVar MailboxesHasEvents;
+ TEventMailBoxList Mailboxes;
TMap<ui32, ui64> EvCounters;
- ui64 DispatchCyclesCount;
- ui64 DispatchedEventsCount;
+ ui64 DispatchCyclesCount;
+ ui64 DispatchedEventsCount;
ui64 DispatchedEventsLimit = 2'500'000;
TActorId CurrentRecipient;
ui64 DispatcherRandomSeed;
- TIntrusivePtr<IRandomProvider> DispatcherRandomProvider;
+ TIntrusivePtr<IRandomProvider> DispatcherRandomProvider;
TAutoPtr<TLogBackend> LogBackend;
bool NeedMonitoring;
-
+
TIntrusivePtr<IRandomProvider> RandomProvider;
TIntrusivePtr<ITimeProvider> TimeProvider;
@@ -554,27 +554,27 @@ namespace NActors {
return static_cast<T*>(AppData0.get());
}
- TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters;
- TIntrusivePtr<NActors::NLog::TSettings> LogSettings;
+ TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters;
+ TIntrusivePtr<NActors::NLog::TSettings> LogSettings;
TIntrusivePtr<NInterconnect::TPollerThreads> Poller;
- volatile ui64* ActorSystemTimestamp;
+ volatile ui64* ActorSystemTimestamp;
volatile ui64* ActorSystemMonotonic;
TVector<std::pair<TActorId, TActorSetupCmd> > LocalServices;
TMap<TActorId, IActor*> LocalServicesActors;
TMap<IActor*, TActorId> ActorToActorId;
- THolder<TMailboxTable> MailboxTable;
+ THolder<TMailboxTable> MailboxTable;
std::shared_ptr<void> AppData0;
- THolder<TActorSystem> ActorSystem;
+ THolder<TActorSystem> ActorSystem;
THolder<IExecutorPool> SchedulerPool;
TVector<IExecutorPool*> ExecutorPools;
- THolder<TExecutorThread> ExecutorThread;
+ THolder<TExecutorThread> ExecutorThread;
};
-
+
struct INodeFactory {
virtual ~INodeFactory() = default;
virtual TIntrusivePtr<TNodeDataBase> CreateNode() = 0;
- };
-
+ };
+
struct TDefaultNodeFactory final: INodeFactory {
virtual TIntrusivePtr<TNodeDataBase> CreateNode() override {
return new TNodeDataBase();
@@ -601,94 +601,94 @@ namespace NActors {
private:
void InitNode(TNodeDataBase* node, size_t idx);
- struct TDispatchContext {
- const TDispatchOptions* Options;
- TDispatchContext* PrevContext;
-
+ struct TDispatchContext {
+ const TDispatchOptions* Options;
+ TDispatchContext* PrevContext;
+
TMap<const TDispatchOptions::TFinalEventCondition*, ui32> FinalEventFrequency;
TSet<TEventMailboxId> FoundNonEmptyMailboxes;
bool FinalEventFound = false;
- };
-
+ };
+
TProgramShouldContinue ShouldContinue;
TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes;
- ui64 CurrentTimestamp;
+ ui64 CurrentTimestamp;
TSet<TActorId> EdgeActors;
THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox;
- TDuration DispatchTimeout;
- TDuration ReschedulingDelay;
- TEventObserver ObserverFunc;
- TScheduledEventsSelector ScheduledEventsSelectorFunc;
- TEventFilter EventFilterFunc;
- TScheduledEventFilter ScheduledEventFilterFunc;
- TRegistrationObserver RegistrationObserver;
+ TDuration DispatchTimeout;
+ TDuration ReschedulingDelay;
+ TEventObserver ObserverFunc;
+ TScheduledEventsSelector ScheduledEventsSelectorFunc;
+ TEventFilter EventFilterFunc;
+ TScheduledEventFilter ScheduledEventFilterFunc;
+ TRegistrationObserver RegistrationObserver;
TSet<TActorId> BlockedOutput;
TSet<TActorId> ScheduleWhiteList;
THashMap<TActorId, TActorId> ScheduleWhiteListParent;
THashMap<TActorId, TString> ActorNames;
TDispatchContext* CurrentDispatchContext;
TVector<ui64> TxAllocatorTabletIds;
-
- static ui32 NextNodeId;
- };
-
- template <typename TEvent>
- TEvent* FindEvent(TEventsList& events) {
- for (auto& event : events) {
- if (event && event->GetTypeRewrite() == TEvent::EventType) {
- return static_cast<TEvent*>(event->GetBase());
- }
- }
-
- return nullptr;
- }
-
- template <typename TEvent>
+
+ static ui32 NextNodeId;
+ };
+
+ template <typename TEvent>
+ TEvent* FindEvent(TEventsList& events) {
+ for (auto& event : events) {
+ if (event && event->GetTypeRewrite() == TEvent::EventType) {
+ return static_cast<TEvent*>(event->GetBase());
+ }
+ }
+
+ return nullptr;
+ }
+
+ template <typename TEvent>
TEvent* FindEvent(TEventsList& events, const std::function<bool(const TEvent&)>& predicate) {
- for (auto& event : events) {
- if (event && event->GetTypeRewrite() == TEvent::EventType && predicate(*static_cast<TEvent*>(event->GetBase()))) {
- return static_cast<TEvent*>(event->GetBase());
- }
- }
-
- return nullptr;
- }
-
- template <typename TEvent>
- TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev) {
- ev.Destroy();
- for (auto& event : events) {
- if (event && event->GetTypeRewrite() == TEvent::EventType) {
- ev = event;
- return static_cast<TEvent*>(ev->GetBase());
- }
- }
-
- return nullptr;
- }
-
- template <typename TEvent>
- TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev,
+ for (auto& event : events) {
+ if (event && event->GetTypeRewrite() == TEvent::EventType && predicate(*static_cast<TEvent*>(event->GetBase()))) {
+ return static_cast<TEvent*>(event->GetBase());
+ }
+ }
+
+ return nullptr;
+ }
+
+ template <typename TEvent>
+ TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev) {
+ ev.Destroy();
+ for (auto& event : events) {
+ if (event && event->GetTypeRewrite() == TEvent::EventType) {
+ ev = event;
+ return static_cast<TEvent*>(ev->GetBase());
+ }
+ }
+
+ return nullptr;
+ }
+
+ template <typename TEvent>
+ TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev,
const std::function<bool(const typename TEvent::TPtr&)>& predicate) {
- ev.Destroy();
- for (auto& event : events) {
- if (event && event->GetTypeRewrite() == TEvent::EventType) {
- if (predicate(reinterpret_cast<const typename TEvent::TPtr&>(event))) {
- ev = event;
- return static_cast<TEvent*>(ev->GetBase());
- }
- }
- }
-
- return nullptr;
- }
-
- class IStrandingDecoratorFactory {
- public:
- virtual ~IStrandingDecoratorFactory() {}
+ ev.Destroy();
+ for (auto& event : events) {
+ if (event && event->GetTypeRewrite() == TEvent::EventType) {
+ if (predicate(reinterpret_cast<const typename TEvent::TPtr&>(event))) {
+ ev = event;
+ return static_cast<TEvent*>(ev->GetBase());
+ }
+ }
+ }
+
+ return nullptr;
+ }
+
+ class IStrandingDecoratorFactory {
+ public:
+ virtual ~IStrandingDecoratorFactory() {}
virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) = 0;
- };
-
+ };
+
struct IReplyChecker {
virtual ~IReplyChecker() {}
virtual void OnRequest(IEventHandle *request) = 0;
@@ -712,5 +712,5 @@ namespace NActors {
TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
TReplyCheckerCreator createReplyChecker = CreateNoneReplyChecker);
- extern ui64 DefaultRandomSeed;
-}
+ extern ui64 DefaultRandomSeed;
+}
diff --git a/library/cpp/actors/testlib/ya.make b/library/cpp/actors/testlib/ya.make
index 1afb3f6059..fbd9ec2dc9 100644
--- a/library/cpp/actors/testlib/ya.make
+++ b/library/cpp/actors/testlib/ya.make
@@ -12,8 +12,8 @@ PEERDIR(
library/cpp/actors/core
library/cpp/actors/interconnect/mock
library/cpp/actors/protos
- library/cpp/random_provider
- library/cpp/time_provider
+ library/cpp/random_provider
+ library/cpp/time_provider
)
IF (GCC)
diff --git a/library/cpp/actors/util/local_process_key.h b/library/cpp/actors/util/local_process_key.h
index 172f08fc73..29a81968a8 100644
--- a/library/cpp/actors/util/local_process_key.h
+++ b/library/cpp/actors/util/local_process_key.h
@@ -1,30 +1,30 @@
-#pragma once
-
+#pragma once
+
#include <util/string/builder.h>
-#include <util/generic/strbuf.h>
-#include <util/generic/vector.h>
-#include <util/generic/hash.h>
-#include <util/generic/singleton.h>
+#include <util/generic/strbuf.h>
+#include <util/generic/vector.h>
+#include <util/generic/hash.h>
+#include <util/generic/singleton.h>
#include <util/generic/serialized_enum.h>
-
-template <typename T>
-class TLocalProcessKeyState {
-
-template <typename U, const char* Name>
-friend class TLocalProcessKey;
+
+template <typename T>
+class TLocalProcessKeyState {
+
+template <typename U, const char* Name>
+friend class TLocalProcessKey;
template <typename U, typename EnumT>
friend class TEnumProcessKey;
-
-public:
- static TLocalProcessKeyState& GetInstance() {
- return *Singleton<TLocalProcessKeyState<T>>();
- }
-
- size_t GetCount() const {
+
+public:
+ static TLocalProcessKeyState& GetInstance() {
+ return *Singleton<TLocalProcessKeyState<T>>();
+ }
+
+ size_t GetCount() const {
return StartIndex + Names.size();
- }
-
- TStringBuf GetNameByIndex(size_t index) const {
+ }
+
+ TStringBuf GetNameByIndex(size_t index) const {
if (index < StartIndex) {
return StaticNames[index];
} else {
@@ -32,24 +32,24 @@ public:
Y_ENSURE(index < Names.size());
return Names[index];
}
- }
-
- size_t GetIndexByName(TStringBuf name) const {
- auto it = Map.find(name);
- Y_ENSURE(it != Map.end());
- return it->second;
- }
-
-private:
- size_t Register(TStringBuf name) {
+ }
+
+ size_t GetIndexByName(TStringBuf name) const {
+ auto it = Map.find(name);
+ Y_ENSURE(it != Map.end());
+ return it->second;
+ }
+
+private:
+ size_t Register(TStringBuf name) {
auto x = Map.emplace(name, Names.size()+StartIndex);
- if (x.second) {
- Names.emplace_back(name);
- }
-
- return x.first->second;
- }
-
+ if (x.second) {
+ Names.emplace_back(name);
+ }
+
+ return x.first->second;
+ }
+
size_t Register(TStringBuf name, ui32 index) {
Y_VERIFY(index < StartIndex);
auto x = Map.emplace(name, index);
@@ -58,7 +58,7 @@ private:
return x.first->second;
}
-private:
+private:
static constexpr ui32 StartIndex = 2000;
TVector<TString> FillStaticNames() {
@@ -73,22 +73,22 @@ private:
TVector<TString> StaticNames = FillStaticNames();
TVector<TString> Names;
THashMap<TString, size_t> Map;
-};
-
-template <typename T, const char* Name>
-class TLocalProcessKey {
-public:
- static TStringBuf GetName() {
- return Name;
- }
-
- static size_t GetIndex() {
- return Index;
- }
-
-private:
- inline static size_t Index = TLocalProcessKeyState<T>::GetInstance().Register(Name);
-};
+};
+
+template <typename T, const char* Name>
+class TLocalProcessKey {
+public:
+ static TStringBuf GetName() {
+ return Name;
+ }
+
+ static size_t GetIndex() {
+ return Index;
+ }
+
+private:
+ inline static size_t Index = TLocalProcessKeyState<T>::GetInstance().Register(Name);
+};
template <typename T, typename EnumT>
class TEnumProcessKey {
diff --git a/library/cpp/actors/util/threadparkpad.cpp b/library/cpp/actors/util/threadparkpad.cpp
index 74069ff15b..ff1cd248d3 100644
--- a/library/cpp/actors/util/threadparkpad.cpp
+++ b/library/cpp/actors/util/threadparkpad.cpp
@@ -43,7 +43,7 @@ namespace NActors {
};
#elif defined _win32_
-#include <util/generic/bt_exception.h>
+#include <util/generic/bt_exception.h>
#include <util/generic/yexception.h>
namespace NActors {
diff --git a/library/cpp/actors/util/ya.make b/library/cpp/actors/util/ya.make
index 37488c3962..b3aeabf6b6 100644
--- a/library/cpp/actors/util/ya.make
+++ b/library/cpp/actors/util/ya.make
@@ -14,7 +14,7 @@ SRCS(
funnel_queue.h
futex.h
intrinsics.h
- local_process_key.h
+ local_process_key.h
named_tuple.h
queue_chunk.h
queue_oneone_inplace.h
diff --git a/library/cpp/monlib/dynamic_counters/counters.h b/library/cpp/monlib/dynamic_counters/counters.h
index dc178cfbe0..fd027ddc99 100644
--- a/library/cpp/monlib/dynamic_counters/counters.h
+++ b/library/cpp/monlib/dynamic_counters/counters.h
@@ -81,11 +81,11 @@ namespace NMonitoring {
}
};
-#ifdef _MSC_VER
-#pragma warning(push)
-#pragma warning(disable : 4522) // multiple assignment operators specified
-#endif // _MSC_VER
-
+#ifdef _MSC_VER
+#pragma warning(push)
+#pragma warning(disable : 4522) // multiple assignment operators specified
+#endif // _MSC_VER
+
struct TCounterForPtr: public TDeprecatedCounter, public TCountableBase {
TCounterForPtr(bool derivative = false, EVisibility vis = EVisibility::Public)
: TDeprecatedCounter(0ULL, derivative)
@@ -95,7 +95,7 @@ namespace NMonitoring {
TCounterForPtr(const TCounterForPtr&) = delete;
TCounterForPtr& operator=(const TCounterForPtr& other) = delete;
-
+
void Accept(
const TString& labelName, const TString& labelValue,
ICountableConsumer& consumer) const override {
@@ -181,10 +181,10 @@ namespace NMonitoring {
using THistogramPtr = TIntrusivePtr<THistogramCounter>;
-#ifdef _MSC_VER
-#pragma warning(pop)
-#endif
-
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+
struct TDynamicCounters;
typedef TIntrusivePtr<TDynamicCounters> TDynamicCounterPtr;
diff --git a/library/cpp/random_provider/random_provider.cpp b/library/cpp/random_provider/random_provider.cpp
index 64cb48b8b7..5f09d8358a 100644
--- a/library/cpp/random_provider/random_provider.cpp
+++ b/library/cpp/random_provider/random_provider.cpp
@@ -1,75 +1,75 @@
-#include "random_provider.h"
-#include <util/random/mersenne.h>
-#include <util/random/random.h>
+#include "random_provider.h"
+#include <util/random/mersenne.h>
+#include <util/random/random.h>
#include <util/system/unaligned_mem.h>
-
-namespace {
- void SetV4(TGUID& g) {
- g.dw[1] &= 0x0fffffff;
- g.dw[1] |= 0x40000000;
-
- g.dw[2] &= 0xffffff3f;
- g.dw[2] |= 0x00000080;
- }
-}
-
-class TDefaultRandomProvider: public IRandomProvider {
-public:
- ui64 GenRand() noexcept override {
- return RandomNumber<ui64>();
- }
-
- TGUID GenGuid() noexcept override {
- TGUID ret;
- CreateGuid(&ret);
- return ret;
- }
-
+
+namespace {
+ void SetV4(TGUID& g) {
+ g.dw[1] &= 0x0fffffff;
+ g.dw[1] |= 0x40000000;
+
+ g.dw[2] &= 0xffffff3f;
+ g.dw[2] |= 0x00000080;
+ }
+}
+
+class TDefaultRandomProvider: public IRandomProvider {
+public:
+ ui64 GenRand() noexcept override {
+ return RandomNumber<ui64>();
+ }
+
+ TGUID GenGuid() noexcept override {
+ TGUID ret;
+ CreateGuid(&ret);
+ return ret;
+ }
+
TGUID GenUuid4() noexcept override {
- TGUID ret;
+ TGUID ret;
WriteUnaligned<ui64>(ret.dw, RandomNumber<ui64>());
WriteUnaligned<ui64>(ret.dw + 2, RandomNumber<ui64>());
- SetV4(ret);
- return ret;
- }
-};
-
-class TDeterministicRandomProvider: public IRandomProvider {
-public:
- TDeterministicRandomProvider(ui64 seed)
- : Gen(seed)
- {
- }
-
- ui64 GenRand() noexcept override {
- return Gen.GenRand();
- }
-
- TGUID GenGuid() noexcept override {
- TGUID ret;
+ SetV4(ret);
+ return ret;
+ }
+};
+
+class TDeterministicRandomProvider: public IRandomProvider {
+public:
+ TDeterministicRandomProvider(ui64 seed)
+ : Gen(seed)
+ {
+ }
+
+ ui64 GenRand() noexcept override {
+ return Gen.GenRand();
+ }
+
+ TGUID GenGuid() noexcept override {
+ TGUID ret;
WriteUnaligned<ui64>(ret.dw, Gen.GenRand());
- ret.dw[2] = (ui32)Gen.GenRand();
- ret.dw[3] = ++GuidCount;
- return ret;
- }
-
+ ret.dw[2] = (ui32)Gen.GenRand();
+ ret.dw[3] = ++GuidCount;
+ return ret;
+ }
+
TGUID GenUuid4() noexcept override {
- TGUID ret;
+ TGUID ret;
WriteUnaligned<ui64>(ret.dw, Gen.GenRand());
WriteUnaligned<ui64>(ret.dw + 2, Gen.GenRand());
- SetV4(ret);
- return ret;
- }
-
-private:
- TMersenne<ui64> Gen;
- ui32 GuidCount = 0;
-};
-
-TIntrusivePtr<IRandomProvider> CreateDefaultRandomProvider() {
- return TIntrusivePtr<IRandomProvider>(new TDefaultRandomProvider());
-}
-
-TIntrusivePtr<IRandomProvider> CreateDeterministicRandomProvider(ui64 seed) {
- return TIntrusivePtr<IRandomProvider>(new TDeterministicRandomProvider(seed));
-}
+ SetV4(ret);
+ return ret;
+ }
+
+private:
+ TMersenne<ui64> Gen;
+ ui32 GuidCount = 0;
+};
+
+TIntrusivePtr<IRandomProvider> CreateDefaultRandomProvider() {
+ return TIntrusivePtr<IRandomProvider>(new TDefaultRandomProvider());
+}
+
+TIntrusivePtr<IRandomProvider> CreateDeterministicRandomProvider(ui64 seed) {
+ return TIntrusivePtr<IRandomProvider>(new TDeterministicRandomProvider(seed));
+}
diff --git a/library/cpp/random_provider/random_provider.h b/library/cpp/random_provider/random_provider.h
index 6e7226866f..41526f00cf 100644
--- a/library/cpp/random_provider/random_provider.h
+++ b/library/cpp/random_provider/random_provider.h
@@ -1,14 +1,14 @@
-#pragma once
-
-#include <util/generic/guid.h>
-#include <util/random/common_ops.h>
-
-class IRandomProvider: public TThrRefBase, public TCommonRNG<ui64, IRandomProvider> {
-public:
- virtual TGUID GenGuid() noexcept = 0;
- virtual TGUID GenUuid4() noexcept = 0;
- virtual ui64 GenRand() noexcept = 0; // for TCommonRNG
-};
-
-TIntrusivePtr<IRandomProvider> CreateDefaultRandomProvider();
-TIntrusivePtr<IRandomProvider> CreateDeterministicRandomProvider(ui64 seed);
+#pragma once
+
+#include <util/generic/guid.h>
+#include <util/random/common_ops.h>
+
+class IRandomProvider: public TThrRefBase, public TCommonRNG<ui64, IRandomProvider> {
+public:
+ virtual TGUID GenGuid() noexcept = 0;
+ virtual TGUID GenUuid4() noexcept = 0;
+ virtual ui64 GenRand() noexcept = 0; // for TCommonRNG
+};
+
+TIntrusivePtr<IRandomProvider> CreateDefaultRandomProvider();
+TIntrusivePtr<IRandomProvider> CreateDeterministicRandomProvider(ui64 seed);
diff --git a/library/cpp/random_provider/ya.make b/library/cpp/random_provider/ya.make
index 38d1f070e8..908a491f36 100644
--- a/library/cpp/random_provider/ya.make
+++ b/library/cpp/random_provider/ya.make
@@ -1,13 +1,13 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(
g:kikimr
g:yql
)
-
-SRCS(
- random_provider.cpp
- random_provider.h
-)
-
-END()
+
+SRCS(
+ random_provider.cpp
+ random_provider.h
+)
+
+END()
diff --git a/library/cpp/regex/pire/regexp.h b/library/cpp/regex/pire/regexp.h
index 94bba4064b..60f449b9af 100644
--- a/library/cpp/regex/pire/regexp.h
+++ b/library/cpp/regex/pire/regexp.h
@@ -4,7 +4,7 @@
#include <library/cpp/charset/doccodes.h>
#include <library/cpp/charset/recyr.hh>
-#include <util/generic/maybe.h>
+#include <util/generic/maybe.h>
#include <util/generic/strbuf.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
@@ -42,7 +42,7 @@ namespace NRegExp {
bool CaseInsensitive = false;
bool Surround = false;
- TMaybe<size_t> CapturePos;
+ TMaybe<size_t> CapturePos;
ECharset Charset = CODES_UNKNOWN;
bool AndNotSupport = false;
};
@@ -70,8 +70,8 @@ namespace NRegExp {
lexer.AddFeature(NPire::NFeatures::CaseInsensitive());
}
- if (opts.CapturePos) {
- lexer.AddFeature(NPire::NFeatures::Capture(*opts.CapturePos));
+ if (opts.CapturePos) {
+ lexer.AddFeature(NPire::NFeatures::Capture(*opts.CapturePos));
}
if (opts.AndNotSupport) {
diff --git a/library/cpp/testing/unittest/utmain.cpp b/library/cpp/testing/unittest/utmain.cpp
index 305bc6b40f..33b0ccc63a 100644
--- a/library/cpp/testing/unittest/utmain.cpp
+++ b/library/cpp/testing/unittest/utmain.cpp
@@ -584,13 +584,13 @@ public:
_CrtSetReportMode(_CRT_ERROR, _CRTDBG_MODE_FILE | _CRTDBG_MODE_DEBUG);
_CrtSetReportFile(_CRT_ERROR, _CRTDBG_FILE_STDERR);
}
- }
- ~TWinEnvironment() {
+ }
+ ~TWinEnvironment() {
if (!IsDebuggerPresent()) {
_CrtSetReportMode(_CRT_WARN, _CRTDBG_MODE_FILE);
_CrtSetReportFile(_CRT_WARN, _CRTDBG_FILE_STDERR);
}
-
+
SetConsoleOutputCP(OutputCP); // restore original output CP at program exit
}
diff --git a/library/cpp/time_provider/time_provider.cpp b/library/cpp/time_provider/time_provider.cpp
index 6c1ba8e07c..c3181856f8 100644
--- a/library/cpp/time_provider/time_provider.cpp
+++ b/library/cpp/time_provider/time_provider.cpp
@@ -1,30 +1,30 @@
-#include "time_provider.h"
-
-class TDefaultTimeProvider: public ITimeProvider {
-public:
- TInstant Now() override {
- return TInstant::Now();
- }
-};
-
-class TDeterministicTimeProvider: public ITimeProvider {
-public:
- TDeterministicTimeProvider(ui64 seed) {
- Value = TInstant::Seconds(seed);
- }
-
- TInstant Now() override {
- return Value;
- }
-
-private:
- TInstant Value;
-};
-
-TIntrusivePtr<ITimeProvider> CreateDefaultTimeProvider() {
- return TIntrusivePtr<ITimeProvider>(new TDefaultTimeProvider());
-}
-
-TIntrusivePtr<ITimeProvider> CreateDeterministicTimeProvider(ui64 seed) {
- return TIntrusivePtr<ITimeProvider>(new TDeterministicTimeProvider(seed));
-}
+#include "time_provider.h"
+
+class TDefaultTimeProvider: public ITimeProvider {
+public:
+ TInstant Now() override {
+ return TInstant::Now();
+ }
+};
+
+class TDeterministicTimeProvider: public ITimeProvider {
+public:
+ TDeterministicTimeProvider(ui64 seed) {
+ Value = TInstant::Seconds(seed);
+ }
+
+ TInstant Now() override {
+ return Value;
+ }
+
+private:
+ TInstant Value;
+};
+
+TIntrusivePtr<ITimeProvider> CreateDefaultTimeProvider() {
+ return TIntrusivePtr<ITimeProvider>(new TDefaultTimeProvider());
+}
+
+TIntrusivePtr<ITimeProvider> CreateDeterministicTimeProvider(ui64 seed) {
+ return TIntrusivePtr<ITimeProvider>(new TDeterministicTimeProvider(seed));
+}
diff --git a/library/cpp/time_provider/time_provider.h b/library/cpp/time_provider/time_provider.h
index 46e0a885da..86ff965099 100644
--- a/library/cpp/time_provider/time_provider.h
+++ b/library/cpp/time_provider/time_provider.h
@@ -1,11 +1,11 @@
-#pragma once
-
-#include <util/datetime/base.h>
-
-class ITimeProvider: public TThrRefBase {
-public:
- virtual TInstant Now() = 0;
-};
-
-TIntrusivePtr<ITimeProvider> CreateDefaultTimeProvider();
-TIntrusivePtr<ITimeProvider> CreateDeterministicTimeProvider(ui64 seed);
+#pragma once
+
+#include <util/datetime/base.h>
+
+class ITimeProvider: public TThrRefBase {
+public:
+ virtual TInstant Now() = 0;
+};
+
+TIntrusivePtr<ITimeProvider> CreateDefaultTimeProvider();
+TIntrusivePtr<ITimeProvider> CreateDeterministicTimeProvider(ui64 seed);
diff --git a/library/cpp/time_provider/ya.make b/library/cpp/time_provider/ya.make
index cf3995b2a4..a1bcdd2d96 100644
--- a/library/cpp/time_provider/ya.make
+++ b/library/cpp/time_provider/ya.make
@@ -1,13 +1,13 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(
g:kikimr
g:yql
)
-
-SRCS(
- time_provider.cpp
- time_provider.h
-)
-
-END()
+
+SRCS(
+ time_provider.cpp
+ time_provider.h
+)
+
+END()