aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors
diff options
context:
space:
mode:
authorCthulhu <cthulhu@yandex-team.ru>2022-02-10 16:47:44 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:44 +0300
commit6aced6c854653b75aab9808d5995be5fc4d9fa53 (patch)
treec0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/actors
parentbcb3e9d0eb2a8188a6a9fe0907a8949ce4881a4e (diff)
downloadydb-6aced6c854653b75aab9808d5995be5fc4d9fa53.tar.gz
Restoring authorship annotation for Cthulhu <cthulhu@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors')
-rw-r--r--library/cpp/actors/README.md212
-rw-r--r--library/cpp/actors/core/actorsystem.cpp8
-rw-r--r--library/cpp/actors/core/callstack.cpp42
-rw-r--r--library/cpp/actors/core/callstack.h48
-rw-r--r--library/cpp/actors/core/event.h10
-rw-r--r--library/cpp/actors/core/executor_thread.cpp6
-rw-r--r--library/cpp/actors/core/executor_thread.h2
-rw-r--r--library/cpp/actors/core/log.cpp52
-rw-r--r--library/cpp/actors/core/log.h44
-rw-r--r--library/cpp/actors/core/log_settings.cpp62
-rw-r--r--library/cpp/actors/core/log_settings.h90
-rw-r--r--library/cpp/actors/core/scheduler_cookie.h4
-rw-r--r--library/cpp/actors/core/scheduler_queue.h2
-rw-r--r--library/cpp/actors/core/servicemap.h2
-rw-r--r--library/cpp/actors/core/ya.make4
-rw-r--r--library/cpp/actors/helpers/selfping_actor.cpp198
-rw-r--r--library/cpp/actors/helpers/selfping_actor.h4
-rw-r--r--library/cpp/actors/helpers/selfping_actor_ut.cpp6
-rw-r--r--library/cpp/actors/http/http_proxy.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_counters.cpp2
-rw-r--r--library/cpp/actors/interconnect/poller_tcp_unit.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/lib/interrupter.h4
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp264
-rw-r--r--library/cpp/actors/testlib/test_runtime.h70
-rw-r--r--library/cpp/actors/util/rope.h12
-rw-r--r--library/cpp/actors/util/unordered_cache.h12
-rw-r--r--library/cpp/actors/util/ut/ya.make10
27 files changed, 588 insertions, 588 deletions
diff --git a/library/cpp/actors/README.md b/library/cpp/actors/README.md
index c4058444dd..c39908f2f5 100644
--- a/library/cpp/actors/README.md
+++ b/library/cpp/actors/README.md
@@ -1,107 +1,107 @@
-## Actor library
-
-### Часть первая, вводная.
-Иногда приходится разрабатывать асинхронные, существенно параллельные, местами распределённые программы. Иногда еще и внутренняя логика нетривиальна, разнородна, пишется разными командами не один год. Всё как мы любим. Человечеством придумано не так много способов внутренней организации структуры и кода таких программ. Большинство из них плохие (и именно из-за плохих подходов разработка асинхронных, многопоточных программ приобрела дурную славу). Некоторые получше. А серебряной пули как обычно нет.
-
-Когда мы начинали разработку Yandex Database (тогда еще KiKiMR), сразу было понятно что простыми наколеночными поделиями обойтись (и сделать при этом хорошо, так что бы не было стыдно) не получится. В качестве базиса мы выбрали мессадж-пассинг и модель акторов. И не пожалели. Постепенно этот подход распространился на смежные проекты.
-
-### Базовые концепции.
-Если отбросить шелуху – представляем сервис (программу в случае запуска изолированного бинарника) как ансамбль независимых агентов, взаимодействующих через отправку асинхронных сообщений внутри общего окружения. Тут все слова важны:
-
-Независимых – не разделяют состояние и поток выполнения.
-Передача сообщений – формализуем протоколы, а не интерфейсы.
-
-Асинхронная – не блокируемся на отправке сообщений.
-Общее окружение – все агенты разделяют общий пул ресурсов и каждый из них, зная адрес, может послать сообщение каждому.
-
-В более хайповых терминах – очень похоже на колокейтед микросервисы, только уровнем ниже. И да, мы заведомо не хотели прятать асинхронщину и параллелизм от разработчика, показывая прям самое мясо.
-
-### IActor.
-https://a.yandex-team.ru/arc/trunk/arcadia/library/actors/core/actor.h?rev=5315854#L105
-Базовый класс всех агентов, напрямую обычно не используется. Инстанцируется либо TActor, либо TActorBootstrapped. Фактически весь полезный код программы размещается в акторах.
-(важное замечание – в коде увидите ручки с TActorContext и без него, схожие по названию и назначению. На данный момент вариант с TActorContext является устаревшим, новый код стоит писать без его использования).
-Важные методы:
-
-PassAway – единственный корректный способ зарегистрированному актору умереть. Может вызываться только находясь внутри обработчика сообщения.
-Send – отправка сообщения, зная адрес получателя. В акторе доступен хелпер, принимающий непосредственно сообщение. Базовый вызов, принимающий полный event handle – доступен в контексте.
-
-Become – установить функцию-обработчик сообщений, которая будет использована при получении следующего сообщения.
-
-Register – зарегистрировать новый актор в акторсистеме, с выделением нового мейлбокса. Важно – с момента вызова владение актором передается акторсистеме, т.е. уже к моменту выхода актор может начать выполняться на другом потоке, нельзя к нему ни обращаться прямыми вызовами, ни даже предполагать что он еще жив.
-
-Schedule – зарегистрировать сообщение, которое будет отправлено не менее чем через запрошенную задержку. В акторе доступен хелпер, декорирующий сообщение хендлом отправки самому себе, в контексте можно передать полный хендл.
-
-SelfId – узнать собственный адрес. Возвращаемый объект TActorIdentity можно передавать если требуется делегировать отправку сообщений от имени актора (например если пишете полезный код пользуясь пассивными объектами).
-Посылка сообщений дешёвая, не нужно на ней чрезмерно экономить (но не бесплатная – поэтому посылать сообщения только ради посылки сообщений то же не стоит).
-
-Инстанцирование акторов так же дёшево, актор на запрос или фазу запроса – вполне нормальная практика. Мультиплексировать обработку разных запросов в одном акторе – так же вполне нормально. В нашем коде много примеров и первого, и второго. Пользуйтесь здравым смыслов и собственным вкусом.
-Т.к. на время обработки сообщения актор занимает тред из пула акторсистемы – уходить в длинные вычисления лучше на отдельном отселённом акторе (и либо отселять в отдельный пол акторсистемы, либо контролировать параллельность брокером ресурсов), блокирующие вызовы делать почти всегда ошибка. Стремление написать мютекс - ересь и от лукавого.
-Идентифицируются акторы своим TActorID-ом, который уникален и вы не должны его придумывать из воздуха, а только получить из регистрации (для порождённых акторов) или его вам должен рассказать кто-то, законно его знающий.
-
-Отправка на несуществующий актор (уже умерший) безопасна, сообщение будет просто выброшено в момент обработки (как обрабатывать недоставку сообщений в протоколах расскажу ниже).
-
-Кроме нормальных TActorID существуют еще и сервисные (составленные из строчки и номера ноды). Под ними может быть зарегистрирован реальный актор и фактически при получении сообщения по сервисному адресу – попробует переправить его текущему фактическому. Это позволяет размещать хорошо известные сервисы по хорошо известному адресу, не выстраивая параллельную машинерию поиска.
-
-Строить из актора конечный автомат при помощи переключений функции-обработчика – выбор в каждом конкретном случае, иногда удобнее да, иногда сваливать всё в одно состояние, а иногда – применять гибридное решение (когда часть жизненного цикла – обычно инициализации и завершение – выражены в переходах, а часть – нет).
-Меньше слов, больше дела – этого уже достаточно что бы прочитать самый простой пример. https://a.yandex-team.ru/arc/trunk/arcadia/library/actors/examples/01_ping_pong
-Здесь можно увидеть образец самого простого актора, занимающегося переброской сообщений и использующего все основные вызовы. Заодно покрутив за разные ручки (количество тредов в тредпуле, количество пар перебрасывающихся акторов) можно посмотреть на изменение поведения системы (hint: в таких простых сценариях максимум перфоманса достигается при одном треде в тредпулах).
-
-### Event и Event Handle.
-Полезную нагрузку сообщений заворачиваем в наследника IEventBase, у которого два важных метода – сериализация и загрузка. Сериализация виртуальная, а вот загрузка – нет, и для разбора сообщения из байтовой последовательности – необходимо на стороне получателя сматчить число-идентификатор типа ивента с С++ типом. Именно это делают макросы из hfunc.h. На практике ивенты создаются либо как наследник TEventLocal<> (для строго локальных сообщений) либо как наследник TEventPB<> (для потенциально пересылаемых по сети сообщений, типизируются protobuf-мессаджем).
-
-Кроме непосредственно ивента (в виде структуры либо в виде байтовой строки) для пересылки сообщения необходим набор дополнительных полей
-
-Адресат
-
-Отправитель
-
-Тип сообщения
-
-Кука
-
-Флаги
-
-Сообщение + дополнительные поля = IEventHandle. Именно хендлами акторсистема и оперирует. <event-type>::TPtr – в примере выше – это и есть указатель на типизированный хендл.
-
-Технически типом сообщения может быть любое число, которое получатель и отправитель договорились понимать как идентификатор сообщения. Сложившаяся практика – выделять диапазон идентификаторов макросом EventSpaceBegin (фактически блоками по 64к), начиная с блока ES_USERSPACE.
-Кука – неинтерпретируемое ui64 число, передаваемое с хендлом. Хорошей практикой является в ответе сервиса на сообщение выставлять куку в куку исходного сообщения, особенно для сервисов, потенциально используемых конкурентно.
-
-В флагах несколько бит зарезервировано под флаги, декларирующие как необходимо обрабатывать особые ситуации и 12 бит – под номер канала интерконнекта, в котором будет пересылаться сообщение (для локальных сообщений в имеющихся реализациях номер канала не имеет значения - хотя можно представить реализацию где для каналов будут независимые очереди).
-
-### Тредпулы и мейлбоксы.
-В рамках одной акторсистемы может сосуществовать несколько независимых тредпулов, каждый актор регистрируется на конкретном и в процессе жизни не может мигрировать (но может создавать новые акторы на произвольном тредпуле). Используется для крупноблочного разделения ресурсов, либо между разными активностями (вот здесь – обрабатываем один класс запросов, а вот здесь - другой), либо между разными профилями активности (вот здесь обрабатываем быстрые запросы, здесь – медленные, а вот там – вообще батчёвые). Например в YDB работает системный тредпул (в котором запускаются акторы, необходимые для функционирования YDB, и для которого мы следим что бы не было длительной блокировки в обработчиках), пользовательский тредпул (в котором обрабатываются запросы и потенциально обработчики могут уходить в себя подольше, но это не повлияет на инфраструктуру), батчёвый тредпул (куда отгружается длительная обработка – компакшены дисков, сканы таблиц и подобное) и, в жирных нодах – тредпул интерконнекта (как наиболее чувствительного к задержкам).
-Пересылка сообщений между акторами разных тредпулов но одной локальной акторсистемы остаётся локальной, принудительной сериализации сообщения не происходит.
-
+## Actor library
+
+### Часть первая, вводная.
+Иногда приходится разрабатывать асинхронные, существенно параллельные, местами распределённые программы. Иногда еще и внутренняя логика нетривиальна, разнородна, пишется разными командами не один год. Всё как мы любим. Человечеством придумано не так много способов внутренней организации структуры и кода таких программ. Большинство из них плохие (и именно из-за плохих подходов разработка асинхронных, многопоточных программ приобрела дурную славу). Некоторые получше. А серебряной пули как обычно нет.
+
+Когда мы начинали разработку Yandex Database (тогда еще KiKiMR), сразу было понятно что простыми наколеночными поделиями обойтись (и сделать при этом хорошо, так что бы не было стыдно) не получится. В качестве базиса мы выбрали мессадж-пассинг и модель акторов. И не пожалели. Постепенно этот подход распространился на смежные проекты.
+
+### Базовые концепции.
+Если отбросить шелуху – представляем сервис (программу в случае запуска изолированного бинарника) как ансамбль независимых агентов, взаимодействующих через отправку асинхронных сообщений внутри общего окружения. Тут все слова важны:
+
+Независимых – не разделяют состояние и поток выполнения.
+Передача сообщений – формализуем протоколы, а не интерфейсы.
+
+Асинхронная – не блокируемся на отправке сообщений.
+Общее окружение – все агенты разделяют общий пул ресурсов и каждый из них, зная адрес, может послать сообщение каждому.
+
+В более хайповых терминах – очень похоже на колокейтед микросервисы, только уровнем ниже. И да, мы заведомо не хотели прятать асинхронщину и параллелизм от разработчика, показывая прям самое мясо.
+
+### IActor.
+https://a.yandex-team.ru/arc/trunk/arcadia/library/actors/core/actor.h?rev=5315854#L105
+Базовый класс всех агентов, напрямую обычно не используется. Инстанцируется либо TActor, либо TActorBootstrapped. Фактически весь полезный код программы размещается в акторах.
+(важное замечание – в коде увидите ручки с TActorContext и без него, схожие по названию и назначению. На данный момент вариант с TActorContext является устаревшим, новый код стоит писать без его использования).
+Важные методы:
+
+PassAway – единственный корректный способ зарегистрированному актору умереть. Может вызываться только находясь внутри обработчика сообщения.
+Send – отправка сообщения, зная адрес получателя. В акторе доступен хелпер, принимающий непосредственно сообщение. Базовый вызов, принимающий полный event handle – доступен в контексте.
+
+Become – установить функцию-обработчик сообщений, которая будет использована при получении следующего сообщения.
+
+Register – зарегистрировать новый актор в акторсистеме, с выделением нового мейлбокса. Важно – с момента вызова владение актором передается акторсистеме, т.е. уже к моменту выхода актор может начать выполняться на другом потоке, нельзя к нему ни обращаться прямыми вызовами, ни даже предполагать что он еще жив.
+
+Schedule – зарегистрировать сообщение, которое будет отправлено не менее чем через запрошенную задержку. В акторе доступен хелпер, декорирующий сообщение хендлом отправки самому себе, в контексте можно передать полный хендл.
+
+SelfId – узнать собственный адрес. Возвращаемый объект TActorIdentity можно передавать если требуется делегировать отправку сообщений от имени актора (например если пишете полезный код пользуясь пассивными объектами).
+Посылка сообщений дешёвая, не нужно на ней чрезмерно экономить (но не бесплатная – поэтому посылать сообщения только ради посылки сообщений то же не стоит).
+
+Инстанцирование акторов так же дёшево, актор на запрос или фазу запроса – вполне нормальная практика. Мультиплексировать обработку разных запросов в одном акторе – так же вполне нормально. В нашем коде много примеров и первого, и второго. Пользуйтесь здравым смыслов и собственным вкусом.
+Т.к. на время обработки сообщения актор занимает тред из пула акторсистемы – уходить в длинные вычисления лучше на отдельном отселённом акторе (и либо отселять в отдельный пол акторсистемы, либо контролировать параллельность брокером ресурсов), блокирующие вызовы делать почти всегда ошибка. Стремление написать мютекс - ересь и от лукавого.
+Идентифицируются акторы своим TActorID-ом, который уникален и вы не должны его придумывать из воздуха, а только получить из регистрации (для порождённых акторов) или его вам должен рассказать кто-то, законно его знающий.
+
+Отправка на несуществующий актор (уже умерший) безопасна, сообщение будет просто выброшено в момент обработки (как обрабатывать недоставку сообщений в протоколах расскажу ниже).
+
+Кроме нормальных TActorID существуют еще и сервисные (составленные из строчки и номера ноды). Под ними может быть зарегистрирован реальный актор и фактически при получении сообщения по сервисному адресу – попробует переправить его текущему фактическому. Это позволяет размещать хорошо известные сервисы по хорошо известному адресу, не выстраивая параллельную машинерию поиска.
+
+Строить из актора конечный автомат при помощи переключений функции-обработчика – выбор в каждом конкретном случае, иногда удобнее да, иногда сваливать всё в одно состояние, а иногда – применять гибридное решение (когда часть жизненного цикла – обычно инициализации и завершение – выражены в переходах, а часть – нет).
+Меньше слов, больше дела – этого уже достаточно что бы прочитать самый простой пример. https://a.yandex-team.ru/arc/trunk/arcadia/library/actors/examples/01_ping_pong
+Здесь можно увидеть образец самого простого актора, занимающегося переброской сообщений и использующего все основные вызовы. Заодно покрутив за разные ручки (количество тредов в тредпуле, количество пар перебрасывающихся акторов) можно посмотреть на изменение поведения системы (hint: в таких простых сценариях максимум перфоманса достигается при одном треде в тредпулах).
+
+### Event и Event Handle.
+Полезную нагрузку сообщений заворачиваем в наследника IEventBase, у которого два важных метода – сериализация и загрузка. Сериализация виртуальная, а вот загрузка – нет, и для разбора сообщения из байтовой последовательности – необходимо на стороне получателя сматчить число-идентификатор типа ивента с С++ типом. Именно это делают макросы из hfunc.h. На практике ивенты создаются либо как наследник TEventLocal<> (для строго локальных сообщений) либо как наследник TEventPB<> (для потенциально пересылаемых по сети сообщений, типизируются protobuf-мессаджем).
+
+Кроме непосредственно ивента (в виде структуры либо в виде байтовой строки) для пересылки сообщения необходим набор дополнительных полей
+
+Адресат
+
+Отправитель
+
+Тип сообщения
+
+Кука
+
+Флаги
+
+Сообщение + дополнительные поля = IEventHandle. Именно хендлами акторсистема и оперирует. <event-type>::TPtr – в примере выше – это и есть указатель на типизированный хендл.
+
+Технически типом сообщения может быть любое число, которое получатель и отправитель договорились понимать как идентификатор сообщения. Сложившаяся практика – выделять диапазон идентификаторов макросом EventSpaceBegin (фактически блоками по 64к), начиная с блока ES_USERSPACE.
+Кука – неинтерпретируемое ui64 число, передаваемое с хендлом. Хорошей практикой является в ответе сервиса на сообщение выставлять куку в куку исходного сообщения, особенно для сервисов, потенциально используемых конкурентно.
+
+В флагах несколько бит зарезервировано под флаги, декларирующие как необходимо обрабатывать особые ситуации и 12 бит – под номер канала интерконнекта, в котором будет пересылаться сообщение (для локальных сообщений в имеющихся реализациях номер канала не имеет значения - хотя можно представить реализацию где для каналов будут независимые очереди).
+
+### Тредпулы и мейлбоксы.
+В рамках одной акторсистемы может сосуществовать несколько независимых тредпулов, каждый актор регистрируется на конкретном и в процессе жизни не может мигрировать (но может создавать новые акторы на произвольном тредпуле). Используется для крупноблочного разделения ресурсов, либо между разными активностями (вот здесь – обрабатываем один класс запросов, а вот здесь - другой), либо между разными профилями активности (вот здесь обрабатываем быстрые запросы, здесь – медленные, а вот там – вообще батчёвые). Например в YDB работает системный тредпул (в котором запускаются акторы, необходимые для функционирования YDB, и для которого мы следим что бы не было длительной блокировки в обработчиках), пользовательский тредпул (в котором обрабатываются запросы и потенциально обработчики могут уходить в себя подольше, но это не повлияет на инфраструктуру), батчёвый тредпул (куда отгружается длительная обработка – компакшены дисков, сканы таблиц и подобное) и, в жирных нодах – тредпул интерконнекта (как наиболее чувствительного к задержкам).
+Пересылка сообщений между акторами разных тредпулов но одной локальной акторсистемы остаётся локальной, принудительной сериализации сообщения не происходит.
+
При регистрации актор прикрепляется к мейлбоксу (в типичном случае на собственном мейлбоксе, но по особой нужде можно находясь внутри обработки сообщения прикрепить порождённый актор к текущему активному мейлбоксу – см. RegisterWithSameMailbox (ранее RegisterLocal) – в этом случае будет гарантироваться отсутствие конкурентной обработки сообщений). Собственно Send – это и есть заворачивание ивента в хендл, помещение хендла в очередь мейлбокса и добавление мейлбокса в очередь активации тредпула. В рамках одного мейлбокса – обработка FIFO, между мейлбоксами таких гарантий нет, хотя и стараемся активировать мейлбоксы примерно в порядке появления в них сообщений.
-
-При регистрации актора можно выбрать тип мейлбокса, они немного отличаются стоимость добавления – либо дёшево, но похуже под контеншеном, либо почти wait-free, но подороже. См. комментарии к TMailboxType за актуальными подсказками что-как.
-
-Полезные хелперы.
-
-STFUNC – декларация стейт-функции, рекомендую всегда использовать именно такую форму для декларации, т.к. потом проще искать.
-
-hFunc – макрос выбора хендлера, передающий ивент в обработчик.
-
-cFunc – макрос выбора хендлера, не передающий ивент в обработчик.
-
-### Обработка сбоев.
-В рамках локальной акторсистемы доставка сообщений гарантирована, если по какой-то причине сообщение не доставлено (важно! Именно не доставлено, факт обработанности сообщения уже на совести принимающего актора) – то произойдёт одно из:
-
-Если выставлен флаг FlagForwardOnNondelivery – сообщение будет переправлено на актор, переданный как forwardOnNondelivery при конструировании хендла. Полезно например если какие-то сервисы создаются по требованию и для несозданных сервисов – желаем зароутить в роутер. Работает только в рамках локальной акторсистемы.
-
-Иначе при выставленном флаге FlagTrackDelivery – для отправителя будет сгенерирован ивент TEvUndelivered от имени недоступного актора. Получение такого сообщения гарантирует что исходный ивент не был обработан и никакие эффекты не произошли. Генерация и доставка нотификации в рамках локальной акторсистемы гарантирована, в распределённой – как повезёт, может и потеряться.
-
-Иначе, если никакие флаги не выставлены – сообщение будет выброшено.
-
-Т.к. в распределённой системе доставка нотификаций о недоставке не гарантируется, то для надёжной обработки сбоев необходим дополнительный механизм – по флагу FlagSubscribeOnSession при пересечении границ ноды происходит подписка отправителя на нотификацию о разрыве сетевой сессии, в рамках которой сообщение было отправлено. Теперь при разрыве сетевой сессии отправитель узнает что сообщение могло быть недоставлено (а могло и быть доставлено – мы не знаем) и сможет отреагировать. Нужно не забывать отписываться от нотификации о разрыве сессий – иначе будут копиться вплоть до ближайшего разрыва (который может и не скоро произойти).
-
-Резюмируя: при необходимости контролировать доставку внутри локальной акторсистемы – выставляем флаг FlagTrackDelivery и обрабатываем TEvUndelivered. Для распределённой – добавляем FlagSubscribeOnSession и дополнительно обрабатываем TEvNodeDisconnected не забывая отписываться от более не нужных подписок.
-
-### Интерконнект.
-Локальная акторсистема – это только половина пирога, возможность объединить их в распределённую – вторая половина. Реализация интерконнекта доступна из коробки и умеет
-Передавать сообщения через одно tcp-соединение
-Мультиплексировать разные потоки (ака каналы) в рамках одного соединения, гарантируя порядок в рамках канала
-Старается делать это хорошо.
-В рамках распределённой системы требуется каждой локальной акторсистеме назначить уникальный номер (например табличкой или реализовав динамическую раздачу номеров ноды) и запустить в рамках каждой локальной акторсистемы локальный неймсервис (например по табличке ремапинга номера ноды в сетевой адрес либо как кеш опорного неймсервиса).
-Смотрим на второй пример https://a.yandex-team.ru/arc/trunk/arcadia/library/actors/examples/02_discovery
-Тут у нас конфигурируется распределённая акторсистема (в примере все пять запускаются в одном бинарнике, но точно так же – можно запускать и частями) на пять нод. На каждой ноде запускается реплика для паблишинга строчек и актор-эндпоинт (каждый со своим портом). Эндпоинты с помощью актора-паблишера публикуют свои явки/пароли на распределённый сторадж (с обработкой нештатных ситауций и поддержанием в актуальном состоянии). И рядом лежит реализация запроса к стораджу на листинг опубликованого по мажорити. Собственно это упрощённый и почищенный от специфики код, используемый в YDB для публикации и нахождения актуальных эндпоинтов пользовательской базы.
+
+При регистрации актора можно выбрать тип мейлбокса, они немного отличаются стоимость добавления – либо дёшево, но похуже под контеншеном, либо почти wait-free, но подороже. См. комментарии к TMailboxType за актуальными подсказками что-как.
+
+Полезные хелперы.
+
+STFUNC – декларация стейт-функции, рекомендую всегда использовать именно такую форму для декларации, т.к. потом проще искать.
+
+hFunc – макрос выбора хендлера, передающий ивент в обработчик.
+
+cFunc – макрос выбора хендлера, не передающий ивент в обработчик.
+
+### Обработка сбоев.
+В рамках локальной акторсистемы доставка сообщений гарантирована, если по какой-то причине сообщение не доставлено (важно! Именно не доставлено, факт обработанности сообщения уже на совести принимающего актора) – то произойдёт одно из:
+
+Если выставлен флаг FlagForwardOnNondelivery – сообщение будет переправлено на актор, переданный как forwardOnNondelivery при конструировании хендла. Полезно например если какие-то сервисы создаются по требованию и для несозданных сервисов – желаем зароутить в роутер. Работает только в рамках локальной акторсистемы.
+
+Иначе при выставленном флаге FlagTrackDelivery – для отправителя будет сгенерирован ивент TEvUndelivered от имени недоступного актора. Получение такого сообщения гарантирует что исходный ивент не был обработан и никакие эффекты не произошли. Генерация и доставка нотификации в рамках локальной акторсистемы гарантирована, в распределённой – как повезёт, может и потеряться.
+
+Иначе, если никакие флаги не выставлены – сообщение будет выброшено.
+
+Т.к. в распределённой системе доставка нотификаций о недоставке не гарантируется, то для надёжной обработки сбоев необходим дополнительный механизм – по флагу FlagSubscribeOnSession при пересечении границ ноды происходит подписка отправителя на нотификацию о разрыве сетевой сессии, в рамках которой сообщение было отправлено. Теперь при разрыве сетевой сессии отправитель узнает что сообщение могло быть недоставлено (а могло и быть доставлено – мы не знаем) и сможет отреагировать. Нужно не забывать отписываться от нотификации о разрыве сессий – иначе будут копиться вплоть до ближайшего разрыва (который может и не скоро произойти).
+
+Резюмируя: при необходимости контролировать доставку внутри локальной акторсистемы – выставляем флаг FlagTrackDelivery и обрабатываем TEvUndelivered. Для распределённой – добавляем FlagSubscribeOnSession и дополнительно обрабатываем TEvNodeDisconnected не забывая отписываться от более не нужных подписок.
+
+### Интерконнект.
+Локальная акторсистема – это только половина пирога, возможность объединить их в распределённую – вторая половина. Реализация интерконнекта доступна из коробки и умеет
+Передавать сообщения через одно tcp-соединение
+Мультиплексировать разные потоки (ака каналы) в рамках одного соединения, гарантируя порядок в рамках канала
+Старается делать это хорошо.
+В рамках распределённой системы требуется каждой локальной акторсистеме назначить уникальный номер (например табличкой или реализовав динамическую раздачу номеров ноды) и запустить в рамках каждой локальной акторсистемы локальный неймсервис (например по табличке ремапинга номера ноды в сетевой адрес либо как кеш опорного неймсервиса).
+Смотрим на второй пример https://a.yandex-team.ru/arc/trunk/arcadia/library/actors/examples/02_discovery
+Тут у нас конфигурируется распределённая акторсистема (в примере все пять запускаются в одном бинарнике, но точно так же – можно запускать и частями) на пять нод. На каждой ноде запускается реплика для паблишинга строчек и актор-эндпоинт (каждый со своим портом). Эндпоинты с помощью актора-паблишера публикуют свои явки/пароли на распределённый сторадж (с обработкой нештатных ситауций и поддержанием в актуальном состоянии). И рядом лежит реализация запроса к стораджу на листинг опубликованого по мажорити. Собственно это упрощённый и почищенный от специфики код, используемый в YDB для публикации и нахождения актуальных эндпоинтов пользовательской базы.
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp
index 3fe085439a..c58698a206 100644
--- a/library/cpp/actors/core/actorsystem.cpp
+++ b/library/cpp/actors/core/actorsystem.cpp
@@ -1,6 +1,6 @@
#include "defs.h"
#include "actorsystem.h"
-#include "callstack.h"
+#include "callstack.h"
#include "cpu_manager.h"
#include "mailbox.h"
#include "events.h"
@@ -64,10 +64,10 @@ namespace NActors {
if (Y_UNLIKELY(!ev))
return false;
-#ifdef USE_ACTOR_CALLSTACK
+#ifdef USE_ACTOR_CALLSTACK
ev->Callstack.TraceIfEmpty();
-#endif
-
+#endif
+
TActorId recipient = ev->GetRecipientRewrite();
const ui32 recpNodeId = recipient.NodeId();
diff --git a/library/cpp/actors/core/callstack.cpp b/library/cpp/actors/core/callstack.cpp
index d1536d84f4..9297c1a079 100644
--- a/library/cpp/actors/core/callstack.cpp
+++ b/library/cpp/actors/core/callstack.cpp
@@ -1,14 +1,14 @@
-#include "callstack.h"
-#include <util/thread/singleton.h>
-
-#ifdef USE_ACTOR_CALLSTACK
-
-namespace NActors {
+#include "callstack.h"
+#include <util/thread/singleton.h>
+
+#ifdef USE_ACTOR_CALLSTACK
+
+namespace NActors {
namespace {
void (*PreviousFormatBackTrace)(IOutputStream*) = 0;
ui32 ActorBackTraceEnableCounter = 0;
}
-
+
void ActorFormatBackTrace(IOutputStream* out) {
TStringStream str;
PreviousFormatBackTrace(&str);
@@ -16,19 +16,19 @@ namespace NActors {
TCallstack::DumpCallstack(str);
*out << str.Str();
}
-
+
void EnableActorCallstack() {
if (ActorBackTraceEnableCounter == 0) {
Y_VERIFY(PreviousFormatBackTrace == 0);
PreviousFormatBackTrace = SetFormatBackTraceFn(ActorFormatBackTrace);
}
-
+
++ActorBackTraceEnableCounter;
}
void DisableActorCallstack() {
--ActorBackTraceEnableCounter;
-
+
if (ActorBackTraceEnableCounter == 0) {
Y_VERIFY(PreviousFormatBackTrace);
SetFormatBackTraceFn(PreviousFormatBackTrace);
@@ -42,12 +42,12 @@ namespace NActors {
, LinesToSkip(0)
{
}
-
+
void TCallstack::SetLinesToSkip() {
TTrace record;
LinesToSkip = BackTrace(record.Data, TTrace::CAPACITY);
}
-
+
void TCallstack::Trace() {
size_t currentIdx = (BeginIdx + Size) % RECORDS;
if (Size == RECORDS) {
@@ -59,18 +59,18 @@ namespace NActors {
record.Size = BackTrace(record.Data, TTrace::CAPACITY);
record.LinesToSkip = LinesToSkip;
}
-
+
void TCallstack::TraceIfEmpty() {
if (Size == 0) {
LinesToSkip = 0;
Trace();
}
- }
-
+ }
+
TCallstack& TCallstack::GetTlsCallstack() {
return *FastTlsSingleton<TCallstack>();
- }
-
+ }
+
void TCallstack::DumpCallstack(TStringStream& str) {
TCallstack& callstack = GetTlsCallstack();
for (int i = callstack.Size - 1; i >= 0; --i) {
@@ -86,8 +86,8 @@ namespace NActors {
FormatBackTrace(&str, record.Data, size);
}
str << Endl;
- }
- }
+ }
+ }
}
-
-#endif
+
+#endif
diff --git a/library/cpp/actors/core/callstack.h b/library/cpp/actors/core/callstack.h
index 265e9b63b8..176717d2ae 100644
--- a/library/cpp/actors/core/callstack.h
+++ b/library/cpp/actors/core/callstack.h
@@ -1,24 +1,24 @@
-#pragma once
-
-#ifndef NDEBUG
-//#define ENABLE_ACTOR_CALLSTACK
-#endif
-
-#ifdef ENABLE_ACTOR_CALLSTACK
+#pragma once
+
+#ifndef NDEBUG
+//#define ENABLE_ACTOR_CALLSTACK
+#endif
+
+#ifdef ENABLE_ACTOR_CALLSTACK
#include "defs.h"
#include <util/system/backtrace.h>
#include <util/stream/str.h>
#include <util/generic/deque.h>
#define USE_ACTOR_CALLSTACK
-
-namespace NActors {
+
+namespace NActors {
struct TCallstack {
struct TTrace {
static const size_t CAPACITY = 50;
void* Data[CAPACITY];
size_t Size;
size_t LinesToSkip;
-
+
TTrace()
: Size(0)
, LinesToSkip(0)
@@ -30,29 +30,29 @@ namespace NActors {
static const size_t RECORDS_TO_SKIP = 2;
TTrace Record[RECORDS];
size_t BeginIdx;
- size_t Size;
- size_t LinesToSkip;
-
+ size_t Size;
+ size_t LinesToSkip;
+
TCallstack();
void SetLinesToSkip();
void Trace();
void TraceIfEmpty();
static TCallstack& GetTlsCallstack();
static void DumpCallstack(TStringStream& str);
- };
-
+ };
+
void EnableActorCallstack();
void DisableActorCallstack();
-
+
}
-
-#else
-
-namespace NActors {
+
+#else
+
+namespace NActors {
inline void EnableActorCallstack(){};
-
+
inline void DisableActorCallstack(){};
-
+
}
-
-#endif
+
+#endif
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 89d1e2a969..6ff02aaf94 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -2,7 +2,7 @@
#include "defs.h"
#include "actorid.h"
-#include "callstack.h"
+#include "callstack.h"
#include "event_load.h"
#include <library/cpp/actors/wilson/wilson_trace.h>
@@ -117,9 +117,9 @@ namespace NActors {
static const size_t ChannelBits = 12;
static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits;
-#ifdef USE_ACTOR_CALLSTACK
+#ifdef USE_ACTOR_CALLSTACK
TCallstack Callstack;
-#endif
+#endif
ui16 GetChannel() const noexcept {
return Flags >> ChannelShift;
}
@@ -133,7 +133,7 @@ namespace NActors {
Y_VERIFY(flags < (1 << ChannelShift));
return (flags | (channel << ChannelShift));
}
-
+
private:
THolder<IEventBase> Event;
TIntrusivePtr<TEventSerializedData> Buffer;
@@ -165,7 +165,7 @@ namespace NActors {
TActorId GetForwardOnNondeliveryRecipient() const {
return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId();
}
-
+
IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0,
const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {})
: Type(ev->Type())
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index d48411317b..446b651efd 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -1,6 +1,6 @@
#include "executor_thread.h"
#include "actorsystem.h"
-#include "callstack.h"
+#include "callstack.h"
#include "mailbox.h"
#include "event.h"
#include "events.h"
@@ -148,10 +148,10 @@ namespace NActors {
TActorContext ctx(*mailbox, *this, hpprev, recipient);
TlsActivationContext = &ctx;
-#ifdef USE_ACTOR_CALLSTACK
+#ifdef USE_ACTOR_CALLSTACK
TCallstack::GetTlsCallstack() = ev->Callstack;
TCallstack::GetTlsCallstack().SetLinesToSkip();
-#endif
+#endif
CurrentRecipient = recipient;
CurrentActorScheduledEventsCounter = 0;
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index c58698ee1f..9d3c573f0d 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -4,7 +4,7 @@
#include "event.h"
#include "actor.h"
#include "actorsystem.h"
-#include "callstack.h"
+#include "callstack.h"
#include "probes.h"
#include "worker_context.h"
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index a3522988b3..5f63b5af58 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -1,5 +1,5 @@
#include "log.h"
-#include "log_settings.h"
+#include "log_settings.h"
#include <library/cpp/monlib/service/pages/templates.h>
@@ -211,7 +211,7 @@ namespace NActors {
void TLoggerActor::Log(TInstant time, NLog::EPriority priority, NLog::EComponent component, const char* c, ...) {
Metrics->IncDirectMsgs();
- if (Settings && Settings->Satisfies(priority, component, 0ull)) {
+ if (Settings && Settings->Satisfies(priority, component, 0ull)) {
va_list params;
va_start(params, c);
TString formatted;
@@ -233,14 +233,14 @@ namespace NActors {
if (!OutputRecord(now, NActors::NLog::EPrio::Error, Settings->LoggerComponent, message)) {
BecomeDefunct();
}
- }
-
+ }
+
void TLoggerActor::HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx) {
Y_UNUSED(ev);
LogIgnoredCount(ctx.Now());
- IgnoredCount = 0;
- PassedCount = 0;
- }
+ IgnoredCount = 0;
+ PassedCount = 0;
+ }
void TLoggerActor::HandleIgnoredEventDrop() {
// logger backend is unavailable, just ignore
@@ -248,7 +248,7 @@ namespace NActors {
void TLoggerActor::WriteMessageStat(const NLog::TEvLog& ev) {
Metrics->IncActorMsgs();
-
+
const auto prio = ev.Level.ToPrio();
switch (prio) {
@@ -281,7 +281,7 @@ namespace NActors {
++IgnoredCount;
PassedCount = 0;
return;
- }
+ }
PassedCount++;
} else {
// Enable of disable throttling depending on the load
@@ -289,8 +289,8 @@ namespace NActors {
AtomicSet(IsOverflow, 1);
else if (delayMillisec <= (i64)Settings->TimeThresholdMs && AtomicGet(IsOverflow))
AtomicSet(IsOverflow, 0);
- }
-
+ }
+
const auto prio = ev->Get()->Level.ToPrio();
if (!OutputRecord(ev->Get()->Stamp, prio, ev->Get()->Component, ev->Get()->Line)) {
BecomeDefunct();
@@ -337,7 +337,7 @@ namespace NActors {
auto name = Settings->ComponentName(i);
if (!*name)
continue;
- NLog::TComponentSettings componentSettings = Settings->GetComponentSettings(i);
+ NLog::TComponentSettings componentSettings = Settings->GetComponentSettings(i);
TABLER() {
TABLED() {
@@ -410,8 +410,8 @@ namespace NActors {
TStringStream str;
if (hasComponent && !hasPriority && !hasSamplingPriority && !hasSamplingRate) {
- NLog::TComponentSettings componentSettings = Settings->GetComponentSettings(component);
- ui32 samplingRate = componentSettings.Raw.X.SamplingRate;
+ NLog::TComponentSettings componentSettings = Settings->GetComponentSettings(component);
+ ui32 samplingRate = componentSettings.Raw.X.SamplingRate;
HTML(str) {
DIV_CLASS("row") {
DIV_CLASS("col-md-12") {
@@ -682,17 +682,17 @@ namespace NActors {
OutputDebugString(x.c_str());
}
#endif
- bool isOk = false;
- do {
- try {
+ bool isOk = false;
+ do {
+ try {
TRecordWithNewline r(rec);
Cerr.Write(r.Buf.Data(), r.Buf.Filled());
- isOk = true;
- } catch (TSystemError err) {
- // Interrupted system call
+ isOk = true;
+ } catch (TSystemError err) {
+ // Interrupted system call
Y_UNUSED(err);
- }
- } while (!isOk);
+ }
+ } while (!isOk);
}
void ReopenLog() override {
@@ -743,10 +743,10 @@ namespace NActors {
return new TLineFileLogBackend(fileName);
}
- TAutoPtr<TLogBackend> CreateNullBackend() {
- return new TNullLogBackend();
- }
-
+ TAutoPtr<TLogBackend> CreateNullBackend() {
+ return new TNullLogBackend();
+ }
+
TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends) {
return new TCompositeLogBackend(std::move(underlyingBackends));
}
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index cbcbcd8786..c11a7cf3c1 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -12,7 +12,7 @@
#include <util/generic/vector.h>
#include <util/string/printf.h>
-#include <util/string/builder.h>
+#include <util/string/builder.h>
#include <library/cpp/logger/all.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/monlib/metrics/metric_registry.h>
@@ -24,14 +24,14 @@
// TODO: limit number of messages per second
// TODO: make TLogComponentLevelRequest/Response network messages
-#define IS_LOG_PRIORITY_ENABLED(actorCtxOrSystem, priority, component) \
- (static_cast<::NActors::NLog::TSettings*>((actorCtxOrSystem).LoggerSettings()) && \
- static_cast<::NActors::NLog::TSettings*>((actorCtxOrSystem).LoggerSettings())->Satisfies( \
- static_cast<::NActors::NLog::EPriority>(priority), \
- static_cast<::NActors::NLog::EComponent>(component), \
- 0ull) \
- )
-
+#define IS_LOG_PRIORITY_ENABLED(actorCtxOrSystem, priority, component) \
+ (static_cast<::NActors::NLog::TSettings*>((actorCtxOrSystem).LoggerSettings()) && \
+ static_cast<::NActors::NLog::TSettings*>((actorCtxOrSystem).LoggerSettings())->Satisfies( \
+ static_cast<::NActors::NLog::EPriority>(priority), \
+ static_cast<::NActors::NLog::EComponent>(component), \
+ 0ull) \
+ )
+
#define LOG_LOG_SAMPLED_BY(actorCtxOrSystem, priority, component, sampleBy, ...) \
do { \
::NActors::NLog::TSettings* mSettings = static_cast<::NActors::NLog::TSettings*>((actorCtxOrSystem).LoggerSettings()); \
@@ -49,9 +49,9 @@
logStringBuilder << stream; \
return static_cast<TString>(logStringBuilder); \
}().data())
-
-#define LOG_LOG(actorCtxOrSystem, priority, component, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, priority, component, 0ull, __VA_ARGS__)
-#define LOG_LOG_S(actorCtxOrSystem, priority, component, stream) LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, priority, component, 0ull, stream)
+
+#define LOG_LOG(actorCtxOrSystem, priority, component, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, priority, component, 0ull, __VA_ARGS__)
+#define LOG_LOG_S(actorCtxOrSystem, priority, component, stream) LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, priority, component, 0ull, stream)
// use these macros for logging via actor system or actor context
#define LOG_EMERG(actorCtxOrSystem, component, ...) LOG_LOG(actorCtxOrSystem, NActors::NLog::PRI_EMERG, component, __VA_ARGS__)
@@ -69,11 +69,11 @@
#define LOG_CRIT_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_CRIT, component, stream)
#define LOG_ERROR_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_ERROR, component, stream)
#define LOG_WARN_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_WARN, component, stream)
-#define LOG_NOTICE_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_NOTICE, component, stream)
+#define LOG_NOTICE_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_NOTICE, component, stream)
#define LOG_INFO_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_INFO, component, stream)
#define LOG_DEBUG_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_DEBUG, component, stream)
#define LOG_TRACE_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_TRACE, component, stream)
-
+
#define LOG_EMERG_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_EMERG, component, sampleBy, __VA_ARGS__)
#define LOG_ALERT_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_ALERT, component, sampleBy, __VA_ARGS__)
#define LOG_CRIT_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_CRIT, component, sampleBy, __VA_ARGS__)
@@ -83,7 +83,7 @@
#define LOG_INFO_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_INFO, component, sampleBy, __VA_ARGS__)
#define LOG_DEBUG_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_DEBUG, component, sampleBy, __VA_ARGS__)
#define LOG_TRACE_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_TRACE, component, sampleBy, __VA_ARGS__)
-
+
#define LOG_EMERG_S_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, stream) LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_EMERG, component, sampleBy, stream)
#define LOG_ALERT_S_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, stream) LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_ALERT, component, sampleBy, stream)
#define LOG_CRIT_S_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, stream) LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_CRIT, component, sampleBy, stream)
@@ -167,11 +167,11 @@ namespace NActors {
};
class TLogIgnored: public TEventLocal<TLogIgnored, int(NLog::EEv::Ignored)> {
- public:
- TLogIgnored() {
- }
- };
-
+ public:
+ TLogIgnored() {
+ }
+ };
+
////////////////////////////////////////////////////////////////////////////////
// LOGGER ACTOR
////////////////////////////////////////////////////////////////////////////////
@@ -212,7 +212,7 @@ namespace NActors {
void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
switch (ev->GetTypeRewrite()) {
- HFunc(TLogIgnored, HandleIgnoredEvent);
+ HFunc(TLogIgnored, HandleIgnoredEvent);
HFunc(NLog::TEvLog, HandleLogEvent);
HFunc(TLogComponentLevelRequest, HandleLogComponentLevelRequest);
HFunc(NMon::TEvHttpInfo, HandleMonInfo);
@@ -298,7 +298,7 @@ namespace NActors {
bool logPError, bool logCons);
TAutoPtr<TLogBackend> CreateStderrBackend();
TAutoPtr<TLogBackend> CreateFileBackend(const TString& fileName);
- TAutoPtr<TLogBackend> CreateNullBackend();
+ TAutoPtr<TLogBackend> CreateNullBackend();
TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends);
/////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp
index 88233dddf9..f52f2fc5d2 100644
--- a/library/cpp/actors/core/log_settings.cpp
+++ b/library/cpp/actors/core/log_settings.cpp
@@ -9,7 +9,7 @@ namespace NActors {
EPriority defPriority, EPriority defSamplingPriority,
ui32 defSamplingRate, ui64 timeThresholdMs)
: LoggerActorId(loggerActorId)
- , LoggerComponent(loggerComponent)
+ , LoggerComponent(loggerComponent)
, TimeThresholdMs(timeThresholdMs)
, AllowDrop(true)
, ThrottleDelay(TDuration::MilliSeconds(100))
@@ -31,7 +31,7 @@ namespace NActors {
EPriority defPriority, EPriority defSamplingPriority,
ui32 defSamplingRate, ui64 timeThresholdMs)
: LoggerActorId(loggerActorId)
- , LoggerComponent(loggerComponent)
+ , LoggerComponent(loggerComponent)
, TimeThresholdMs(timeThresholdMs)
, AllowDrop(true)
, ThrottleDelay(TDuration::MilliSeconds(100))
@@ -102,16 +102,16 @@ namespace NActors {
}
if (component == InvalidComponent) {
- for (int i = 0; i < Mask + 1; i++) {
- TComponentSettings settings = AtomicGet(ComponentInfo[i]);
- if (isSampling) {
- settings.Raw.X.SamplingLevel = priority;
- } else {
- settings.Raw.X.Level = priority;
- }
- AtomicSet(ComponentInfo[i], settings.Raw.Data);
- }
-
+ for (int i = 0; i < Mask + 1; i++) {
+ TComponentSettings settings = AtomicGet(ComponentInfo[i]);
+ if (isSampling) {
+ settings.Raw.X.SamplingLevel = priority;
+ } else {
+ settings.Raw.X.Level = priority;
+ }
+ AtomicSet(ComponentInfo[i], settings.Raw.Data);
+ }
+
TStringStream str;
str << titleName
@@ -124,16 +124,16 @@ namespace NActors {
explanation = "Invalid component";
return 1;
}
- TComponentSettings settings = AtomicGet(ComponentInfo[component]);
- EPriority oldPriority;
- if (isSampling) {
- oldPriority = (EPriority)settings.Raw.X.SamplingLevel;
- settings.Raw.X.SamplingLevel = priority;
- } else {
- oldPriority = (EPriority)settings.Raw.X.Level;
- settings.Raw.X.Level = priority;
- }
- AtomicSet(ComponentInfo[component], settings.Raw.Data);
+ TComponentSettings settings = AtomicGet(ComponentInfo[component]);
+ EPriority oldPriority;
+ if (isSampling) {
+ oldPriority = (EPriority)settings.Raw.X.SamplingLevel;
+ settings.Raw.X.SamplingLevel = priority;
+ } else {
+ oldPriority = (EPriority)settings.Raw.X.Level;
+ settings.Raw.X.Level = priority;
+ }
+ AtomicSet(ComponentInfo[component], settings.Raw.Data);
TStringStream str;
str << titleName << " for the component " << ComponentNames[component]
<< " has been changed from " << PriorityToString(EPrio(oldPriority))
@@ -144,21 +144,21 @@ namespace NActors {
}
int TSettings::SetLevel(EPriority priority, EComponent component, TString& explanation) {
- return SetLevelImpl("priority", false,
+ return SetLevelImpl("priority", false,
priority, component, explanation);
}
int TSettings::SetSamplingLevel(EPriority priority, EComponent component, TString& explanation) {
- return SetLevelImpl("sampling priority", true,
+ return SetLevelImpl("sampling priority", true,
priority, component, explanation);
}
int TSettings::SetSamplingRate(ui32 sampling, EComponent component, TString& explanation) {
if (component == InvalidComponent) {
for (int i = 0; i < Mask + 1; i++) {
- TComponentSettings settings = AtomicGet(ComponentInfo[i]);
- settings.Raw.X.SamplingRate = sampling;
- AtomicSet(ComponentInfo[i], settings.Raw.Data);
+ TComponentSettings settings = AtomicGet(ComponentInfo[i]);
+ settings.Raw.X.SamplingRate = sampling;
+ AtomicSet(ComponentInfo[i], settings.Raw.Data);
}
TStringStream str;
str << "Sampling rate for all components has been changed to " << sampling;
@@ -168,10 +168,10 @@ namespace NActors {
explanation = "Invalid component";
return 1;
}
- TComponentSettings settings = AtomicGet(ComponentInfo[component]);
- ui32 oldSampling = settings.Raw.X.SamplingRate;
- settings.Raw.X.SamplingRate = sampling;
- AtomicSet(ComponentInfo[component], settings.Raw.Data);
+ TComponentSettings settings = AtomicGet(ComponentInfo[component]);
+ ui32 oldSampling = settings.Raw.X.SamplingRate;
+ settings.Raw.X.SamplingRate = sampling;
+ AtomicSet(ComponentInfo[component], settings.Raw.Data);
TStringStream str;
str << "Sampling rate for the component " << ComponentNames[component]
<< " has been changed from " << oldSampling
diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h
index 0e8b76cc02..7fe4504edd 100644
--- a/library/cpp/actors/core/log_settings.h
+++ b/library/cpp/actors/core/log_settings.h
@@ -45,36 +45,36 @@ namespace NActors {
;
// Log settings
- struct TComponentSettings {
- union {
- struct {
- ui32 SamplingRate;
- ui8 SamplingLevel;
- ui8 Level;
- } X;
-
- ui64 Data;
- } Raw;
-
- TComponentSettings(TAtomicBase data) {
- Raw.Data = (ui64)data;
- }
-
- TComponentSettings(ui8 level, ui8 samplingLevel, ui32 samplingRate) {
- Raw.X.Level = level;
- Raw.X.SamplingLevel = samplingLevel;
- Raw.X.SamplingRate = samplingRate;
- }
- };
-
+ struct TComponentSettings {
+ union {
+ struct {
+ ui32 SamplingRate;
+ ui8 SamplingLevel;
+ ui8 Level;
+ } X;
+
+ ui64 Data;
+ } Raw;
+
+ TComponentSettings(TAtomicBase data) {
+ Raw.Data = (ui64)data;
+ }
+
+ TComponentSettings(ui8 level, ui8 samplingLevel, ui32 samplingRate) {
+ Raw.X.Level = level;
+ Raw.X.SamplingLevel = samplingLevel;
+ Raw.X.SamplingRate = samplingRate;
+ }
+ };
+
struct TSettings: public TThrRefBase {
public:
TActorId LoggerActorId;
- EComponent LoggerComponent;
- ui64 TimeThresholdMs;
+ EComponent LoggerComponent;
+ ui64 TimeThresholdMs;
bool AllowDrop;
TDuration ThrottleDelay;
- TArrayHolder<TAtomic> ComponentInfo;
+ TArrayHolder<TAtomic> ComponentInfo;
TVector<TString> ComponentNames;
EComponent MinVal;
EComponent MaxVal;
@@ -120,33 +120,33 @@ namespace NActors {
);
}
- inline bool Satisfies(EPriority priority, EComponent component, ui64 sampleBy = 0) const {
+ inline bool Satisfies(EPriority priority, EComponent component, ui64 sampleBy = 0) const {
// by using Mask we don't get outside of array boundaries
- TComponentSettings settings = GetComponentSettings(component);
- if (priority > settings.Raw.X.Level) {
- if (priority > settings.Raw.X.SamplingLevel) {
- return false; // priority > both levels ==> do not log
- }
- // priority <= sampling level ==> apply sampling
- ui32 samplingRate = settings.Raw.X.SamplingRate;
- if (samplingRate) {
+ TComponentSettings settings = GetComponentSettings(component);
+ if (priority > settings.Raw.X.Level) {
+ if (priority > settings.Raw.X.SamplingLevel) {
+ return false; // priority > both levels ==> do not log
+ }
+ // priority <= sampling level ==> apply sampling
+ ui32 samplingRate = settings.Raw.X.SamplingRate;
+ if (samplingRate) {
ui32 samplingValue = sampleBy ? MurmurHash<ui32>((const char*)&sampleBy, sizeof(sampleBy))
: samplingRate != 1 ? RandomNumber<ui32>() : 0;
- return (samplingValue % samplingRate == 0);
- } else {
- // sampling rate not set ==> do not log
- return false;
- }
- } else {
- // priority <= log level ==> log
- return true;
- }
+ return (samplingValue % samplingRate == 0);
+ } else {
+ // sampling rate not set ==> do not log
+ return false;
+ }
+ } else {
+ // priority <= log level ==> log
+ return true;
+ }
}
- inline TComponentSettings GetComponentSettings(EComponent component) const {
+ inline TComponentSettings GetComponentSettings(EComponent component) const {
Y_VERIFY_DEBUG((component & Mask) == component);
// by using Mask we don't get outside of array boundaries
- return TComponentSettings(AtomicGet(ComponentInfo[component & Mask]));
+ return TComponentSettings(AtomicGet(ComponentInfo[component & Mask]));
}
const char* ComponentName(EComponent component) const {
diff --git a/library/cpp/actors/core/scheduler_cookie.h b/library/cpp/actors/core/scheduler_cookie.h
index 4cfb3b3660..2c20ca67f3 100644
--- a/library/cpp/actors/core/scheduler_cookie.h
+++ b/library/cpp/actors/core/scheduler_cookie.h
@@ -18,7 +18,7 @@ namespace NActors {
static ISchedulerCookie* Make3Way();
};
- class TSchedulerCookieHolder : TNonCopyable {
+ class TSchedulerCookieHolder : TNonCopyable {
ISchedulerCookie* Cookie;
public:
@@ -40,7 +40,7 @@ namespace NActors {
return (Cookie == x.Cookie);
}
- ISchedulerCookie* Get() const {
+ ISchedulerCookie* Get() const {
return Cookie;
}
diff --git a/library/cpp/actors/core/scheduler_queue.h b/library/cpp/actors/core/scheduler_queue.h
index 1ee337498e..3b8fac28f0 100644
--- a/library/cpp/actors/core/scheduler_queue.h
+++ b/library/cpp/actors/core/scheduler_queue.h
@@ -102,7 +102,7 @@ namespace NActors {
class TWriterWithPadding: public TWriter {
private:
ui8 CacheLinePadding[64 - sizeof(TWriter)];
-
+
void UnusedCacheLinePadding() {
Y_UNUSED(CacheLinePadding);
}
diff --git a/library/cpp/actors/core/servicemap.h b/library/cpp/actors/core/servicemap.h
index 91620134b4..d72e50cae5 100644
--- a/library/cpp/actors/core/servicemap.h
+++ b/library/cpp/actors/core/servicemap.h
@@ -107,7 +107,7 @@ namespace NActors {
if (ScanBranch(branch, key, hash / BaseSize, ret))
return ret;
}
-
+
return TValue();
}
diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make
index c19b7f0e41..880a9d00db 100644
--- a/library/cpp/actors/core/ya.make
+++ b/library/cpp/actors/core/ya.make
@@ -34,8 +34,8 @@ SRCS(
balancer.cpp
buffer.cpp
buffer.h
- callstack.cpp
- callstack.h
+ callstack.cpp
+ callstack.h
config.h
cpu_manager.cpp
cpu_manager.h
diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp
index 48e7625b18..f9bfaf8dc0 100644
--- a/library/cpp/actors/helpers/selfping_actor.cpp
+++ b/library/cpp/actors/helpers/selfping_actor.cpp
@@ -18,54 +18,54 @@ struct TEvPing: public TEventLocal<TEvPing, TEvents::THelloWorld::Ping> {
const double TimeStart;
};
-template <class TValueType_>
-struct TAvgOperation {
- struct TValueType {
- ui64 Count = 0;
- TValueType_ Sum = TValueType_();
- };
- using TValueVector = TVector<TValueType>;
-
- static constexpr TValueType InitialValue() {
- return TValueType(); // zero
- }
-
- // Updates value in current bucket and returns window value
- static TValueType UpdateBucket(TValueType windowValue, TValueVector& buckets, size_t index, TValueType newVal) {
- Y_ASSERT(index < buckets.size());
- buckets[index].Sum += newVal.Sum;
- buckets[index].Count += newVal.Count;
- windowValue.Sum += newVal.Sum;
- windowValue.Count += newVal.Count;
- return windowValue;
- }
-
- static TValueType ClearBuckets(TValueType windowValue, TValueVector& buckets, size_t firstElemIndex, size_t bucketsToClear) {
- Y_ASSERT(!buckets.empty());
- Y_ASSERT(firstElemIndex < buckets.size());
- Y_ASSERT(bucketsToClear <= buckets.size());
-
- const size_t arraySize = buckets.size();
- for (size_t i = 0; i < bucketsToClear; ++i) {
- TValueType& curVal = buckets[firstElemIndex];
- windowValue.Sum -= curVal.Sum;
- windowValue.Count -= curVal.Count;
- curVal = InitialValue();
- firstElemIndex = (firstElemIndex + 1) % arraySize;
- }
- return windowValue;
- }
-
-};
-
+template <class TValueType_>
+struct TAvgOperation {
+ struct TValueType {
+ ui64 Count = 0;
+ TValueType_ Sum = TValueType_();
+ };
+ using TValueVector = TVector<TValueType>;
+
+ static constexpr TValueType InitialValue() {
+ return TValueType(); // zero
+ }
+
+ // Updates value in current bucket and returns window value
+ static TValueType UpdateBucket(TValueType windowValue, TValueVector& buckets, size_t index, TValueType newVal) {
+ Y_ASSERT(index < buckets.size());
+ buckets[index].Sum += newVal.Sum;
+ buckets[index].Count += newVal.Count;
+ windowValue.Sum += newVal.Sum;
+ windowValue.Count += newVal.Count;
+ return windowValue;
+ }
+
+ static TValueType ClearBuckets(TValueType windowValue, TValueVector& buckets, size_t firstElemIndex, size_t bucketsToClear) {
+ Y_ASSERT(!buckets.empty());
+ Y_ASSERT(firstElemIndex < buckets.size());
+ Y_ASSERT(bucketsToClear <= buckets.size());
+
+ const size_t arraySize = buckets.size();
+ for (size_t i = 0; i < bucketsToClear; ++i) {
+ TValueType& curVal = buckets[firstElemIndex];
+ windowValue.Sum -= curVal.Sum;
+ windowValue.Count -= curVal.Count;
+ curVal = InitialValue();
+ firstElemIndex = (firstElemIndex + 1) % arraySize;
+ }
+ return windowValue;
+ }
+
+};
+
class TSelfPingActor : public TActorBootstrapped<TSelfPingActor> {
private:
const TDuration SendInterval;
const NMonitoring::TDynamicCounters::TCounterPtr Counter;
- const NMonitoring::TDynamicCounters::TCounterPtr CalculationTimeCounter;
+ const NMonitoring::TDynamicCounters::TCounterPtr CalculationTimeCounter;
NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> SlidingWindow;
- NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> CalculationSlidingWindow;
+ NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> CalculationSlidingWindow;
THPTimer Timer;
@@ -74,13 +74,13 @@ public:
return SELF_PING_ACTOR;
}
- TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter,
- const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter)
+ TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter)
: SendInterval(sendInterval)
, Counter(counter)
- , CalculationTimeCounter(calculationTimeCounter)
+ , CalculationTimeCounter(calculationTimeCounter)
, SlidingWindow(TDuration::Seconds(15), 100)
- , CalculationSlidingWindow(TDuration::Seconds(15), 100)
+ , CalculationSlidingWindow(TDuration::Seconds(15), 100)
{
}
@@ -99,53 +99,53 @@ public:
}
}
- ui64 MeasureTaskDurationNs() {
- // Prepare worm test data
- // 11 * 11 * 3 * 8 = 2904 bytes, fits in L1 cache
- constexpr ui64 Size = 11;
- // Align the data to reduce random alignment effects
- alignas(64) TStackVec<ui64, Size * Size * 3> data;
- ui64 s = 0;
- NHPTimer::STime beginTime;
- NHPTimer::STime endTime;
- // Prepare the data
- data.resize(Size * Size * 3);
- for (ui64 matrixIdx = 0; matrixIdx < 3; ++matrixIdx) {
- for (ui64 y = 0; y < Size; ++y) {
- for (ui64 x = 0; x < Size; ++x) {
- data[matrixIdx * (Size * Size) + y * Size + x] = y * Size + x;
- }
- }
- }
- // Warm-up the cache
- NHPTimer::GetTime(&beginTime);
- for (ui64 idx = 0; idx < data.size(); ++idx) {
- s += data[idx];
- }
- NHPTimer::GetTime(&endTime);
- s += (ui64)(1000000.0 * NHPTimer::GetSeconds(endTime - beginTime));
-
- // Measure the CPU performance
- // C = A * B with injected dependency to s
- NHPTimer::GetTime(&beginTime);
- for (ui64 y = 0; y < Size; ++y) {
- for (ui64 x = 0; x < Size; ++x) {
- for (ui64 i = 0; i < Size; ++i) {
- s += data[y * Size + i] * data[Size * Size + i * Size + x];
- }
- data[2 * Size * Size + y * Size + x] = s;
- s = 0;
- }
- }
- for (ui64 idx = 0; idx < data.size(); ++idx) {
- s += data[idx];
- }
- NHPTimer::GetTime(&endTime);
- // Prepare the result
- double d = 1000000000.0 * (NHPTimer::GetSeconds(endTime - beginTime) + 0.000000001 * (s & 1));
- return (ui64)d;
- }
-
+ ui64 MeasureTaskDurationNs() {
+ // Prepare worm test data
+ // 11 * 11 * 3 * 8 = 2904 bytes, fits in L1 cache
+ constexpr ui64 Size = 11;
+ // Align the data to reduce random alignment effects
+ alignas(64) TStackVec<ui64, Size * Size * 3> data;
+ ui64 s = 0;
+ NHPTimer::STime beginTime;
+ NHPTimer::STime endTime;
+ // Prepare the data
+ data.resize(Size * Size * 3);
+ for (ui64 matrixIdx = 0; matrixIdx < 3; ++matrixIdx) {
+ for (ui64 y = 0; y < Size; ++y) {
+ for (ui64 x = 0; x < Size; ++x) {
+ data[matrixIdx * (Size * Size) + y * Size + x] = y * Size + x;
+ }
+ }
+ }
+ // Warm-up the cache
+ NHPTimer::GetTime(&beginTime);
+ for (ui64 idx = 0; idx < data.size(); ++idx) {
+ s += data[idx];
+ }
+ NHPTimer::GetTime(&endTime);
+ s += (ui64)(1000000.0 * NHPTimer::GetSeconds(endTime - beginTime));
+
+ // Measure the CPU performance
+ // C = A * B with injected dependency to s
+ NHPTimer::GetTime(&beginTime);
+ for (ui64 y = 0; y < Size; ++y) {
+ for (ui64 x = 0; x < Size; ++x) {
+ for (ui64 i = 0; i < Size; ++i) {
+ s += data[y * Size + i] * data[Size * Size + i * Size + x];
+ }
+ data[2 * Size * Size + y * Size + x] = s;
+ s = 0;
+ }
+ }
+ for (ui64 idx = 0; idx < data.size(); ++idx) {
+ s += data[idx];
+ }
+ NHPTimer::GetTime(&endTime);
+ // Prepare the result
+ double d = 1000000000.0 * (NHPTimer::GetSeconds(endTime - beginTime) + 0.000000001 * (s & 1));
+ return (ui64)d;
+ }
+
void HandlePing(TEvPing::TPtr &ev, const TActorContext &ctx)
{
const auto now = ctx.Now();
@@ -156,10 +156,10 @@ public:
*Counter = SlidingWindow.Update(delayUs, now);
- ui64 d = MeasureTaskDurationNs();
+ ui64 d = MeasureTaskDurationNs();
auto res = CalculationSlidingWindow.Update({1, d}, now);
- *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1);
-
+ *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1);
+
SchedulePing(ctx, hpNow);
}
@@ -174,10 +174,10 @@ private:
IActor* CreateSelfPingActor(
TDuration sendInterval,
- const NMonitoring::TDynamicCounters::TCounterPtr& counter,
- const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter)
+ const NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter)
{
- return new TSelfPingActor(sendInterval, counter, calculationTimeCounter);
+ return new TSelfPingActor(sendInterval, counter, calculationTimeCounter);
}
} // NActors
diff --git a/library/cpp/actors/helpers/selfping_actor.h b/library/cpp/actors/helpers/selfping_actor.h
index e26bd13599..d7d07f9fa8 100644
--- a/library/cpp/actors/helpers/selfping_actor.h
+++ b/library/cpp/actors/helpers/selfping_actor.h
@@ -7,7 +7,7 @@ namespace NActors {
NActors::IActor* CreateSelfPingActor(
TDuration sendInterval,
- const NMonitoring::TDynamicCounters::TCounterPtr& counter,
- const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter);
+ const NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter);
} // NActors
diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp
index aaf23cad24..459635fa24 100644
--- a/library/cpp/actors/helpers/selfping_actor_ut.cpp
+++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp
@@ -21,14 +21,14 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) {
//const TActorId sender = runtime.AllocateEdgeActor();
NMonitoring::TDynamicCounters::TCounterPtr counter(new NMonitoring::TCounterForPtr());
- NMonitoring::TDynamicCounters::TCounterPtr counter2(new NMonitoring::TCounterForPtr());
+ NMonitoring::TDynamicCounters::TCounterPtr counter2(new NMonitoring::TCounterForPtr());
auto actor = CreateSelfPingActor(
TDuration::MilliSeconds(100), // sendInterval (unused in test)
- counter, counter2);
+ counter, counter2);
UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0);
const TActorId actorId = runtime->Register(actor);
Y_UNUSED(actorId);
diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp
index 25c882dd3a..36c6855d93 100644
--- a/library/cpp/actors/http/http_proxy.cpp
+++ b/library/cpp/actors/http/http_proxy.cpp
@@ -226,7 +226,7 @@ TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingR
request->Host,
request->URL.Before('?'),
response ? response->Status : "504",
- TDuration::Seconds(std::abs(request->Timer.Passed()))
+ TDuration::Seconds(std::abs(request->Timer.Passed()))
);
}
@@ -236,7 +236,7 @@ TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingR
request->Host,
request->URL.Before('?'),
response->Status,
- TDuration::Seconds(std::abs(request->Timer.Passed()))
+ TDuration::Seconds(std::abs(request->Timer.Passed()))
);
}
diff --git a/library/cpp/actors/interconnect/interconnect_counters.cpp b/library/cpp/actors/interconnect/interconnect_counters.cpp
index fc57a43dfb..224160d4b4 100644
--- a/library/cpp/actors/interconnect/interconnect_counters.cpp
+++ b/library/cpp/actors/interconnect/interconnect_counters.cpp
@@ -232,7 +232,7 @@ namespace {
if (name != std::exchange(HumanFriendlyPeerHostName, name)) {
PerSessionCounters.Reset();
}
- VALGRIND_MAKE_READABLE(&DataCenterId, sizeof(DataCenterId));
+ VALGRIND_MAKE_READABLE(&DataCenterId, sizeof(DataCenterId));
if (dataCenterId != std::exchange(DataCenterId, dataCenterId)) {
PerDataCenterCounters.Reset();
}
diff --git a/library/cpp/actors/interconnect/poller_tcp_unit.cpp b/library/cpp/actors/interconnect/poller_tcp_unit.cpp
index 015031f615..59e7dda810 100644
--- a/library/cpp/actors/interconnect/poller_tcp_unit.cpp
+++ b/library/cpp/actors/interconnect/poller_tcp_unit.cpp
@@ -88,7 +88,7 @@ namespace NInterconnect {
TPollerUnit::IdleThread(void* param) {
// TODO: musl-libc version of `sched_param` struct is for some reason different from pthread
// version in Ubuntu 12.04
-#if defined(_linux_) && !defined(_musl_)
+#if defined(_linux_) && !defined(_musl_)
pthread_t threadSelf = pthread_self();
sched_param sparam = {20};
pthread_setschedparam(threadSelf, SCHED_FIFO, &sparam);
diff --git a/library/cpp/actors/interconnect/ut/lib/interrupter.h b/library/cpp/actors/interconnect/ut/lib/interrupter.h
index 780bdb4322..48851de2c5 100644
--- a/library/cpp/actors/interconnect/ut/lib/interrupter.h
+++ b/library/cpp/actors/interconnect/ut/lib/interrupter.h
@@ -119,7 +119,7 @@ private:
bool DelayTraffic;
void UpdateRejectingState() {
- if (TDuration::Seconds(std::abs(RejectingStateTimer.Passed())) > CurrentRejectingTimeout) {
+ if (TDuration::Seconds(std::abs(RejectingStateTimer.Passed())) > CurrentRejectingTimeout) {
RejectingStateTimer.Reset();
CurrentRejectingTimeout = (RandomNumber<ui32>(1) ? RejectingTrafficTimeout + TDuration::Seconds(1.0) : RejectingTrafficTimeout - TDuration::Seconds(0.2));
RejectingTraffic = !RejectingTraffic;
@@ -127,7 +127,7 @@ private:
}
void RandomlyDisconnect() {
- if (TDuration::Seconds(std::abs(DisconnectTimer.Passed())) > DisconnectTimeout) {
+ if (TDuration::Seconds(std::abs(DisconnectTimer.Passed())) > DisconnectTimeout) {
DisconnectTimer.Reset();
if (RandomNumber<ui32>(100) > 90) {
if (!Connections.empty()) {
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index f246a23811..6fa25b9965 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -214,10 +214,10 @@ namespace NActors {
return Scheduled.begin()->Deadline;
}
- ui64 TEventMailBox::GetSentEventCount() const {
- return Sent.size();
- }
-
+ ui64 TEventMailBox::GetSentEventCount() const {
+ return Sent.size();
+ }
+
class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider {
public:
TTimeProvider(TTestActorRuntimeBase& runtime)
@@ -238,9 +238,9 @@ namespace NActors {
TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node)
: Runtime(runtime)
, Node(node)
- {
+ {
Y_UNUSED(Runtime);
- }
+ }
void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override {
Y_UNUSED(actorSystem);
@@ -333,9 +333,9 @@ namespace NActors {
if (verbose)
Cerr << "Event was added to scheduled queue\n";
} else {
- if (cookie) {
- cookie->Detach();
- }
+ if (cookie) {
+ cookie->Detach();
+ }
if (verbose) {
Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n";
}
@@ -368,22 +368,22 @@ namespace NActors {
if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId);
- if (ev->GetRecipientRewrite() == logger) {
- TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
- IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId());
- if (recipientActor) {
+ if (ev->GetRecipientRewrite() == logger) {
+ TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
+ IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId());
+ if (recipientActor) {
TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite());
TActivationContext *prevTlsActivationContext = TlsActivationContext;
TlsActivationContext = &ctx;
- recipientActor->Receive(ev, ctx);
+ recipientActor->Receive(ev, ctx);
TlsActivationContext = prevTlsActivationContext;
// we expect the logger to never die in tests
- }
- }
- } else {
- Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
- Runtime->MailboxesHasEvents.Signal();
- }
+ }
+ }
+ } else {
+ Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
+ Runtime->MailboxesHasEvents.Signal();
+ }
if (verbose)
Cerr << "Event was added to sent queue\n";
} else {
@@ -458,7 +458,7 @@ namespace NActors {
TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads)
: ScheduledCount(0)
, ScheduledLimit(100000)
- , MainThreadId(TThread::CurrentThreadId())
+ , MainThreadId(TThread::CurrentThreadId())
, ClusterUUID(MakeClusterId())
, FirstNodeId(NextNodeId)
, NodeCount(nodeCount)
@@ -481,7 +481,7 @@ namespace NActors {
, RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver)
, CurrentDispatchContext(nullptr)
{
- SetDispatcherRandomSeed(TInstant::Now(), 0);
+ SetDispatcherRandomSeed(TInstant::Now(), 0);
EnableActorCallstack();
}
@@ -541,7 +541,7 @@ namespace NActors {
Cerr.Flush();
Clog.Flush();
- DisableActorCallstack();
+ DisableActorCallstack();
}
void TTestActorRuntimeBase::CleanupNodes() {
@@ -580,10 +580,10 @@ namespace NActors {
void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
- if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) {
- runtime.ScheduleWhiteList.insert(actorId);
+ if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) {
+ runtime.ScheduleWhiteList.insert(actorId);
runtime.ScheduleWhiteListParent[actorId] = parentId;
- }
+ }
}
class TScheduledTreeItem {
@@ -644,8 +644,8 @@ namespace NActors {
auto& item = *scheduledEvents.begin();
TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type);
eventTypes[std::make_pair(item.Event->Recipient, name)]++;
- runtime.ScheduledCount++;
- if (runtime.ScheduledCount > runtime.ScheduledLimit) {
+ runtime.ScheduledCount++;
+ if (runtime.ScheduledCount > runtime.ScheduledLimit) {
// TScheduledTreeItem root("Root");
// TVector<TString> path;
// for (const auto& pr : eventTypes) {
@@ -665,10 +665,10 @@ namespace NActors {
// }
// root.RecursiveSort();
// root.Print(Cerr);
-
- ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit);
- }
- if (item.Cookie->Get()) {
+
+ ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit);
+ }
+ if (item.Cookie->Get()) {
if (item.Cookie->Detach()) {
queue.push_back(item.Event);
}
@@ -808,8 +808,8 @@ namespace NActors {
if (newTime.MicroSeconds() > CurrentTimestamp) {
CurrentTimestamp = newTime.MicroSeconds();
for (auto& kv : Nodes) {
- AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp);
- AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp);
+ AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp);
+ AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp);
}
}
}
@@ -1125,42 +1125,42 @@ namespace NActors {
}
TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes;
- while (!currentMailboxes.empty()) {
+ while (!currentMailboxes.empty()) {
bool hasProgress = true;
while (hasProgress) {
++DispatchCyclesCount;
hasProgress = false;
- ui64 eventsToDispatch = 0;
- for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) {
- if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- eventsToDispatch += mboxIt->second->GetSentEventCount();
- }
- }
- ui32 eventsDispatched = 0;
-
- //TODO: count events before each cycle, break after dispatching that much events
- bool isEmpty = false;
- while (!isEmpty && eventsDispatched < eventsToDispatch) {
- ui64 mailboxCount = currentMailboxes.size();
+ ui64 eventsToDispatch = 0;
+ for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) {
+ if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ eventsToDispatch += mboxIt->second->GetSentEventCount();
+ }
+ }
+ ui32 eventsDispatched = 0;
+
+ //TODO: count events before each cycle, break after dispatching that much events
+ bool isEmpty = false;
+ while (!isEmpty && eventsDispatched < eventsToDispatch) {
+ ui64 mailboxCount = currentMailboxes.size();
ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull;
- auto startWithMboxIt = currentMailboxes.begin();
- for (ui64 i = 0; i < startWith; ++i) {
- ++startWithMboxIt;
- }
- auto endWithMboxIt = startWithMboxIt;
-
- isEmpty = true;
- auto mboxIt = startWithMboxIt;
+ auto startWithMboxIt = currentMailboxes.begin();
+ for (ui64 i = 0; i < startWith; ++i) {
+ ++startWithMboxIt;
+ }
+ auto endWithMboxIt = startWithMboxIt;
+
+ isEmpty = true;
+ auto mboxIt = startWithMboxIt;
TDeque<TEventMailboxId> suspectedBoxes;
- while (true) {
- auto& mbox = *mboxIt;
- bool isIgnored = true;
- if (!mbox.second->IsEmpty()) {
- HandleNonEmptyMailboxesForEachContext(mbox.first);
- if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
-
- bool isEdgeMailbox = false;
+ while (true) {
+ auto& mbox = *mboxIt;
+ bool isIgnored = true;
+ if (!mbox.second->IsEmpty()) {
+ HandleNonEmptyMailboxesForEachContext(mbox.first);
+ if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+
+ bool isEdgeMailbox = false;
if (EdgeActorByMailbox.FindPtr(TEventMailboxId(mbox.first.NodeId, mbox.first.Hint))) {
isEdgeMailbox = true;
TEventsList events;
@@ -1168,80 +1168,80 @@ namespace NActors {
for (auto& ev : events) {
TInverseGuard<TMutex> inverseGuard(Mutex);
ObserverFunc(*this, ev);
- }
+ }
mbox.second->PushFront(events);
}
-
- if (!isEdgeMailbox) {
- isEmpty = false;
- isIgnored = false;
- ++eventsDispatched;
- ++DispatchedEventsCount;
- if (DispatchedEventsCount > DispatchedEventsLimit) {
- ythrow TWithBackTrace<yexception>() << "Dispatched "
- << DispatchedEventsLimit << " events, limit reached.";
- }
-
- auto ev = mbox.second->Pop();
+
+ if (!isEdgeMailbox) {
+ isEmpty = false;
+ isIgnored = false;
+ ++eventsDispatched;
+ ++DispatchedEventsCount;
+ if (DispatchedEventsCount > DispatchedEventsLimit) {
+ ythrow TWithBackTrace<yexception>() << "Dispatched "
+ << DispatchedEventsLimit << " events, limit reached.";
+ }
+
+ auto ev = mbox.second->Pop();
if (BlockedOutput.find(ev->Sender) == BlockedOutput.end()) {
//UpdateCurrentTime(TInstant::MicroSeconds(CurrentTimestamp + 10));
if (verbose) {
Cerr << "Process event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
PrintEvent(ev, this);
}
- }
-
- hasProgress = true;
- EEventAction action;
- {
- TInverseGuard<TMutex> inverseGuard(Mutex);
- action = ObserverFunc(*this, ev);
- }
-
- switch (action) {
- case EEventAction::PROCESS:
- UpdateFinalEventsStatsForEachContext(*ev);
- SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
- break;
- case EEventAction::DROP:
- // do nothing
- break;
- case EEventAction::RESCHEDULE: {
- TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
- mbox.second->Freeze(deadline);
- mbox.second->PushFront(ev);
- break;
- }
- default:
+ }
+
+ hasProgress = true;
+ EEventAction action;
+ {
+ TInverseGuard<TMutex> inverseGuard(Mutex);
+ action = ObserverFunc(*this, ev);
+ }
+
+ switch (action) {
+ case EEventAction::PROCESS:
+ UpdateFinalEventsStatsForEachContext(*ev);
+ SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
+ break;
+ case EEventAction::DROP:
+ // do nothing
+ break;
+ case EEventAction::RESCHEDULE: {
+ TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
+ mbox.second->Freeze(deadline);
+ mbox.second->PushFront(ev);
+ break;
+ }
+ default:
Y_FAIL("Unknown action");
- }
- }
+ }
+ }
}
- }
+ }
Y_VERIFY(mboxIt != currentMailboxes.end());
- if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes &&
- mboxIt->second->IsEmpty() &&
- mboxIt->second->IsScheduledEmpty() &&
- mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- suspectedBoxes.push_back(mboxIt->first);
+ if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes &&
+ mboxIt->second->IsEmpty() &&
+ mboxIt->second->IsScheduledEmpty() &&
+ mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ suspectedBoxes.push_back(mboxIt->first);
}
- ++mboxIt;
- if (mboxIt == currentMailboxes.end()) {
- mboxIt = currentMailboxes.begin();
+ ++mboxIt;
+ if (mboxIt == currentMailboxes.end()) {
+ mboxIt = currentMailboxes.begin();
}
Y_VERIFY(endWithMboxIt != currentMailboxes.end());
- if (mboxIt == endWithMboxIt) {
+ if (mboxIt == endWithMboxIt) {
break;
}
}
- for (auto id : suspectedBoxes) {
- auto it = currentMailboxes.find(id);
- if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() &&
- it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- currentMailboxes.erase(it);
- }
+ for (auto id : suspectedBoxes) {
+ auto it = currentMailboxes.find(id);
+ if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() &&
+ it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ currentMailboxes.erase(it);
+ }
}
}
}
@@ -1283,7 +1283,7 @@ namespace NActors {
ythrow TWithBackTrace<TEmptyEventQueueException>();
}
- if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) {
+ if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) {
inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
bool isEmpty = true;
TMaybe<TInstant> nearestMailboxDeadline;
@@ -1385,7 +1385,7 @@ namespace NActors {
void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32,
- senderNodeIndex, NodeCount);
+ senderNodeIndex, NodeCount);
SendInternal(ev, senderNodeIndex, viaActorSystem);
}
@@ -1475,11 +1475,11 @@ namespace NActors {
}
void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) {
- ui64 days = (time.Hours() / 24);
- DispatcherRandomSeed = (days << 32) ^ iteration;
+ ui64 days = (time.Hours() / 24);
+ DispatcherRandomSeed = (days << 32) ^ iteration;
DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
- }
-
+ }
+
IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const {
TGuard<TMutex> guard(Mutex);
if (nodeIndex == Max<ui32>()) {
@@ -1569,24 +1569,24 @@ namespace NActors {
TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
IActor* recipientActor = mailbox->FindActor(recipientLocalId);
if (recipientActor) {
- // Save actorId by value in order to prevent ctx from being invalidated during another Send call.
+ // Save actorId by value in order to prevent ctx from being invalidated during another Send call.
TActorId actorId = ev->GetRecipientRewrite();
node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite();
TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId);
- TActivationContext *prevTlsActivationContext = TlsActivationContext;
+ TActivationContext *prevTlsActivationContext = TlsActivationContext;
TlsActivationContext = &ctx;
CurrentRecipient = actorId;
{
TInverseGuard<TMutex> inverseGuard(Mutex);
-#ifdef USE_ACTOR_CALLSTACK
- TCallstack::GetTlsCallstack() = ev->Callstack;
- TCallstack::GetTlsCallstack().SetLinesToSkip();
-#endif
+#ifdef USE_ACTOR_CALLSTACK
+ TCallstack::GetTlsCallstack() = ev->Callstack;
+ TCallstack::GetTlsCallstack().SetLinesToSkip();
+#endif
recipientActor->Receive(evHolder, ctx);
node->ExecutorThread->DropUnregistered();
}
CurrentRecipient = TActorId();
- TlsActivationContext = prevTlsActivationContext;
+ TlsActivationContext = prevTlsActivationContext;
} else {
if (VERBOSE) {
Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n";
@@ -1831,7 +1831,7 @@ namespace NActors {
} else {
while (Context->Queue->Head()) {
HasReply = false;
- ctx.ExecutorThread.Send(GetForwardedEvent().Release());
+ ctx.ExecutorThread.Send(GetForwardedEvent().Release());
int count = 100;
while (!HasReply && count > 0) {
try {
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h
index f41c4889ad..26e3b45c98 100644
--- a/library/cpp/actors/testlib/test_runtime.h
+++ b/library/cpp/actors/testlib/test_runtime.h
@@ -100,13 +100,13 @@ namespace NActors {
struct TScheduledEventQueueItem {
TInstant Deadline;
TAutoPtr<IEventHandle> Event;
- TAutoPtr<TSchedulerCookieHolder> Cookie;
+ TAutoPtr<TSchedulerCookieHolder> Cookie;
ui64 UniqueId;
TScheduledEventQueueItem(TInstant deadline, TAutoPtr<IEventHandle> event, ISchedulerCookie* cookie)
: Deadline(deadline)
, Event(event)
- , Cookie(new TSchedulerCookieHolder(cookie))
+ , Cookie(new TSchedulerCookieHolder(cookie))
, UniqueId(++NextUniqueId)
{}
@@ -151,7 +151,7 @@ namespace NActors {
void Schedule(const TScheduledEventQueueItem& item);
bool IsScheduledEmpty() const;
TInstant GetFirstScheduleDeadline() const;
- ui64 GetSentEventCount() const;
+ ui64 GetSentEventCount() const;
private:
TScheduledEventsList Scheduled;
@@ -173,15 +173,15 @@ namespace NActors {
}
};
- class TSchedulingLimitReachedException : public yexception {
- public:
- TSchedulingLimitReachedException(ui64 limit) {
- TStringStream str;
- str << "TestActorRuntime Processed over " << limit << " events.";
- Append(str.Str());
- }
- };
-
+ class TSchedulingLimitReachedException : public yexception {
+ public:
+ TSchedulingLimitReachedException(ui64 limit) {
+ TStringStream str;
+ str << "TestActorRuntime Processed over " << limit << " events.";
+ Append(str.Str());
+ }
+ };
+
class TTestActorRuntimeBase: public TNonCopyable {
public:
class TEdgeActor;
@@ -222,9 +222,9 @@ namespace NActors {
static bool IsVerbose();
static void SetVerbose(bool verbose);
TDuration SetDispatchTimeout(TDuration timeout);
- void SetDispatchedEventsLimit(ui64 limit) {
- DispatchedEventsLimit = limit;
- }
+ void SetDispatchedEventsLimit(ui64 limit) {
+ DispatchedEventsLimit = limit;
+ }
TDuration SetReschedulingDelay(TDuration delay);
void SetLogBackend(const TAutoPtr<TLogBackend> logBackend);
void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority);
@@ -399,15 +399,15 @@ namespace NActors {
return {};
}
- template <typename TEvent>
+ template <typename TEvent>
TEvent* GrabEdgeEventRethrow(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) {
- try {
+ try {
return GrabEdgeEvent<TEvent>(handle, simTimeout);
- } catch (...) {
+ } catch (...) {
ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage();
- }
- }
-
+ }
+ }
+
template<class TEvent>
typename TEvent::TPtr GrabEdgeEventRethrow(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) {
try {
@@ -453,17 +453,17 @@ namespace NActors {
}
}
- void ResetScheduledCount() {
- ScheduledCount = 0;
- }
-
- void SetScheduledLimit(ui64 limit) {
- ScheduledLimit = limit;
- }
-
- void SetDispatcherRandomSeed(TInstant time, ui64 iteration);
+ void ResetScheduledCount() {
+ ScheduledCount = 0;
+ }
+
+ void SetScheduledLimit(ui64 limit) {
+ ScheduledLimit = limit;
+ }
+
+ void SetDispatcherRandomSeed(TInstant time, ui64 iteration);
TString GetActorName(const TActorId& actorId) const;
-
+
const TVector<ui64>& GetTxAllocatorTabletIds() const { return TxAllocatorTabletIds; }
void SetTxAllocatorTabletIds(const TVector<ui64>& ids) { TxAllocatorTabletIds = ids; }
@@ -502,8 +502,8 @@ namespace NActors {
bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline);
private:
- ui64 ScheduledCount;
- ui64 ScheduledLimit;
+ ui64 ScheduledCount;
+ ui64 ScheduledLimit;
THolder<TTempDir> TmpDir;
const TThread::TId MainThreadId;
@@ -525,9 +525,9 @@ namespace NActors {
TMap<ui32, ui64> EvCounters;
ui64 DispatchCyclesCount;
ui64 DispatchedEventsCount;
- ui64 DispatchedEventsLimit = 2'500'000;
+ ui64 DispatchedEventsLimit = 2'500'000;
TActorId CurrentRecipient;
- ui64 DispatcherRandomSeed;
+ ui64 DispatcherRandomSeed;
TIntrusivePtr<IRandomProvider> DispatcherRandomProvider;
TAutoPtr<TLogBackend> LogBackend;
bool NeedMonitoring;
diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h
index 45267f9e99..f5595efbaa 100644
--- a/library/cpp/actors/util/rope.h
+++ b/library/cpp/actors/util/rope.h
@@ -53,11 +53,11 @@ public:
free(ptr);
}
- void operator delete(void* p, void* ptr) {
- Y_UNUSED(p);
- Y_UNUSED(ptr);
- }
-
+ void operator delete(void* p, void* ptr) {
+ Y_UNUSED(p);
+ Y_UNUSED(ptr);
+ }
+
TData GetData() const override {
return {Data + Offset, Size};
}
@@ -237,7 +237,7 @@ class TRope {
case EType::STRING: return wrapper(reinterpret_cast<TString&>(value));
case EType::ROPE_CHUNK_BACKEND: return wrapper(reinterpret_cast<IRopeChunkBackend::TPtr&>(value));
}
- Y_FAIL("Unexpected type# %" PRIu64, static_cast<ui64>(type));
+ Y_FAIL("Unexpected type# %" PRIu64, static_cast<ui64>(type));
}
template<typename TCallback>
diff --git a/library/cpp/actors/util/unordered_cache.h b/library/cpp/actors/util/unordered_cache.h
index ac1e1e4b2b..76f036c0cf 100644
--- a/library/cpp/actors/util/unordered_cache.h
+++ b/library/cpp/actors/util/unordered_cache.h
@@ -126,18 +126,18 @@ public:
}
}
- ~TUnorderedCache() {
+ ~TUnorderedCache() {
Y_VERIFY(!Pop(0));
- for (ui64 i = 0; i < Concurrency; ++i) {
+ for (ui64 i = 0; i < Concurrency; ++i) {
if (ReadSlots[i].ReadFrom) {
delete ReadSlots[i].ReadFrom;
ReadSlots[i].ReadFrom = nullptr;
- }
+ }
WriteSlots[i].WriteTo = nullptr;
- }
- }
-
+ }
+ }
+
T Pop(ui64 readerRotation) noexcept {
ui64 readerIndex = readerRotation;
const ui64 endIndex = readerIndex + Concurrency;
diff --git a/library/cpp/actors/util/ut/ya.make b/library/cpp/actors/util/ut/ya.make
index 8cb52a067e..3b08b77984 100644
--- a/library/cpp/actors/util/ut/ya.make
+++ b/library/cpp/actors/util/ut/ya.make
@@ -1,10 +1,10 @@
UNITTEST_FOR(library/cpp/actors/util)
-IF (WITH_VALGRIND)
- TIMEOUT(600)
- SIZE(MEDIUM)
-ENDIF()
-
+IF (WITH_VALGRIND)
+ TIMEOUT(600)
+ SIZE(MEDIUM)
+ENDIF()
+
OWNER(
alexvru
g:kikimr