diff options
author | Cthulhu <cthulhu@yandex-team.ru> | 2022-02-10 16:47:44 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:44 +0300 |
commit | 6aced6c854653b75aab9808d5995be5fc4d9fa53 (patch) | |
tree | c0748b5dcbade83af788c0abfa89c0383d6b779c /library | |
parent | bcb3e9d0eb2a8188a6a9fe0907a8949ce4881a4e (diff) | |
download | ydb-6aced6c854653b75aab9808d5995be5fc4d9fa53.tar.gz |
Restoring authorship annotation for Cthulhu <cthulhu@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
59 files changed, 1111 insertions, 1111 deletions
diff --git a/library/cpp/actors/README.md b/library/cpp/actors/README.md index c4058444dd..c39908f2f5 100644 --- a/library/cpp/actors/README.md +++ b/library/cpp/actors/README.md @@ -1,107 +1,107 @@ -## Actor library - -### Часть первая, вводная. -Иногда приходится разрабатывать асинхронные, существенно параллельные, местами распределённые программы. Иногда еще и внутренняя логика нетривиальна, разнородна, пишется разными командами не один год. Всё как мы любим. Человечеством придумано не так много способов внутренней организации структуры и кода таких программ. Большинство из них плохие (и именно из-за плохих подходов разработка асинхронных, многопоточных программ приобрела дурную славу). Некоторые получше. А серебряной пули как обычно нет. - -Когда мы начинали разработку Yandex Database (тогда еще KiKiMR), сразу было понятно что простыми наколеночными поделиями обойтись (и сделать при этом хорошо, так что бы не было стыдно) не получится. В качестве базиса мы выбрали мессадж-пассинг и модель акторов. И не пожалели. Постепенно этот подход распространился на смежные проекты. - -### Базовые концепции. -Если отбросить шелуху – представляем сервис (программу в случае запуска изолированного бинарника) как ансамбль независимых агентов, взаимодействующих через отправку асинхронных сообщений внутри общего окружения. Тут все слова важны: - -Независимых – не разделяют состояние и поток выполнения. -Передача сообщений – формализуем протоколы, а не интерфейсы. - -Асинхронная – не блокируемся на отправке сообщений. -Общее окружение – все агенты разделяют общий пул ресурсов и каждый из них, зная адрес, может послать сообщение каждому. - -В более хайповых терминах – очень похоже на колокейтед микросервисы, только уровнем ниже. И да, мы заведомо не хотели прятать асинхронщину и параллелизм от разработчика, показывая прям самое мясо. - -### IActor. -https://a.yandex-team.ru/arc/trunk/arcadia/library/actors/core/actor.h?rev=5315854#L105 -Базовый класс всех агентов, напрямую обычно не используется. Инстанцируется либо TActor, либо TActorBootstrapped. Фактически весь полезный код программы размещается в акторах. -(важное замечание – в коде увидите ручки с TActorContext и без него, схожие по названию и назначению. На данный момент вариант с TActorContext является устаревшим, новый код стоит писать без его использования). -Важные методы: - -PassAway – единственный корректный способ зарегистрированному актору умереть. Может вызываться только находясь внутри обработчика сообщения. -Send – отправка сообщения, зная адрес получателя. В акторе доступен хелпер, принимающий непосредственно сообщение. Базовый вызов, принимающий полный event handle – доступен в контексте. - -Become – установить функцию-обработчик сообщений, которая будет использована при получении следующего сообщения. - -Register – зарегистрировать новый актор в акторсистеме, с выделением нового мейлбокса. Важно – с момента вызова владение актором передается акторсистеме, т.е. уже к моменту выхода актор может начать выполняться на другом потоке, нельзя к нему ни обращаться прямыми вызовами, ни даже предполагать что он еще жив. - -Schedule – зарегистрировать сообщение, которое будет отправлено не менее чем через запрошенную задержку. В акторе доступен хелпер, декорирующий сообщение хендлом отправки самому себе, в контексте можно передать полный хендл. - -SelfId – узнать собственный адрес. Возвращаемый объект TActorIdentity можно передавать если требуется делегировать отправку сообщений от имени актора (например если пишете полезный код пользуясь пассивными объектами). -Посылка сообщений дешёвая, не нужно на ней чрезмерно экономить (но не бесплатная – поэтому посылать сообщения только ради посылки сообщений то же не стоит). - -Инстанцирование акторов так же дёшево, актор на запрос или фазу запроса – вполне нормальная практика. Мультиплексировать обработку разных запросов в одном акторе – так же вполне нормально. В нашем коде много примеров и первого, и второго. Пользуйтесь здравым смыслов и собственным вкусом. -Т.к. на время обработки сообщения актор занимает тред из пула акторсистемы – уходить в длинные вычисления лучше на отдельном отселённом акторе (и либо отселять в отдельный пол акторсистемы, либо контролировать параллельность брокером ресурсов), блокирующие вызовы делать почти всегда ошибка. Стремление написать мютекс - ересь и от лукавого. -Идентифицируются акторы своим TActorID-ом, который уникален и вы не должны его придумывать из воздуха, а только получить из регистрации (для порождённых акторов) или его вам должен рассказать кто-то, законно его знающий. - -Отправка на несуществующий актор (уже умерший) безопасна, сообщение будет просто выброшено в момент обработки (как обрабатывать недоставку сообщений в протоколах расскажу ниже). - -Кроме нормальных TActorID существуют еще и сервисные (составленные из строчки и номера ноды). Под ними может быть зарегистрирован реальный актор и фактически при получении сообщения по сервисному адресу – попробует переправить его текущему фактическому. Это позволяет размещать хорошо известные сервисы по хорошо известному адресу, не выстраивая параллельную машинерию поиска. - -Строить из актора конечный автомат при помощи переключений функции-обработчика – выбор в каждом конкретном случае, иногда удобнее да, иногда сваливать всё в одно состояние, а иногда – применять гибридное решение (когда часть жизненного цикла – обычно инициализации и завершение – выражены в переходах, а часть – нет). -Меньше слов, больше дела – этого уже достаточно что бы прочитать самый простой пример. https://a.yandex-team.ru/arc/trunk/arcadia/library/actors/examples/01_ping_pong -Здесь можно увидеть образец самого простого актора, занимающегося переброской сообщений и использующего все основные вызовы. Заодно покрутив за разные ручки (количество тредов в тредпуле, количество пар перебрасывающихся акторов) можно посмотреть на изменение поведения системы (hint: в таких простых сценариях максимум перфоманса достигается при одном треде в тредпулах). - -### Event и Event Handle. -Полезную нагрузку сообщений заворачиваем в наследника IEventBase, у которого два важных метода – сериализация и загрузка. Сериализация виртуальная, а вот загрузка – нет, и для разбора сообщения из байтовой последовательности – необходимо на стороне получателя сматчить число-идентификатор типа ивента с С++ типом. Именно это делают макросы из hfunc.h. На практике ивенты создаются либо как наследник TEventLocal<> (для строго локальных сообщений) либо как наследник TEventPB<> (для потенциально пересылаемых по сети сообщений, типизируются protobuf-мессаджем). - -Кроме непосредственно ивента (в виде структуры либо в виде байтовой строки) для пересылки сообщения необходим набор дополнительных полей - -Адресат - -Отправитель - -Тип сообщения - -Кука - -Флаги - -Сообщение + дополнительные поля = IEventHandle. Именно хендлами акторсистема и оперирует. <event-type>::TPtr – в примере выше – это и есть указатель на типизированный хендл. - -Технически типом сообщения может быть любое число, которое получатель и отправитель договорились понимать как идентификатор сообщения. Сложившаяся практика – выделять диапазон идентификаторов макросом EventSpaceBegin (фактически блоками по 64к), начиная с блока ES_USERSPACE. -Кука – неинтерпретируемое ui64 число, передаваемое с хендлом. Хорошей практикой является в ответе сервиса на сообщение выставлять куку в куку исходного сообщения, особенно для сервисов, потенциально используемых конкурентно. - -В флагах несколько бит зарезервировано под флаги, декларирующие как необходимо обрабатывать особые ситуации и 12 бит – под номер канала интерконнекта, в котором будет пересылаться сообщение (для локальных сообщений в имеющихся реализациях номер канала не имеет значения - хотя можно представить реализацию где для каналов будут независимые очереди). - -### Тредпулы и мейлбоксы. -В рамках одной акторсистемы может сосуществовать несколько независимых тредпулов, каждый актор регистрируется на конкретном и в процессе жизни не может мигрировать (но может создавать новые акторы на произвольном тредпуле). Используется для крупноблочного разделения ресурсов, либо между разными активностями (вот здесь – обрабатываем один класс запросов, а вот здесь - другой), либо между разными профилями активности (вот здесь обрабатываем быстрые запросы, здесь – медленные, а вот там – вообще батчёвые). Например в YDB работает системный тредпул (в котором запускаются акторы, необходимые для функционирования YDB, и для которого мы следим что бы не было длительной блокировки в обработчиках), пользовательский тредпул (в котором обрабатываются запросы и потенциально обработчики могут уходить в себя подольше, но это не повлияет на инфраструктуру), батчёвый тредпул (куда отгружается длительная обработка – компакшены дисков, сканы таблиц и подобное) и, в жирных нодах – тредпул интерконнекта (как наиболее чувствительного к задержкам). -Пересылка сообщений между акторами разных тредпулов но одной локальной акторсистемы остаётся локальной, принудительной сериализации сообщения не происходит. - +## Actor library + +### Часть первая, вводная. +Иногда приходится разрабатывать асинхронные, существенно параллельные, местами распределённые программы. Иногда еще и внутренняя логика нетривиальна, разнородна, пишется разными командами не один год. Всё как мы любим. Человечеством придумано не так много способов внутренней организации структуры и кода таких программ. Большинство из них плохие (и именно из-за плохих подходов разработка асинхронных, многопоточных программ приобрела дурную славу). Некоторые получше. А серебряной пули как обычно нет. + +Когда мы начинали разработку Yandex Database (тогда еще KiKiMR), сразу было понятно что простыми наколеночными поделиями обойтись (и сделать при этом хорошо, так что бы не было стыдно) не получится. В качестве базиса мы выбрали мессадж-пассинг и модель акторов. И не пожалели. Постепенно этот подход распространился на смежные проекты. + +### Базовые концепции. +Если отбросить шелуху – представляем сервис (программу в случае запуска изолированного бинарника) как ансамбль независимых агентов, взаимодействующих через отправку асинхронных сообщений внутри общего окружения. Тут все слова важны: + +Независимых – не разделяют состояние и поток выполнения. +Передача сообщений – формализуем протоколы, а не интерфейсы. + +Асинхронная – не блокируемся на отправке сообщений. +Общее окружение – все агенты разделяют общий пул ресурсов и каждый из них, зная адрес, может послать сообщение каждому. + +В более хайповых терминах – очень похоже на колокейтед микросервисы, только уровнем ниже. И да, мы заведомо не хотели прятать асинхронщину и параллелизм от разработчика, показывая прям самое мясо. + +### IActor. +https://a.yandex-team.ru/arc/trunk/arcadia/library/actors/core/actor.h?rev=5315854#L105 +Базовый класс всех агентов, напрямую обычно не используется. Инстанцируется либо TActor, либо TActorBootstrapped. Фактически весь полезный код программы размещается в акторах. +(важное замечание – в коде увидите ручки с TActorContext и без него, схожие по названию и назначению. На данный момент вариант с TActorContext является устаревшим, новый код стоит писать без его использования). +Важные методы: + +PassAway – единственный корректный способ зарегистрированному актору умереть. Может вызываться только находясь внутри обработчика сообщения. +Send – отправка сообщения, зная адрес получателя. В акторе доступен хелпер, принимающий непосредственно сообщение. Базовый вызов, принимающий полный event handle – доступен в контексте. + +Become – установить функцию-обработчик сообщений, которая будет использована при получении следующего сообщения. + +Register – зарегистрировать новый актор в акторсистеме, с выделением нового мейлбокса. Важно – с момента вызова владение актором передается акторсистеме, т.е. уже к моменту выхода актор может начать выполняться на другом потоке, нельзя к нему ни обращаться прямыми вызовами, ни даже предполагать что он еще жив. + +Schedule – зарегистрировать сообщение, которое будет отправлено не менее чем через запрошенную задержку. В акторе доступен хелпер, декорирующий сообщение хендлом отправки самому себе, в контексте можно передать полный хендл. + +SelfId – узнать собственный адрес. Возвращаемый объект TActorIdentity можно передавать если требуется делегировать отправку сообщений от имени актора (например если пишете полезный код пользуясь пассивными объектами). +Посылка сообщений дешёвая, не нужно на ней чрезмерно экономить (но не бесплатная – поэтому посылать сообщения только ради посылки сообщений то же не стоит). + +Инстанцирование акторов так же дёшево, актор на запрос или фазу запроса – вполне нормальная практика. Мультиплексировать обработку разных запросов в одном акторе – так же вполне нормально. В нашем коде много примеров и первого, и второго. Пользуйтесь здравым смыслов и собственным вкусом. +Т.к. на время обработки сообщения актор занимает тред из пула акторсистемы – уходить в длинные вычисления лучше на отдельном отселённом акторе (и либо отселять в отдельный пол акторсистемы, либо контролировать параллельность брокером ресурсов), блокирующие вызовы делать почти всегда ошибка. Стремление написать мютекс - ересь и от лукавого. +Идентифицируются акторы своим TActorID-ом, который уникален и вы не должны его придумывать из воздуха, а только получить из регистрации (для порождённых акторов) или его вам должен рассказать кто-то, законно его знающий. + +Отправка на несуществующий актор (уже умерший) безопасна, сообщение будет просто выброшено в момент обработки (как обрабатывать недоставку сообщений в протоколах расскажу ниже). + +Кроме нормальных TActorID существуют еще и сервисные (составленные из строчки и номера ноды). Под ними может быть зарегистрирован реальный актор и фактически при получении сообщения по сервисному адресу – попробует переправить его текущему фактическому. Это позволяет размещать хорошо известные сервисы по хорошо известному адресу, не выстраивая параллельную машинерию поиска. + +Строить из актора конечный автомат при помощи переключений функции-обработчика – выбор в каждом конкретном случае, иногда удобнее да, иногда сваливать всё в одно состояние, а иногда – применять гибридное решение (когда часть жизненного цикла – обычно инициализации и завершение – выражены в переходах, а часть – нет). +Меньше слов, больше дела – этого уже достаточно что бы прочитать самый простой пример. https://a.yandex-team.ru/arc/trunk/arcadia/library/actors/examples/01_ping_pong +Здесь можно увидеть образец самого простого актора, занимающегося переброской сообщений и использующего все основные вызовы. Заодно покрутив за разные ручки (количество тредов в тредпуле, количество пар перебрасывающихся акторов) можно посмотреть на изменение поведения системы (hint: в таких простых сценариях максимум перфоманса достигается при одном треде в тредпулах). + +### Event и Event Handle. +Полезную нагрузку сообщений заворачиваем в наследника IEventBase, у которого два важных метода – сериализация и загрузка. Сериализация виртуальная, а вот загрузка – нет, и для разбора сообщения из байтовой последовательности – необходимо на стороне получателя сматчить число-идентификатор типа ивента с С++ типом. Именно это делают макросы из hfunc.h. На практике ивенты создаются либо как наследник TEventLocal<> (для строго локальных сообщений) либо как наследник TEventPB<> (для потенциально пересылаемых по сети сообщений, типизируются protobuf-мессаджем). + +Кроме непосредственно ивента (в виде структуры либо в виде байтовой строки) для пересылки сообщения необходим набор дополнительных полей + +Адресат + +Отправитель + +Тип сообщения + +Кука + +Флаги + +Сообщение + дополнительные поля = IEventHandle. Именно хендлами акторсистема и оперирует. <event-type>::TPtr – в примере выше – это и есть указатель на типизированный хендл. + +Технически типом сообщения может быть любое число, которое получатель и отправитель договорились понимать как идентификатор сообщения. Сложившаяся практика – выделять диапазон идентификаторов макросом EventSpaceBegin (фактически блоками по 64к), начиная с блока ES_USERSPACE. +Кука – неинтерпретируемое ui64 число, передаваемое с хендлом. Хорошей практикой является в ответе сервиса на сообщение выставлять куку в куку исходного сообщения, особенно для сервисов, потенциально используемых конкурентно. + +В флагах несколько бит зарезервировано под флаги, декларирующие как необходимо обрабатывать особые ситуации и 12 бит – под номер канала интерконнекта, в котором будет пересылаться сообщение (для локальных сообщений в имеющихся реализациях номер канала не имеет значения - хотя можно представить реализацию где для каналов будут независимые очереди). + +### Тредпулы и мейлбоксы. +В рамках одной акторсистемы может сосуществовать несколько независимых тредпулов, каждый актор регистрируется на конкретном и в процессе жизни не может мигрировать (но может создавать новые акторы на произвольном тредпуле). Используется для крупноблочного разделения ресурсов, либо между разными активностями (вот здесь – обрабатываем один класс запросов, а вот здесь - другой), либо между разными профилями активности (вот здесь обрабатываем быстрые запросы, здесь – медленные, а вот там – вообще батчёвые). Например в YDB работает системный тредпул (в котором запускаются акторы, необходимые для функционирования YDB, и для которого мы следим что бы не было длительной блокировки в обработчиках), пользовательский тредпул (в котором обрабатываются запросы и потенциально обработчики могут уходить в себя подольше, но это не повлияет на инфраструктуру), батчёвый тредпул (куда отгружается длительная обработка – компакшены дисков, сканы таблиц и подобное) и, в жирных нодах – тредпул интерконнекта (как наиболее чувствительного к задержкам). +Пересылка сообщений между акторами разных тредпулов но одной локальной акторсистемы остаётся локальной, принудительной сериализации сообщения не происходит. + При регистрации актор прикрепляется к мейлбоксу (в типичном случае на собственном мейлбоксе, но по особой нужде можно находясь внутри обработки сообщения прикрепить порождённый актор к текущему активному мейлбоксу – см. RegisterWithSameMailbox (ранее RegisterLocal) – в этом случае будет гарантироваться отсутствие конкурентной обработки сообщений). Собственно Send – это и есть заворачивание ивента в хендл, помещение хендла в очередь мейлбокса и добавление мейлбокса в очередь активации тредпула. В рамках одного мейлбокса – обработка FIFO, между мейлбоксами таких гарантий нет, хотя и стараемся активировать мейлбоксы примерно в порядке появления в них сообщений. - -При регистрации актора можно выбрать тип мейлбокса, они немного отличаются стоимость добавления – либо дёшево, но похуже под контеншеном, либо почти wait-free, но подороже. См. комментарии к TMailboxType за актуальными подсказками что-как. - -Полезные хелперы. - -STFUNC – декларация стейт-функции, рекомендую всегда использовать именно такую форму для декларации, т.к. потом проще искать. - -hFunc – макрос выбора хендлера, передающий ивент в обработчик. - -cFunc – макрос выбора хендлера, не передающий ивент в обработчик. - -### Обработка сбоев. -В рамках локальной акторсистемы доставка сообщений гарантирована, если по какой-то причине сообщение не доставлено (важно! Именно не доставлено, факт обработанности сообщения уже на совести принимающего актора) – то произойдёт одно из: - -Если выставлен флаг FlagForwardOnNondelivery – сообщение будет переправлено на актор, переданный как forwardOnNondelivery при конструировании хендла. Полезно например если какие-то сервисы создаются по требованию и для несозданных сервисов – желаем зароутить в роутер. Работает только в рамках локальной акторсистемы. - -Иначе при выставленном флаге FlagTrackDelivery – для отправителя будет сгенерирован ивент TEvUndelivered от имени недоступного актора. Получение такого сообщения гарантирует что исходный ивент не был обработан и никакие эффекты не произошли. Генерация и доставка нотификации в рамках локальной акторсистемы гарантирована, в распределённой – как повезёт, может и потеряться. - -Иначе, если никакие флаги не выставлены – сообщение будет выброшено. - -Т.к. в распределённой системе доставка нотификаций о недоставке не гарантируется, то для надёжной обработки сбоев необходим дополнительный механизм – по флагу FlagSubscribeOnSession при пересечении границ ноды происходит подписка отправителя на нотификацию о разрыве сетевой сессии, в рамках которой сообщение было отправлено. Теперь при разрыве сетевой сессии отправитель узнает что сообщение могло быть недоставлено (а могло и быть доставлено – мы не знаем) и сможет отреагировать. Нужно не забывать отписываться от нотификации о разрыве сессий – иначе будут копиться вплоть до ближайшего разрыва (который может и не скоро произойти). - -Резюмируя: при необходимости контролировать доставку внутри локальной акторсистемы – выставляем флаг FlagTrackDelivery и обрабатываем TEvUndelivered. Для распределённой – добавляем FlagSubscribeOnSession и дополнительно обрабатываем TEvNodeDisconnected не забывая отписываться от более не нужных подписок. - -### Интерконнект. -Локальная акторсистема – это только половина пирога, возможность объединить их в распределённую – вторая половина. Реализация интерконнекта доступна из коробки и умеет -Передавать сообщения через одно tcp-соединение -Мультиплексировать разные потоки (ака каналы) в рамках одного соединения, гарантируя порядок в рамках канала -Старается делать это хорошо. -В рамках распределённой системы требуется каждой локальной акторсистеме назначить уникальный номер (например табличкой или реализовав динамическую раздачу номеров ноды) и запустить в рамках каждой локальной акторсистемы локальный неймсервис (например по табличке ремапинга номера ноды в сетевой адрес либо как кеш опорного неймсервиса). -Смотрим на второй пример https://a.yandex-team.ru/arc/trunk/arcadia/library/actors/examples/02_discovery -Тут у нас конфигурируется распределённая акторсистема (в примере все пять запускаются в одном бинарнике, но точно так же – можно запускать и частями) на пять нод. На каждой ноде запускается реплика для паблишинга строчек и актор-эндпоинт (каждый со своим портом). Эндпоинты с помощью актора-паблишера публикуют свои явки/пароли на распределённый сторадж (с обработкой нештатных ситауций и поддержанием в актуальном состоянии). И рядом лежит реализация запроса к стораджу на листинг опубликованого по мажорити. Собственно это упрощённый и почищенный от специфики код, используемый в YDB для публикации и нахождения актуальных эндпоинтов пользовательской базы. + +При регистрации актора можно выбрать тип мейлбокса, они немного отличаются стоимость добавления – либо дёшево, но похуже под контеншеном, либо почти wait-free, но подороже. См. комментарии к TMailboxType за актуальными подсказками что-как. + +Полезные хелперы. + +STFUNC – декларация стейт-функции, рекомендую всегда использовать именно такую форму для декларации, т.к. потом проще искать. + +hFunc – макрос выбора хендлера, передающий ивент в обработчик. + +cFunc – макрос выбора хендлера, не передающий ивент в обработчик. + +### Обработка сбоев. +В рамках локальной акторсистемы доставка сообщений гарантирована, если по какой-то причине сообщение не доставлено (важно! Именно не доставлено, факт обработанности сообщения уже на совести принимающего актора) – то произойдёт одно из: + +Если выставлен флаг FlagForwardOnNondelivery – сообщение будет переправлено на актор, переданный как forwardOnNondelivery при конструировании хендла. Полезно например если какие-то сервисы создаются по требованию и для несозданных сервисов – желаем зароутить в роутер. Работает только в рамках локальной акторсистемы. + +Иначе при выставленном флаге FlagTrackDelivery – для отправителя будет сгенерирован ивент TEvUndelivered от имени недоступного актора. Получение такого сообщения гарантирует что исходный ивент не был обработан и никакие эффекты не произошли. Генерация и доставка нотификации в рамках локальной акторсистемы гарантирована, в распределённой – как повезёт, может и потеряться. + +Иначе, если никакие флаги не выставлены – сообщение будет выброшено. + +Т.к. в распределённой системе доставка нотификаций о недоставке не гарантируется, то для надёжной обработки сбоев необходим дополнительный механизм – по флагу FlagSubscribeOnSession при пересечении границ ноды происходит подписка отправителя на нотификацию о разрыве сетевой сессии, в рамках которой сообщение было отправлено. Теперь при разрыве сетевой сессии отправитель узнает что сообщение могло быть недоставлено (а могло и быть доставлено – мы не знаем) и сможет отреагировать. Нужно не забывать отписываться от нотификации о разрыве сессий – иначе будут копиться вплоть до ближайшего разрыва (который может и не скоро произойти). + +Резюмируя: при необходимости контролировать доставку внутри локальной акторсистемы – выставляем флаг FlagTrackDelivery и обрабатываем TEvUndelivered. Для распределённой – добавляем FlagSubscribeOnSession и дополнительно обрабатываем TEvNodeDisconnected не забывая отписываться от более не нужных подписок. + +### Интерконнект. +Локальная акторсистема – это только половина пирога, возможность объединить их в распределённую – вторая половина. Реализация интерконнекта доступна из коробки и умеет +Передавать сообщения через одно tcp-соединение +Мультиплексировать разные потоки (ака каналы) в рамках одного соединения, гарантируя порядок в рамках канала +Старается делать это хорошо. +В рамках распределённой системы требуется каждой локальной акторсистеме назначить уникальный номер (например табличкой или реализовав динамическую раздачу номеров ноды) и запустить в рамках каждой локальной акторсистемы локальный неймсервис (например по табличке ремапинга номера ноды в сетевой адрес либо как кеш опорного неймсервиса). +Смотрим на второй пример https://a.yandex-team.ru/arc/trunk/arcadia/library/actors/examples/02_discovery +Тут у нас конфигурируется распределённая акторсистема (в примере все пять запускаются в одном бинарнике, но точно так же – можно запускать и частями) на пять нод. На каждой ноде запускается реплика для паблишинга строчек и актор-эндпоинт (каждый со своим портом). Эндпоинты с помощью актора-паблишера публикуют свои явки/пароли на распределённый сторадж (с обработкой нештатных ситауций и поддержанием в актуальном состоянии). И рядом лежит реализация запроса к стораджу на листинг опубликованого по мажорити. Собственно это упрощённый и почищенный от специфики код, используемый в YDB для публикации и нахождения актуальных эндпоинтов пользовательской базы. diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index 3fe085439a..c58698a206 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -1,6 +1,6 @@ #include "defs.h" #include "actorsystem.h" -#include "callstack.h" +#include "callstack.h" #include "cpu_manager.h" #include "mailbox.h" #include "events.h" @@ -64,10 +64,10 @@ namespace NActors { if (Y_UNLIKELY(!ev)) return false; -#ifdef USE_ACTOR_CALLSTACK +#ifdef USE_ACTOR_CALLSTACK ev->Callstack.TraceIfEmpty(); -#endif - +#endif + TActorId recipient = ev->GetRecipientRewrite(); const ui32 recpNodeId = recipient.NodeId(); diff --git a/library/cpp/actors/core/callstack.cpp b/library/cpp/actors/core/callstack.cpp index d1536d84f4..9297c1a079 100644 --- a/library/cpp/actors/core/callstack.cpp +++ b/library/cpp/actors/core/callstack.cpp @@ -1,14 +1,14 @@ -#include "callstack.h" -#include <util/thread/singleton.h> - -#ifdef USE_ACTOR_CALLSTACK - -namespace NActors { +#include "callstack.h" +#include <util/thread/singleton.h> + +#ifdef USE_ACTOR_CALLSTACK + +namespace NActors { namespace { void (*PreviousFormatBackTrace)(IOutputStream*) = 0; ui32 ActorBackTraceEnableCounter = 0; } - + void ActorFormatBackTrace(IOutputStream* out) { TStringStream str; PreviousFormatBackTrace(&str); @@ -16,19 +16,19 @@ namespace NActors { TCallstack::DumpCallstack(str); *out << str.Str(); } - + void EnableActorCallstack() { if (ActorBackTraceEnableCounter == 0) { Y_VERIFY(PreviousFormatBackTrace == 0); PreviousFormatBackTrace = SetFormatBackTraceFn(ActorFormatBackTrace); } - + ++ActorBackTraceEnableCounter; } void DisableActorCallstack() { --ActorBackTraceEnableCounter; - + if (ActorBackTraceEnableCounter == 0) { Y_VERIFY(PreviousFormatBackTrace); SetFormatBackTraceFn(PreviousFormatBackTrace); @@ -42,12 +42,12 @@ namespace NActors { , LinesToSkip(0) { } - + void TCallstack::SetLinesToSkip() { TTrace record; LinesToSkip = BackTrace(record.Data, TTrace::CAPACITY); } - + void TCallstack::Trace() { size_t currentIdx = (BeginIdx + Size) % RECORDS; if (Size == RECORDS) { @@ -59,18 +59,18 @@ namespace NActors { record.Size = BackTrace(record.Data, TTrace::CAPACITY); record.LinesToSkip = LinesToSkip; } - + void TCallstack::TraceIfEmpty() { if (Size == 0) { LinesToSkip = 0; Trace(); } - } - + } + TCallstack& TCallstack::GetTlsCallstack() { return *FastTlsSingleton<TCallstack>(); - } - + } + void TCallstack::DumpCallstack(TStringStream& str) { TCallstack& callstack = GetTlsCallstack(); for (int i = callstack.Size - 1; i >= 0; --i) { @@ -86,8 +86,8 @@ namespace NActors { FormatBackTrace(&str, record.Data, size); } str << Endl; - } - } + } + } } - -#endif + +#endif diff --git a/library/cpp/actors/core/callstack.h b/library/cpp/actors/core/callstack.h index 265e9b63b8..176717d2ae 100644 --- a/library/cpp/actors/core/callstack.h +++ b/library/cpp/actors/core/callstack.h @@ -1,24 +1,24 @@ -#pragma once - -#ifndef NDEBUG -//#define ENABLE_ACTOR_CALLSTACK -#endif - -#ifdef ENABLE_ACTOR_CALLSTACK +#pragma once + +#ifndef NDEBUG +//#define ENABLE_ACTOR_CALLSTACK +#endif + +#ifdef ENABLE_ACTOR_CALLSTACK #include "defs.h" #include <util/system/backtrace.h> #include <util/stream/str.h> #include <util/generic/deque.h> #define USE_ACTOR_CALLSTACK - -namespace NActors { + +namespace NActors { struct TCallstack { struct TTrace { static const size_t CAPACITY = 50; void* Data[CAPACITY]; size_t Size; size_t LinesToSkip; - + TTrace() : Size(0) , LinesToSkip(0) @@ -30,29 +30,29 @@ namespace NActors { static const size_t RECORDS_TO_SKIP = 2; TTrace Record[RECORDS]; size_t BeginIdx; - size_t Size; - size_t LinesToSkip; - + size_t Size; + size_t LinesToSkip; + TCallstack(); void SetLinesToSkip(); void Trace(); void TraceIfEmpty(); static TCallstack& GetTlsCallstack(); static void DumpCallstack(TStringStream& str); - }; - + }; + void EnableActorCallstack(); void DisableActorCallstack(); - + } - -#else - -namespace NActors { + +#else + +namespace NActors { inline void EnableActorCallstack(){}; - + inline void DisableActorCallstack(){}; - + } - -#endif + +#endif diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 89d1e2a969..6ff02aaf94 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -2,7 +2,7 @@ #include "defs.h" #include "actorid.h" -#include "callstack.h" +#include "callstack.h" #include "event_load.h" #include <library/cpp/actors/wilson/wilson_trace.h> @@ -117,9 +117,9 @@ namespace NActors { static const size_t ChannelBits = 12; static const size_t ChannelShift = (sizeof(ui32) << 3) - ChannelBits; -#ifdef USE_ACTOR_CALLSTACK +#ifdef USE_ACTOR_CALLSTACK TCallstack Callstack; -#endif +#endif ui16 GetChannel() const noexcept { return Flags >> ChannelShift; } @@ -133,7 +133,7 @@ namespace NActors { Y_VERIFY(flags < (1 << ChannelShift)); return (flags | (channel << ChannelShift)); } - + private: THolder<IEventBase> Event; TIntrusivePtr<TEventSerializedData> Buffer; @@ -165,7 +165,7 @@ namespace NActors { TActorId GetForwardOnNondeliveryRecipient() const { return OnNondeliveryHolder.Get() ? OnNondeliveryHolder->Recipient : TActorId(); } - + IEventHandle(const TActorId& recipient, const TActorId& sender, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, const TActorId* forwardOnNondelivery = nullptr, NWilson::TTraceId traceId = {}) : Type(ev->Type()) diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index d48411317b..446b651efd 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -1,6 +1,6 @@ #include "executor_thread.h" #include "actorsystem.h" -#include "callstack.h" +#include "callstack.h" #include "mailbox.h" #include "event.h" #include "events.h" @@ -148,10 +148,10 @@ namespace NActors { TActorContext ctx(*mailbox, *this, hpprev, recipient); TlsActivationContext = &ctx; -#ifdef USE_ACTOR_CALLSTACK +#ifdef USE_ACTOR_CALLSTACK TCallstack::GetTlsCallstack() = ev->Callstack; TCallstack::GetTlsCallstack().SetLinesToSkip(); -#endif +#endif CurrentRecipient = recipient; CurrentActorScheduledEventsCounter = 0; diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h index c58698ee1f..9d3c573f0d 100644 --- a/library/cpp/actors/core/executor_thread.h +++ b/library/cpp/actors/core/executor_thread.h @@ -4,7 +4,7 @@ #include "event.h" #include "actor.h" #include "actorsystem.h" -#include "callstack.h" +#include "callstack.h" #include "probes.h" #include "worker_context.h" diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index a3522988b3..5f63b5af58 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -1,5 +1,5 @@ #include "log.h" -#include "log_settings.h" +#include "log_settings.h" #include <library/cpp/monlib/service/pages/templates.h> @@ -211,7 +211,7 @@ namespace NActors { void TLoggerActor::Log(TInstant time, NLog::EPriority priority, NLog::EComponent component, const char* c, ...) { Metrics->IncDirectMsgs(); - if (Settings && Settings->Satisfies(priority, component, 0ull)) { + if (Settings && Settings->Satisfies(priority, component, 0ull)) { va_list params; va_start(params, c); TString formatted; @@ -233,14 +233,14 @@ namespace NActors { if (!OutputRecord(now, NActors::NLog::EPrio::Error, Settings->LoggerComponent, message)) { BecomeDefunct(); } - } - + } + void TLoggerActor::HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx) { Y_UNUSED(ev); LogIgnoredCount(ctx.Now()); - IgnoredCount = 0; - PassedCount = 0; - } + IgnoredCount = 0; + PassedCount = 0; + } void TLoggerActor::HandleIgnoredEventDrop() { // logger backend is unavailable, just ignore @@ -248,7 +248,7 @@ namespace NActors { void TLoggerActor::WriteMessageStat(const NLog::TEvLog& ev) { Metrics->IncActorMsgs(); - + const auto prio = ev.Level.ToPrio(); switch (prio) { @@ -281,7 +281,7 @@ namespace NActors { ++IgnoredCount; PassedCount = 0; return; - } + } PassedCount++; } else { // Enable of disable throttling depending on the load @@ -289,8 +289,8 @@ namespace NActors { AtomicSet(IsOverflow, 1); else if (delayMillisec <= (i64)Settings->TimeThresholdMs && AtomicGet(IsOverflow)) AtomicSet(IsOverflow, 0); - } - + } + const auto prio = ev->Get()->Level.ToPrio(); if (!OutputRecord(ev->Get()->Stamp, prio, ev->Get()->Component, ev->Get()->Line)) { BecomeDefunct(); @@ -337,7 +337,7 @@ namespace NActors { auto name = Settings->ComponentName(i); if (!*name) continue; - NLog::TComponentSettings componentSettings = Settings->GetComponentSettings(i); + NLog::TComponentSettings componentSettings = Settings->GetComponentSettings(i); TABLER() { TABLED() { @@ -410,8 +410,8 @@ namespace NActors { TStringStream str; if (hasComponent && !hasPriority && !hasSamplingPriority && !hasSamplingRate) { - NLog::TComponentSettings componentSettings = Settings->GetComponentSettings(component); - ui32 samplingRate = componentSettings.Raw.X.SamplingRate; + NLog::TComponentSettings componentSettings = Settings->GetComponentSettings(component); + ui32 samplingRate = componentSettings.Raw.X.SamplingRate; HTML(str) { DIV_CLASS("row") { DIV_CLASS("col-md-12") { @@ -682,17 +682,17 @@ namespace NActors { OutputDebugString(x.c_str()); } #endif - bool isOk = false; - do { - try { + bool isOk = false; + do { + try { TRecordWithNewline r(rec); Cerr.Write(r.Buf.Data(), r.Buf.Filled()); - isOk = true; - } catch (TSystemError err) { - // Interrupted system call + isOk = true; + } catch (TSystemError err) { + // Interrupted system call Y_UNUSED(err); - } - } while (!isOk); + } + } while (!isOk); } void ReopenLog() override { @@ -743,10 +743,10 @@ namespace NActors { return new TLineFileLogBackend(fileName); } - TAutoPtr<TLogBackend> CreateNullBackend() { - return new TNullLogBackend(); - } - + TAutoPtr<TLogBackend> CreateNullBackend() { + return new TNullLogBackend(); + } + TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends) { return new TCompositeLogBackend(std::move(underlyingBackends)); } diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index cbcbcd8786..c11a7cf3c1 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -12,7 +12,7 @@ #include <util/generic/vector.h> #include <util/string/printf.h> -#include <util/string/builder.h> +#include <util/string/builder.h> #include <library/cpp/logger/all.h> #include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/monlib/metrics/metric_registry.h> @@ -24,14 +24,14 @@ // TODO: limit number of messages per second // TODO: make TLogComponentLevelRequest/Response network messages -#define IS_LOG_PRIORITY_ENABLED(actorCtxOrSystem, priority, component) \ - (static_cast<::NActors::NLog::TSettings*>((actorCtxOrSystem).LoggerSettings()) && \ - static_cast<::NActors::NLog::TSettings*>((actorCtxOrSystem).LoggerSettings())->Satisfies( \ - static_cast<::NActors::NLog::EPriority>(priority), \ - static_cast<::NActors::NLog::EComponent>(component), \ - 0ull) \ - ) - +#define IS_LOG_PRIORITY_ENABLED(actorCtxOrSystem, priority, component) \ + (static_cast<::NActors::NLog::TSettings*>((actorCtxOrSystem).LoggerSettings()) && \ + static_cast<::NActors::NLog::TSettings*>((actorCtxOrSystem).LoggerSettings())->Satisfies( \ + static_cast<::NActors::NLog::EPriority>(priority), \ + static_cast<::NActors::NLog::EComponent>(component), \ + 0ull) \ + ) + #define LOG_LOG_SAMPLED_BY(actorCtxOrSystem, priority, component, sampleBy, ...) \ do { \ ::NActors::NLog::TSettings* mSettings = static_cast<::NActors::NLog::TSettings*>((actorCtxOrSystem).LoggerSettings()); \ @@ -49,9 +49,9 @@ logStringBuilder << stream; \ return static_cast<TString>(logStringBuilder); \ }().data()) - -#define LOG_LOG(actorCtxOrSystem, priority, component, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, priority, component, 0ull, __VA_ARGS__) -#define LOG_LOG_S(actorCtxOrSystem, priority, component, stream) LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, priority, component, 0ull, stream) + +#define LOG_LOG(actorCtxOrSystem, priority, component, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, priority, component, 0ull, __VA_ARGS__) +#define LOG_LOG_S(actorCtxOrSystem, priority, component, stream) LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, priority, component, 0ull, stream) // use these macros for logging via actor system or actor context #define LOG_EMERG(actorCtxOrSystem, component, ...) LOG_LOG(actorCtxOrSystem, NActors::NLog::PRI_EMERG, component, __VA_ARGS__) @@ -69,11 +69,11 @@ #define LOG_CRIT_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_CRIT, component, stream) #define LOG_ERROR_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_ERROR, component, stream) #define LOG_WARN_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_WARN, component, stream) -#define LOG_NOTICE_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_NOTICE, component, stream) +#define LOG_NOTICE_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_NOTICE, component, stream) #define LOG_INFO_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_INFO, component, stream) #define LOG_DEBUG_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_DEBUG, component, stream) #define LOG_TRACE_S(actorCtxOrSystem, component, stream) LOG_LOG_S(actorCtxOrSystem, NActors::NLog::PRI_TRACE, component, stream) - + #define LOG_EMERG_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_EMERG, component, sampleBy, __VA_ARGS__) #define LOG_ALERT_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_ALERT, component, sampleBy, __VA_ARGS__) #define LOG_CRIT_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_CRIT, component, sampleBy, __VA_ARGS__) @@ -83,7 +83,7 @@ #define LOG_INFO_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_INFO, component, sampleBy, __VA_ARGS__) #define LOG_DEBUG_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_DEBUG, component, sampleBy, __VA_ARGS__) #define LOG_TRACE_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, ...) LOG_LOG_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_TRACE, component, sampleBy, __VA_ARGS__) - + #define LOG_EMERG_S_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, stream) LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_EMERG, component, sampleBy, stream) #define LOG_ALERT_S_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, stream) LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_ALERT, component, sampleBy, stream) #define LOG_CRIT_S_SAMPLED_BY(actorCtxOrSystem, component, sampleBy, stream) LOG_LOG_S_SAMPLED_BY(actorCtxOrSystem, NActors::NLog::PRI_CRIT, component, sampleBy, stream) @@ -167,11 +167,11 @@ namespace NActors { }; class TLogIgnored: public TEventLocal<TLogIgnored, int(NLog::EEv::Ignored)> { - public: - TLogIgnored() { - } - }; - + public: + TLogIgnored() { + } + }; + //////////////////////////////////////////////////////////////////////////////// // LOGGER ACTOR //////////////////////////////////////////////////////////////////////////////// @@ -212,7 +212,7 @@ namespace NActors { void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { switch (ev->GetTypeRewrite()) { - HFunc(TLogIgnored, HandleIgnoredEvent); + HFunc(TLogIgnored, HandleIgnoredEvent); HFunc(NLog::TEvLog, HandleLogEvent); HFunc(TLogComponentLevelRequest, HandleLogComponentLevelRequest); HFunc(NMon::TEvHttpInfo, HandleMonInfo); @@ -298,7 +298,7 @@ namespace NActors { bool logPError, bool logCons); TAutoPtr<TLogBackend> CreateStderrBackend(); TAutoPtr<TLogBackend> CreateFileBackend(const TString& fileName); - TAutoPtr<TLogBackend> CreateNullBackend(); + TAutoPtr<TLogBackend> CreateNullBackend(); TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends); ///////////////////////////////////////////////////////////////////// diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp index 88233dddf9..f52f2fc5d2 100644 --- a/library/cpp/actors/core/log_settings.cpp +++ b/library/cpp/actors/core/log_settings.cpp @@ -9,7 +9,7 @@ namespace NActors { EPriority defPriority, EPriority defSamplingPriority, ui32 defSamplingRate, ui64 timeThresholdMs) : LoggerActorId(loggerActorId) - , LoggerComponent(loggerComponent) + , LoggerComponent(loggerComponent) , TimeThresholdMs(timeThresholdMs) , AllowDrop(true) , ThrottleDelay(TDuration::MilliSeconds(100)) @@ -31,7 +31,7 @@ namespace NActors { EPriority defPriority, EPriority defSamplingPriority, ui32 defSamplingRate, ui64 timeThresholdMs) : LoggerActorId(loggerActorId) - , LoggerComponent(loggerComponent) + , LoggerComponent(loggerComponent) , TimeThresholdMs(timeThresholdMs) , AllowDrop(true) , ThrottleDelay(TDuration::MilliSeconds(100)) @@ -102,16 +102,16 @@ namespace NActors { } if (component == InvalidComponent) { - for (int i = 0; i < Mask + 1; i++) { - TComponentSettings settings = AtomicGet(ComponentInfo[i]); - if (isSampling) { - settings.Raw.X.SamplingLevel = priority; - } else { - settings.Raw.X.Level = priority; - } - AtomicSet(ComponentInfo[i], settings.Raw.Data); - } - + for (int i = 0; i < Mask + 1; i++) { + TComponentSettings settings = AtomicGet(ComponentInfo[i]); + if (isSampling) { + settings.Raw.X.SamplingLevel = priority; + } else { + settings.Raw.X.Level = priority; + } + AtomicSet(ComponentInfo[i], settings.Raw.Data); + } + TStringStream str; str << titleName @@ -124,16 +124,16 @@ namespace NActors { explanation = "Invalid component"; return 1; } - TComponentSettings settings = AtomicGet(ComponentInfo[component]); - EPriority oldPriority; - if (isSampling) { - oldPriority = (EPriority)settings.Raw.X.SamplingLevel; - settings.Raw.X.SamplingLevel = priority; - } else { - oldPriority = (EPriority)settings.Raw.X.Level; - settings.Raw.X.Level = priority; - } - AtomicSet(ComponentInfo[component], settings.Raw.Data); + TComponentSettings settings = AtomicGet(ComponentInfo[component]); + EPriority oldPriority; + if (isSampling) { + oldPriority = (EPriority)settings.Raw.X.SamplingLevel; + settings.Raw.X.SamplingLevel = priority; + } else { + oldPriority = (EPriority)settings.Raw.X.Level; + settings.Raw.X.Level = priority; + } + AtomicSet(ComponentInfo[component], settings.Raw.Data); TStringStream str; str << titleName << " for the component " << ComponentNames[component] << " has been changed from " << PriorityToString(EPrio(oldPriority)) @@ -144,21 +144,21 @@ namespace NActors { } int TSettings::SetLevel(EPriority priority, EComponent component, TString& explanation) { - return SetLevelImpl("priority", false, + return SetLevelImpl("priority", false, priority, component, explanation); } int TSettings::SetSamplingLevel(EPriority priority, EComponent component, TString& explanation) { - return SetLevelImpl("sampling priority", true, + return SetLevelImpl("sampling priority", true, priority, component, explanation); } int TSettings::SetSamplingRate(ui32 sampling, EComponent component, TString& explanation) { if (component == InvalidComponent) { for (int i = 0; i < Mask + 1; i++) { - TComponentSettings settings = AtomicGet(ComponentInfo[i]); - settings.Raw.X.SamplingRate = sampling; - AtomicSet(ComponentInfo[i], settings.Raw.Data); + TComponentSettings settings = AtomicGet(ComponentInfo[i]); + settings.Raw.X.SamplingRate = sampling; + AtomicSet(ComponentInfo[i], settings.Raw.Data); } TStringStream str; str << "Sampling rate for all components has been changed to " << sampling; @@ -168,10 +168,10 @@ namespace NActors { explanation = "Invalid component"; return 1; } - TComponentSettings settings = AtomicGet(ComponentInfo[component]); - ui32 oldSampling = settings.Raw.X.SamplingRate; - settings.Raw.X.SamplingRate = sampling; - AtomicSet(ComponentInfo[component], settings.Raw.Data); + TComponentSettings settings = AtomicGet(ComponentInfo[component]); + ui32 oldSampling = settings.Raw.X.SamplingRate; + settings.Raw.X.SamplingRate = sampling; + AtomicSet(ComponentInfo[component], settings.Raw.Data); TStringStream str; str << "Sampling rate for the component " << ComponentNames[component] << " has been changed from " << oldSampling diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h index 0e8b76cc02..7fe4504edd 100644 --- a/library/cpp/actors/core/log_settings.h +++ b/library/cpp/actors/core/log_settings.h @@ -45,36 +45,36 @@ namespace NActors { ; // Log settings - struct TComponentSettings { - union { - struct { - ui32 SamplingRate; - ui8 SamplingLevel; - ui8 Level; - } X; - - ui64 Data; - } Raw; - - TComponentSettings(TAtomicBase data) { - Raw.Data = (ui64)data; - } - - TComponentSettings(ui8 level, ui8 samplingLevel, ui32 samplingRate) { - Raw.X.Level = level; - Raw.X.SamplingLevel = samplingLevel; - Raw.X.SamplingRate = samplingRate; - } - }; - + struct TComponentSettings { + union { + struct { + ui32 SamplingRate; + ui8 SamplingLevel; + ui8 Level; + } X; + + ui64 Data; + } Raw; + + TComponentSettings(TAtomicBase data) { + Raw.Data = (ui64)data; + } + + TComponentSettings(ui8 level, ui8 samplingLevel, ui32 samplingRate) { + Raw.X.Level = level; + Raw.X.SamplingLevel = samplingLevel; + Raw.X.SamplingRate = samplingRate; + } + }; + struct TSettings: public TThrRefBase { public: TActorId LoggerActorId; - EComponent LoggerComponent; - ui64 TimeThresholdMs; + EComponent LoggerComponent; + ui64 TimeThresholdMs; bool AllowDrop; TDuration ThrottleDelay; - TArrayHolder<TAtomic> ComponentInfo; + TArrayHolder<TAtomic> ComponentInfo; TVector<TString> ComponentNames; EComponent MinVal; EComponent MaxVal; @@ -120,33 +120,33 @@ namespace NActors { ); } - inline bool Satisfies(EPriority priority, EComponent component, ui64 sampleBy = 0) const { + inline bool Satisfies(EPriority priority, EComponent component, ui64 sampleBy = 0) const { // by using Mask we don't get outside of array boundaries - TComponentSettings settings = GetComponentSettings(component); - if (priority > settings.Raw.X.Level) { - if (priority > settings.Raw.X.SamplingLevel) { - return false; // priority > both levels ==> do not log - } - // priority <= sampling level ==> apply sampling - ui32 samplingRate = settings.Raw.X.SamplingRate; - if (samplingRate) { + TComponentSettings settings = GetComponentSettings(component); + if (priority > settings.Raw.X.Level) { + if (priority > settings.Raw.X.SamplingLevel) { + return false; // priority > both levels ==> do not log + } + // priority <= sampling level ==> apply sampling + ui32 samplingRate = settings.Raw.X.SamplingRate; + if (samplingRate) { ui32 samplingValue = sampleBy ? MurmurHash<ui32>((const char*)&sampleBy, sizeof(sampleBy)) : samplingRate != 1 ? RandomNumber<ui32>() : 0; - return (samplingValue % samplingRate == 0); - } else { - // sampling rate not set ==> do not log - return false; - } - } else { - // priority <= log level ==> log - return true; - } + return (samplingValue % samplingRate == 0); + } else { + // sampling rate not set ==> do not log + return false; + } + } else { + // priority <= log level ==> log + return true; + } } - inline TComponentSettings GetComponentSettings(EComponent component) const { + inline TComponentSettings GetComponentSettings(EComponent component) const { Y_VERIFY_DEBUG((component & Mask) == component); // by using Mask we don't get outside of array boundaries - return TComponentSettings(AtomicGet(ComponentInfo[component & Mask])); + return TComponentSettings(AtomicGet(ComponentInfo[component & Mask])); } const char* ComponentName(EComponent component) const { diff --git a/library/cpp/actors/core/scheduler_cookie.h b/library/cpp/actors/core/scheduler_cookie.h index 4cfb3b3660..2c20ca67f3 100644 --- a/library/cpp/actors/core/scheduler_cookie.h +++ b/library/cpp/actors/core/scheduler_cookie.h @@ -18,7 +18,7 @@ namespace NActors { static ISchedulerCookie* Make3Way(); }; - class TSchedulerCookieHolder : TNonCopyable { + class TSchedulerCookieHolder : TNonCopyable { ISchedulerCookie* Cookie; public: @@ -40,7 +40,7 @@ namespace NActors { return (Cookie == x.Cookie); } - ISchedulerCookie* Get() const { + ISchedulerCookie* Get() const { return Cookie; } diff --git a/library/cpp/actors/core/scheduler_queue.h b/library/cpp/actors/core/scheduler_queue.h index 1ee337498e..3b8fac28f0 100644 --- a/library/cpp/actors/core/scheduler_queue.h +++ b/library/cpp/actors/core/scheduler_queue.h @@ -102,7 +102,7 @@ namespace NActors { class TWriterWithPadding: public TWriter { private: ui8 CacheLinePadding[64 - sizeof(TWriter)]; - + void UnusedCacheLinePadding() { Y_UNUSED(CacheLinePadding); } diff --git a/library/cpp/actors/core/servicemap.h b/library/cpp/actors/core/servicemap.h index 91620134b4..d72e50cae5 100644 --- a/library/cpp/actors/core/servicemap.h +++ b/library/cpp/actors/core/servicemap.h @@ -107,7 +107,7 @@ namespace NActors { if (ScanBranch(branch, key, hash / BaseSize, ret)) return ret; } - + return TValue(); } diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make index c19b7f0e41..880a9d00db 100644 --- a/library/cpp/actors/core/ya.make +++ b/library/cpp/actors/core/ya.make @@ -34,8 +34,8 @@ SRCS( balancer.cpp buffer.cpp buffer.h - callstack.cpp - callstack.h + callstack.cpp + callstack.h config.h cpu_manager.cpp cpu_manager.h diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp index 48e7625b18..f9bfaf8dc0 100644 --- a/library/cpp/actors/helpers/selfping_actor.cpp +++ b/library/cpp/actors/helpers/selfping_actor.cpp @@ -18,54 +18,54 @@ struct TEvPing: public TEventLocal<TEvPing, TEvents::THelloWorld::Ping> { const double TimeStart; }; -template <class TValueType_> -struct TAvgOperation { - struct TValueType { - ui64 Count = 0; - TValueType_ Sum = TValueType_(); - }; - using TValueVector = TVector<TValueType>; - - static constexpr TValueType InitialValue() { - return TValueType(); // zero - } - - // Updates value in current bucket and returns window value - static TValueType UpdateBucket(TValueType windowValue, TValueVector& buckets, size_t index, TValueType newVal) { - Y_ASSERT(index < buckets.size()); - buckets[index].Sum += newVal.Sum; - buckets[index].Count += newVal.Count; - windowValue.Sum += newVal.Sum; - windowValue.Count += newVal.Count; - return windowValue; - } - - static TValueType ClearBuckets(TValueType windowValue, TValueVector& buckets, size_t firstElemIndex, size_t bucketsToClear) { - Y_ASSERT(!buckets.empty()); - Y_ASSERT(firstElemIndex < buckets.size()); - Y_ASSERT(bucketsToClear <= buckets.size()); - - const size_t arraySize = buckets.size(); - for (size_t i = 0; i < bucketsToClear; ++i) { - TValueType& curVal = buckets[firstElemIndex]; - windowValue.Sum -= curVal.Sum; - windowValue.Count -= curVal.Count; - curVal = InitialValue(); - firstElemIndex = (firstElemIndex + 1) % arraySize; - } - return windowValue; - } - -}; - +template <class TValueType_> +struct TAvgOperation { + struct TValueType { + ui64 Count = 0; + TValueType_ Sum = TValueType_(); + }; + using TValueVector = TVector<TValueType>; + + static constexpr TValueType InitialValue() { + return TValueType(); // zero + } + + // Updates value in current bucket and returns window value + static TValueType UpdateBucket(TValueType windowValue, TValueVector& buckets, size_t index, TValueType newVal) { + Y_ASSERT(index < buckets.size()); + buckets[index].Sum += newVal.Sum; + buckets[index].Count += newVal.Count; + windowValue.Sum += newVal.Sum; + windowValue.Count += newVal.Count; + return windowValue; + } + + static TValueType ClearBuckets(TValueType windowValue, TValueVector& buckets, size_t firstElemIndex, size_t bucketsToClear) { + Y_ASSERT(!buckets.empty()); + Y_ASSERT(firstElemIndex < buckets.size()); + Y_ASSERT(bucketsToClear <= buckets.size()); + + const size_t arraySize = buckets.size(); + for (size_t i = 0; i < bucketsToClear; ++i) { + TValueType& curVal = buckets[firstElemIndex]; + windowValue.Sum -= curVal.Sum; + windowValue.Count -= curVal.Count; + curVal = InitialValue(); + firstElemIndex = (firstElemIndex + 1) % arraySize; + } + return windowValue; + } + +}; + class TSelfPingActor : public TActorBootstrapped<TSelfPingActor> { private: const TDuration SendInterval; const NMonitoring::TDynamicCounters::TCounterPtr Counter; - const NMonitoring::TDynamicCounters::TCounterPtr CalculationTimeCounter; + const NMonitoring::TDynamicCounters::TCounterPtr CalculationTimeCounter; NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> SlidingWindow; - NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> CalculationSlidingWindow; + NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> CalculationSlidingWindow; THPTimer Timer; @@ -74,13 +74,13 @@ public: return SELF_PING_ACTOR; } - TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter, - const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter) + TSelfPingActor(TDuration sendInterval, const NMonitoring::TDynamicCounters::TCounterPtr& counter, + const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter) : SendInterval(sendInterval) , Counter(counter) - , CalculationTimeCounter(calculationTimeCounter) + , CalculationTimeCounter(calculationTimeCounter) , SlidingWindow(TDuration::Seconds(15), 100) - , CalculationSlidingWindow(TDuration::Seconds(15), 100) + , CalculationSlidingWindow(TDuration::Seconds(15), 100) { } @@ -99,53 +99,53 @@ public: } } - ui64 MeasureTaskDurationNs() { - // Prepare worm test data - // 11 * 11 * 3 * 8 = 2904 bytes, fits in L1 cache - constexpr ui64 Size = 11; - // Align the data to reduce random alignment effects - alignas(64) TStackVec<ui64, Size * Size * 3> data; - ui64 s = 0; - NHPTimer::STime beginTime; - NHPTimer::STime endTime; - // Prepare the data - data.resize(Size * Size * 3); - for (ui64 matrixIdx = 0; matrixIdx < 3; ++matrixIdx) { - for (ui64 y = 0; y < Size; ++y) { - for (ui64 x = 0; x < Size; ++x) { - data[matrixIdx * (Size * Size) + y * Size + x] = y * Size + x; - } - } - } - // Warm-up the cache - NHPTimer::GetTime(&beginTime); - for (ui64 idx = 0; idx < data.size(); ++idx) { - s += data[idx]; - } - NHPTimer::GetTime(&endTime); - s += (ui64)(1000000.0 * NHPTimer::GetSeconds(endTime - beginTime)); - - // Measure the CPU performance - // C = A * B with injected dependency to s - NHPTimer::GetTime(&beginTime); - for (ui64 y = 0; y < Size; ++y) { - for (ui64 x = 0; x < Size; ++x) { - for (ui64 i = 0; i < Size; ++i) { - s += data[y * Size + i] * data[Size * Size + i * Size + x]; - } - data[2 * Size * Size + y * Size + x] = s; - s = 0; - } - } - for (ui64 idx = 0; idx < data.size(); ++idx) { - s += data[idx]; - } - NHPTimer::GetTime(&endTime); - // Prepare the result - double d = 1000000000.0 * (NHPTimer::GetSeconds(endTime - beginTime) + 0.000000001 * (s & 1)); - return (ui64)d; - } - + ui64 MeasureTaskDurationNs() { + // Prepare worm test data + // 11 * 11 * 3 * 8 = 2904 bytes, fits in L1 cache + constexpr ui64 Size = 11; + // Align the data to reduce random alignment effects + alignas(64) TStackVec<ui64, Size * Size * 3> data; + ui64 s = 0; + NHPTimer::STime beginTime; + NHPTimer::STime endTime; + // Prepare the data + data.resize(Size * Size * 3); + for (ui64 matrixIdx = 0; matrixIdx < 3; ++matrixIdx) { + for (ui64 y = 0; y < Size; ++y) { + for (ui64 x = 0; x < Size; ++x) { + data[matrixIdx * (Size * Size) + y * Size + x] = y * Size + x; + } + } + } + // Warm-up the cache + NHPTimer::GetTime(&beginTime); + for (ui64 idx = 0; idx < data.size(); ++idx) { + s += data[idx]; + } + NHPTimer::GetTime(&endTime); + s += (ui64)(1000000.0 * NHPTimer::GetSeconds(endTime - beginTime)); + + // Measure the CPU performance + // C = A * B with injected dependency to s + NHPTimer::GetTime(&beginTime); + for (ui64 y = 0; y < Size; ++y) { + for (ui64 x = 0; x < Size; ++x) { + for (ui64 i = 0; i < Size; ++i) { + s += data[y * Size + i] * data[Size * Size + i * Size + x]; + } + data[2 * Size * Size + y * Size + x] = s; + s = 0; + } + } + for (ui64 idx = 0; idx < data.size(); ++idx) { + s += data[idx]; + } + NHPTimer::GetTime(&endTime); + // Prepare the result + double d = 1000000000.0 * (NHPTimer::GetSeconds(endTime - beginTime) + 0.000000001 * (s & 1)); + return (ui64)d; + } + void HandlePing(TEvPing::TPtr &ev, const TActorContext &ctx) { const auto now = ctx.Now(); @@ -156,10 +156,10 @@ public: *Counter = SlidingWindow.Update(delayUs, now); - ui64 d = MeasureTaskDurationNs(); + ui64 d = MeasureTaskDurationNs(); auto res = CalculationSlidingWindow.Update({1, d}, now); - *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1); - + *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1); + SchedulePing(ctx, hpNow); } @@ -174,10 +174,10 @@ private: IActor* CreateSelfPingActor( TDuration sendInterval, - const NMonitoring::TDynamicCounters::TCounterPtr& counter, - const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter) + const NMonitoring::TDynamicCounters::TCounterPtr& counter, + const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter) { - return new TSelfPingActor(sendInterval, counter, calculationTimeCounter); + return new TSelfPingActor(sendInterval, counter, calculationTimeCounter); } } // NActors diff --git a/library/cpp/actors/helpers/selfping_actor.h b/library/cpp/actors/helpers/selfping_actor.h index e26bd13599..d7d07f9fa8 100644 --- a/library/cpp/actors/helpers/selfping_actor.h +++ b/library/cpp/actors/helpers/selfping_actor.h @@ -7,7 +7,7 @@ namespace NActors { NActors::IActor* CreateSelfPingActor( TDuration sendInterval, - const NMonitoring::TDynamicCounters::TCounterPtr& counter, - const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter); + const NMonitoring::TDynamicCounters::TCounterPtr& counter, + const NMonitoring::TDynamicCounters::TCounterPtr& calculationTimeCounter); } // NActors diff --git a/library/cpp/actors/helpers/selfping_actor_ut.cpp b/library/cpp/actors/helpers/selfping_actor_ut.cpp index aaf23cad24..459635fa24 100644 --- a/library/cpp/actors/helpers/selfping_actor_ut.cpp +++ b/library/cpp/actors/helpers/selfping_actor_ut.cpp @@ -21,14 +21,14 @@ Y_UNIT_TEST_SUITE(TSelfPingTest) { //const TActorId sender = runtime.AllocateEdgeActor(); NMonitoring::TDynamicCounters::TCounterPtr counter(new NMonitoring::TCounterForPtr()); - NMonitoring::TDynamicCounters::TCounterPtr counter2(new NMonitoring::TCounterForPtr()); + NMonitoring::TDynamicCounters::TCounterPtr counter2(new NMonitoring::TCounterForPtr()); auto actor = CreateSelfPingActor( TDuration::MilliSeconds(100), // sendInterval (unused in test) - counter, counter2); + counter, counter2); UNIT_ASSERT_VALUES_EQUAL(counter->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0); + UNIT_ASSERT_VALUES_EQUAL(counter2->Val(), 0); const TActorId actorId = runtime->Register(actor); Y_UNUSED(actorId); diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp index 25c882dd3a..36c6855d93 100644 --- a/library/cpp/actors/http/http_proxy.cpp +++ b/library/cpp/actors/http/http_proxy.cpp @@ -226,7 +226,7 @@ TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingR request->Host, request->URL.Before('?'), response ? response->Status : "504", - TDuration::Seconds(std::abs(request->Timer.Passed())) + TDuration::Seconds(std::abs(request->Timer.Passed())) ); } @@ -236,7 +236,7 @@ TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingR request->Host, request->URL.Before('?'), response->Status, - TDuration::Seconds(std::abs(request->Timer.Passed())) + TDuration::Seconds(std::abs(request->Timer.Passed())) ); } diff --git a/library/cpp/actors/interconnect/interconnect_counters.cpp b/library/cpp/actors/interconnect/interconnect_counters.cpp index fc57a43dfb..224160d4b4 100644 --- a/library/cpp/actors/interconnect/interconnect_counters.cpp +++ b/library/cpp/actors/interconnect/interconnect_counters.cpp @@ -232,7 +232,7 @@ namespace { if (name != std::exchange(HumanFriendlyPeerHostName, name)) { PerSessionCounters.Reset(); } - VALGRIND_MAKE_READABLE(&DataCenterId, sizeof(DataCenterId)); + VALGRIND_MAKE_READABLE(&DataCenterId, sizeof(DataCenterId)); if (dataCenterId != std::exchange(DataCenterId, dataCenterId)) { PerDataCenterCounters.Reset(); } diff --git a/library/cpp/actors/interconnect/poller_tcp_unit.cpp b/library/cpp/actors/interconnect/poller_tcp_unit.cpp index 015031f615..59e7dda810 100644 --- a/library/cpp/actors/interconnect/poller_tcp_unit.cpp +++ b/library/cpp/actors/interconnect/poller_tcp_unit.cpp @@ -88,7 +88,7 @@ namespace NInterconnect { TPollerUnit::IdleThread(void* param) { // TODO: musl-libc version of `sched_param` struct is for some reason different from pthread // version in Ubuntu 12.04 -#if defined(_linux_) && !defined(_musl_) +#if defined(_linux_) && !defined(_musl_) pthread_t threadSelf = pthread_self(); sched_param sparam = {20}; pthread_setschedparam(threadSelf, SCHED_FIFO, &sparam); diff --git a/library/cpp/actors/interconnect/ut/lib/interrupter.h b/library/cpp/actors/interconnect/ut/lib/interrupter.h index 780bdb4322..48851de2c5 100644 --- a/library/cpp/actors/interconnect/ut/lib/interrupter.h +++ b/library/cpp/actors/interconnect/ut/lib/interrupter.h @@ -119,7 +119,7 @@ private: bool DelayTraffic; void UpdateRejectingState() { - if (TDuration::Seconds(std::abs(RejectingStateTimer.Passed())) > CurrentRejectingTimeout) { + if (TDuration::Seconds(std::abs(RejectingStateTimer.Passed())) > CurrentRejectingTimeout) { RejectingStateTimer.Reset(); CurrentRejectingTimeout = (RandomNumber<ui32>(1) ? RejectingTrafficTimeout + TDuration::Seconds(1.0) : RejectingTrafficTimeout - TDuration::Seconds(0.2)); RejectingTraffic = !RejectingTraffic; @@ -127,7 +127,7 @@ private: } void RandomlyDisconnect() { - if (TDuration::Seconds(std::abs(DisconnectTimer.Passed())) > DisconnectTimeout) { + if (TDuration::Seconds(std::abs(DisconnectTimer.Passed())) > DisconnectTimeout) { DisconnectTimer.Reset(); if (RandomNumber<ui32>(100) > 90) { if (!Connections.empty()) { diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index f246a23811..6fa25b9965 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -214,10 +214,10 @@ namespace NActors { return Scheduled.begin()->Deadline; } - ui64 TEventMailBox::GetSentEventCount() const { - return Sent.size(); - } - + ui64 TEventMailBox::GetSentEventCount() const { + return Sent.size(); + } + class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider { public: TTimeProvider(TTestActorRuntimeBase& runtime) @@ -238,9 +238,9 @@ namespace NActors { TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node) : Runtime(runtime) , Node(node) - { + { Y_UNUSED(Runtime); - } + } void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override { Y_UNUSED(actorSystem); @@ -333,9 +333,9 @@ namespace NActors { if (verbose) Cerr << "Event was added to scheduled queue\n"; } else { - if (cookie) { - cookie->Detach(); - } + if (cookie) { + cookie->Detach(); + } if (verbose) { Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n"; } @@ -368,22 +368,22 @@ namespace NActors { if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) { const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger"); TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId); - if (ev->GetRecipientRewrite() == logger) { - TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); - IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId()); - if (recipientActor) { + if (ev->GetRecipientRewrite() == logger) { + TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); + IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId()); + if (recipientActor) { TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite()); TActivationContext *prevTlsActivationContext = TlsActivationContext; TlsActivationContext = &ctx; - recipientActor->Receive(ev, ctx); + recipientActor->Receive(ev, ctx); TlsActivationContext = prevTlsActivationContext; // we expect the logger to never die in tests - } - } - } else { - Runtime->GetMailbox(nodeId, mailboxHint).Send(ev); - Runtime->MailboxesHasEvents.Signal(); - } + } + } + } else { + Runtime->GetMailbox(nodeId, mailboxHint).Send(ev); + Runtime->MailboxesHasEvents.Signal(); + } if (verbose) Cerr << "Event was added to sent queue\n"; } else { @@ -458,7 +458,7 @@ namespace NActors { TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads) : ScheduledCount(0) , ScheduledLimit(100000) - , MainThreadId(TThread::CurrentThreadId()) + , MainThreadId(TThread::CurrentThreadId()) , ClusterUUID(MakeClusterId()) , FirstNodeId(NextNodeId) , NodeCount(nodeCount) @@ -481,7 +481,7 @@ namespace NActors { , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver) , CurrentDispatchContext(nullptr) { - SetDispatcherRandomSeed(TInstant::Now(), 0); + SetDispatcherRandomSeed(TInstant::Now(), 0); EnableActorCallstack(); } @@ -541,7 +541,7 @@ namespace NActors { Cerr.Flush(); Clog.Flush(); - DisableActorCallstack(); + DisableActorCallstack(); } void TTestActorRuntimeBase::CleanupNodes() { @@ -580,10 +580,10 @@ namespace NActors { void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { - if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) { - runtime.ScheduleWhiteList.insert(actorId); + if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) { + runtime.ScheduleWhiteList.insert(actorId); runtime.ScheduleWhiteListParent[actorId] = parentId; - } + } } class TScheduledTreeItem { @@ -644,8 +644,8 @@ namespace NActors { auto& item = *scheduledEvents.begin(); TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type); eventTypes[std::make_pair(item.Event->Recipient, name)]++; - runtime.ScheduledCount++; - if (runtime.ScheduledCount > runtime.ScheduledLimit) { + runtime.ScheduledCount++; + if (runtime.ScheduledCount > runtime.ScheduledLimit) { // TScheduledTreeItem root("Root"); // TVector<TString> path; // for (const auto& pr : eventTypes) { @@ -665,10 +665,10 @@ namespace NActors { // } // root.RecursiveSort(); // root.Print(Cerr); - - ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit); - } - if (item.Cookie->Get()) { + + ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit); + } + if (item.Cookie->Get()) { if (item.Cookie->Detach()) { queue.push_back(item.Event); } @@ -808,8 +808,8 @@ namespace NActors { if (newTime.MicroSeconds() > CurrentTimestamp) { CurrentTimestamp = newTime.MicroSeconds(); for (auto& kv : Nodes) { - AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp); - AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp); + AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp); + AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp); } } } @@ -1125,42 +1125,42 @@ namespace NActors { } TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes; - while (!currentMailboxes.empty()) { + while (!currentMailboxes.empty()) { bool hasProgress = true; while (hasProgress) { ++DispatchCyclesCount; hasProgress = false; - ui64 eventsToDispatch = 0; - for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) { - if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { - eventsToDispatch += mboxIt->second->GetSentEventCount(); - } - } - ui32 eventsDispatched = 0; - - //TODO: count events before each cycle, break after dispatching that much events - bool isEmpty = false; - while (!isEmpty && eventsDispatched < eventsToDispatch) { - ui64 mailboxCount = currentMailboxes.size(); + ui64 eventsToDispatch = 0; + for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) { + if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { + eventsToDispatch += mboxIt->second->GetSentEventCount(); + } + } + ui32 eventsDispatched = 0; + + //TODO: count events before each cycle, break after dispatching that much events + bool isEmpty = false; + while (!isEmpty && eventsDispatched < eventsToDispatch) { + ui64 mailboxCount = currentMailboxes.size(); ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull; - auto startWithMboxIt = currentMailboxes.begin(); - for (ui64 i = 0; i < startWith; ++i) { - ++startWithMboxIt; - } - auto endWithMboxIt = startWithMboxIt; - - isEmpty = true; - auto mboxIt = startWithMboxIt; + auto startWithMboxIt = currentMailboxes.begin(); + for (ui64 i = 0; i < startWith; ++i) { + ++startWithMboxIt; + } + auto endWithMboxIt = startWithMboxIt; + + isEmpty = true; + auto mboxIt = startWithMboxIt; TDeque<TEventMailboxId> suspectedBoxes; - while (true) { - auto& mbox = *mboxIt; - bool isIgnored = true; - if (!mbox.second->IsEmpty()) { - HandleNonEmptyMailboxesForEachContext(mbox.first); - if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { - - bool isEdgeMailbox = false; + while (true) { + auto& mbox = *mboxIt; + bool isIgnored = true; + if (!mbox.second->IsEmpty()) { + HandleNonEmptyMailboxesForEachContext(mbox.first); + if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { + + bool isEdgeMailbox = false; if (EdgeActorByMailbox.FindPtr(TEventMailboxId(mbox.first.NodeId, mbox.first.Hint))) { isEdgeMailbox = true; TEventsList events; @@ -1168,80 +1168,80 @@ namespace NActors { for (auto& ev : events) { TInverseGuard<TMutex> inverseGuard(Mutex); ObserverFunc(*this, ev); - } + } mbox.second->PushFront(events); } - - if (!isEdgeMailbox) { - isEmpty = false; - isIgnored = false; - ++eventsDispatched; - ++DispatchedEventsCount; - if (DispatchedEventsCount > DispatchedEventsLimit) { - ythrow TWithBackTrace<yexception>() << "Dispatched " - << DispatchedEventsLimit << " events, limit reached."; - } - - auto ev = mbox.second->Pop(); + + if (!isEdgeMailbox) { + isEmpty = false; + isIgnored = false; + ++eventsDispatched; + ++DispatchedEventsCount; + if (DispatchedEventsCount > DispatchedEventsLimit) { + ythrow TWithBackTrace<yexception>() << "Dispatched " + << DispatchedEventsLimit << " events, limit reached."; + } + + auto ev = mbox.second->Pop(); if (BlockedOutput.find(ev->Sender) == BlockedOutput.end()) { //UpdateCurrentTime(TInstant::MicroSeconds(CurrentTimestamp + 10)); if (verbose) { Cerr << "Process event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", "; PrintEvent(ev, this); } - } - - hasProgress = true; - EEventAction action; - { - TInverseGuard<TMutex> inverseGuard(Mutex); - action = ObserverFunc(*this, ev); - } - - switch (action) { - case EEventAction::PROCESS: - UpdateFinalEventsStatsForEachContext(*ev); - SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false); - break; - case EEventAction::DROP: - // do nothing - break; - case EEventAction::RESCHEDULE: { - TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay; - mbox.second->Freeze(deadline); - mbox.second->PushFront(ev); - break; - } - default: + } + + hasProgress = true; + EEventAction action; + { + TInverseGuard<TMutex> inverseGuard(Mutex); + action = ObserverFunc(*this, ev); + } + + switch (action) { + case EEventAction::PROCESS: + UpdateFinalEventsStatsForEachContext(*ev); + SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false); + break; + case EEventAction::DROP: + // do nothing + break; + case EEventAction::RESCHEDULE: { + TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay; + mbox.second->Freeze(deadline); + mbox.second->PushFront(ev); + break; + } + default: Y_FAIL("Unknown action"); - } - } + } + } } - } + } Y_VERIFY(mboxIt != currentMailboxes.end()); - if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes && - mboxIt->second->IsEmpty() && - mboxIt->second->IsScheduledEmpty() && - mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { - suspectedBoxes.push_back(mboxIt->first); + if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes && + mboxIt->second->IsEmpty() && + mboxIt->second->IsScheduledEmpty() && + mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { + suspectedBoxes.push_back(mboxIt->first); } - ++mboxIt; - if (mboxIt == currentMailboxes.end()) { - mboxIt = currentMailboxes.begin(); + ++mboxIt; + if (mboxIt == currentMailboxes.end()) { + mboxIt = currentMailboxes.begin(); } Y_VERIFY(endWithMboxIt != currentMailboxes.end()); - if (mboxIt == endWithMboxIt) { + if (mboxIt == endWithMboxIt) { break; } } - for (auto id : suspectedBoxes) { - auto it = currentMailboxes.find(id); - if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() && - it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { - currentMailboxes.erase(it); - } + for (auto id : suspectedBoxes) { + auto it = currentMailboxes.find(id); + if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() && + it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { + currentMailboxes.erase(it); + } } } } @@ -1283,7 +1283,7 @@ namespace NActors { ythrow TWithBackTrace<TEmptyEventQueueException>(); } - if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) { + if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) { inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; bool isEmpty = true; TMaybe<TInstant> nearestMailboxDeadline; @@ -1385,7 +1385,7 @@ namespace NActors { void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) { TGuard<TMutex> guard(Mutex); Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32, - senderNodeIndex, NodeCount); + senderNodeIndex, NodeCount); SendInternal(ev, senderNodeIndex, viaActorSystem); } @@ -1475,11 +1475,11 @@ namespace NActors { } void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) { - ui64 days = (time.Hours() / 24); - DispatcherRandomSeed = (days << 32) ^ iteration; + ui64 days = (time.Hours() / 24); + DispatcherRandomSeed = (days << 32) ^ iteration; DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed); - } - + } + IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const { TGuard<TMutex> guard(Mutex); if (nodeIndex == Max<ui32>()) { @@ -1569,24 +1569,24 @@ namespace NActors { TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); IActor* recipientActor = mailbox->FindActor(recipientLocalId); if (recipientActor) { - // Save actorId by value in order to prevent ctx from being invalidated during another Send call. + // Save actorId by value in order to prevent ctx from being invalidated during another Send call. TActorId actorId = ev->GetRecipientRewrite(); node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite(); TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId); - TActivationContext *prevTlsActivationContext = TlsActivationContext; + TActivationContext *prevTlsActivationContext = TlsActivationContext; TlsActivationContext = &ctx; CurrentRecipient = actorId; { TInverseGuard<TMutex> inverseGuard(Mutex); -#ifdef USE_ACTOR_CALLSTACK - TCallstack::GetTlsCallstack() = ev->Callstack; - TCallstack::GetTlsCallstack().SetLinesToSkip(); -#endif +#ifdef USE_ACTOR_CALLSTACK + TCallstack::GetTlsCallstack() = ev->Callstack; + TCallstack::GetTlsCallstack().SetLinesToSkip(); +#endif recipientActor->Receive(evHolder, ctx); node->ExecutorThread->DropUnregistered(); } CurrentRecipient = TActorId(); - TlsActivationContext = prevTlsActivationContext; + TlsActivationContext = prevTlsActivationContext; } else { if (VERBOSE) { Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n"; @@ -1831,7 +1831,7 @@ namespace NActors { } else { while (Context->Queue->Head()) { HasReply = false; - ctx.ExecutorThread.Send(GetForwardedEvent().Release()); + ctx.ExecutorThread.Send(GetForwardedEvent().Release()); int count = 100; while (!HasReply && count > 0) { try { diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index f41c4889ad..26e3b45c98 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -100,13 +100,13 @@ namespace NActors { struct TScheduledEventQueueItem { TInstant Deadline; TAutoPtr<IEventHandle> Event; - TAutoPtr<TSchedulerCookieHolder> Cookie; + TAutoPtr<TSchedulerCookieHolder> Cookie; ui64 UniqueId; TScheduledEventQueueItem(TInstant deadline, TAutoPtr<IEventHandle> event, ISchedulerCookie* cookie) : Deadline(deadline) , Event(event) - , Cookie(new TSchedulerCookieHolder(cookie)) + , Cookie(new TSchedulerCookieHolder(cookie)) , UniqueId(++NextUniqueId) {} @@ -151,7 +151,7 @@ namespace NActors { void Schedule(const TScheduledEventQueueItem& item); bool IsScheduledEmpty() const; TInstant GetFirstScheduleDeadline() const; - ui64 GetSentEventCount() const; + ui64 GetSentEventCount() const; private: TScheduledEventsList Scheduled; @@ -173,15 +173,15 @@ namespace NActors { } }; - class TSchedulingLimitReachedException : public yexception { - public: - TSchedulingLimitReachedException(ui64 limit) { - TStringStream str; - str << "TestActorRuntime Processed over " << limit << " events."; - Append(str.Str()); - } - }; - + class TSchedulingLimitReachedException : public yexception { + public: + TSchedulingLimitReachedException(ui64 limit) { + TStringStream str; + str << "TestActorRuntime Processed over " << limit << " events."; + Append(str.Str()); + } + }; + class TTestActorRuntimeBase: public TNonCopyable { public: class TEdgeActor; @@ -222,9 +222,9 @@ namespace NActors { static bool IsVerbose(); static void SetVerbose(bool verbose); TDuration SetDispatchTimeout(TDuration timeout); - void SetDispatchedEventsLimit(ui64 limit) { - DispatchedEventsLimit = limit; - } + void SetDispatchedEventsLimit(ui64 limit) { + DispatchedEventsLimit = limit; + } TDuration SetReschedulingDelay(TDuration delay); void SetLogBackend(const TAutoPtr<TLogBackend> logBackend); void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority); @@ -399,15 +399,15 @@ namespace NActors { return {}; } - template <typename TEvent> + template <typename TEvent> TEvent* GrabEdgeEventRethrow(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) { - try { + try { return GrabEdgeEvent<TEvent>(handle, simTimeout); - } catch (...) { + } catch (...) { ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage(); - } - } - + } + } + template<class TEvent> typename TEvent::TPtr GrabEdgeEventRethrow(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) { try { @@ -453,17 +453,17 @@ namespace NActors { } } - void ResetScheduledCount() { - ScheduledCount = 0; - } - - void SetScheduledLimit(ui64 limit) { - ScheduledLimit = limit; - } - - void SetDispatcherRandomSeed(TInstant time, ui64 iteration); + void ResetScheduledCount() { + ScheduledCount = 0; + } + + void SetScheduledLimit(ui64 limit) { + ScheduledLimit = limit; + } + + void SetDispatcherRandomSeed(TInstant time, ui64 iteration); TString GetActorName(const TActorId& actorId) const; - + const TVector<ui64>& GetTxAllocatorTabletIds() const { return TxAllocatorTabletIds; } void SetTxAllocatorTabletIds(const TVector<ui64>& ids) { TxAllocatorTabletIds = ids; } @@ -502,8 +502,8 @@ namespace NActors { bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline); private: - ui64 ScheduledCount; - ui64 ScheduledLimit; + ui64 ScheduledCount; + ui64 ScheduledLimit; THolder<TTempDir> TmpDir; const TThread::TId MainThreadId; @@ -525,9 +525,9 @@ namespace NActors { TMap<ui32, ui64> EvCounters; ui64 DispatchCyclesCount; ui64 DispatchedEventsCount; - ui64 DispatchedEventsLimit = 2'500'000; + ui64 DispatchedEventsLimit = 2'500'000; TActorId CurrentRecipient; - ui64 DispatcherRandomSeed; + ui64 DispatcherRandomSeed; TIntrusivePtr<IRandomProvider> DispatcherRandomProvider; TAutoPtr<TLogBackend> LogBackend; bool NeedMonitoring; diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h index 45267f9e99..f5595efbaa 100644 --- a/library/cpp/actors/util/rope.h +++ b/library/cpp/actors/util/rope.h @@ -53,11 +53,11 @@ public: free(ptr); } - void operator delete(void* p, void* ptr) { - Y_UNUSED(p); - Y_UNUSED(ptr); - } - + void operator delete(void* p, void* ptr) { + Y_UNUSED(p); + Y_UNUSED(ptr); + } + TData GetData() const override { return {Data + Offset, Size}; } @@ -237,7 +237,7 @@ class TRope { case EType::STRING: return wrapper(reinterpret_cast<TString&>(value)); case EType::ROPE_CHUNK_BACKEND: return wrapper(reinterpret_cast<IRopeChunkBackend::TPtr&>(value)); } - Y_FAIL("Unexpected type# %" PRIu64, static_cast<ui64>(type)); + Y_FAIL("Unexpected type# %" PRIu64, static_cast<ui64>(type)); } template<typename TCallback> diff --git a/library/cpp/actors/util/unordered_cache.h b/library/cpp/actors/util/unordered_cache.h index ac1e1e4b2b..76f036c0cf 100644 --- a/library/cpp/actors/util/unordered_cache.h +++ b/library/cpp/actors/util/unordered_cache.h @@ -126,18 +126,18 @@ public: } } - ~TUnorderedCache() { + ~TUnorderedCache() { Y_VERIFY(!Pop(0)); - for (ui64 i = 0; i < Concurrency; ++i) { + for (ui64 i = 0; i < Concurrency; ++i) { if (ReadSlots[i].ReadFrom) { delete ReadSlots[i].ReadFrom; ReadSlots[i].ReadFrom = nullptr; - } + } WriteSlots[i].WriteTo = nullptr; - } - } - + } + } + T Pop(ui64 readerRotation) noexcept { ui64 readerIndex = readerRotation; const ui64 endIndex = readerIndex + Concurrency; diff --git a/library/cpp/actors/util/ut/ya.make b/library/cpp/actors/util/ut/ya.make index 8cb52a067e..3b08b77984 100644 --- a/library/cpp/actors/util/ut/ya.make +++ b/library/cpp/actors/util/ut/ya.make @@ -1,10 +1,10 @@ UNITTEST_FOR(library/cpp/actors/util) -IF (WITH_VALGRIND) - TIMEOUT(600) - SIZE(MEDIUM) -ENDIF() - +IF (WITH_VALGRIND) + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + OWNER( alexvru g:kikimr diff --git a/library/cpp/bucket_quoter/bucket_quoter.h b/library/cpp/bucket_quoter/bucket_quoter.h index e641b654c7..3d92ef8450 100644 --- a/library/cpp/bucket_quoter/bucket_quoter.h +++ b/library/cpp/bucket_quoter/bucket_quoter.h @@ -82,12 +82,12 @@ public: /* fixed quota */ TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr, StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr, - StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr) + StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr) : MsgPassed(msgPassed) , BucketUnderflows(bucketUnderflows) , TokensUsed(tokensUsed) , UsecWaited(usecWaited) - , AggregateInflow(aggregateInflow) + , AggregateInflow(aggregateInflow) , Bucket(fill ? capacity : 0) , LastAdd(Timer::Now()) , InflowTokensPerSecond(&FixedInflow) @@ -101,12 +101,12 @@ public: /* adjustable quotas */ TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr, StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr, - StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr) + StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr) : MsgPassed(msgPassed) , BucketUnderflows(bucketUnderflows) , TokensUsed(tokensUsed) , UsecWaited(usecWaited) - , AggregateInflow(aggregateInflow) + , AggregateInflow(aggregateInflow) , Bucket(fill ? AtomicGet(*capacity) : 0) , LastAdd(Timer::Now()) , InflowTokensPerSecond(inflow) @@ -231,11 +231,11 @@ private: ui64 elapsed = Timer::Duration(LastAdd, now); if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) { - ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution; - if (AggregateInflow) { - *AggregateInflow += inflow; - } - Bucket += inflow; + ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution; + if (AggregateInflow) { + *AggregateInflow += inflow; + } + Bucket += inflow; if (Bucket > *BucketTokensCapacity) { Bucket = *BucketTokensCapacity; } @@ -267,7 +267,7 @@ private: StatCounter* BucketUnderflows; StatCounter* TokensUsed; StatCounter* UsecWaited; - StatCounter* AggregateInflow; + StatCounter* AggregateInflow; i64 Bucket; TTime LastAdd; diff --git a/library/cpp/lwtrace/check.h b/library/cpp/lwtrace/check.h index 04d0dc7fd0..71503cbc7b 100644 --- a/library/cpp/lwtrace/check.h +++ b/library/cpp/lwtrace/check.h @@ -74,5 +74,5 @@ namespace NLWTrace { --Value; } }; - + } diff --git a/library/cpp/lwtrace/control.h b/library/cpp/lwtrace/control.h index 6136b29daa..16b24eafd2 100644 --- a/library/cpp/lwtrace/control.h +++ b/library/cpp/lwtrace/control.h @@ -251,7 +251,7 @@ namespace NLWTrace { reader.Push(probe); } } - + template <class TReader> void ReadTraces(TReader& reader) const { TGuard<TMutex> g(Mtx); diff --git a/library/cpp/lwtrace/custom_action.cpp b/library/cpp/lwtrace/custom_action.cpp index dfc713ee9a..a379b34ec0 100644 --- a/library/cpp/lwtrace/custom_action.cpp +++ b/library/cpp/lwtrace/custom_action.cpp @@ -1,9 +1,9 @@ #include "custom_action.h" -#include "control.h" - -using namespace NLWTrace; - +#include "control.h" + +using namespace NLWTrace; + TCustomActionExecutor* TCustomActionFactory::Create(TProbe* probe, const TCustomAction& action, TSession* trace) const { auto iter = Callbacks.find(action.GetName()); if (iter != Callbacks.end()) { @@ -11,7 +11,7 @@ TCustomActionExecutor* TCustomActionFactory::Create(TProbe* probe, const TCustom } else { return nullptr; } -} +} void TCustomActionFactory::Register(const TString& name, const TCustomActionFactory::TCallback& callback) { if (Callbacks.contains(name)) { diff --git a/library/cpp/lwtrace/custom_action.h b/library/cpp/lwtrace/custom_action.h index 85445b57e5..92a3c66b84 100644 --- a/library/cpp/lwtrace/custom_action.h +++ b/library/cpp/lwtrace/custom_action.h @@ -1,16 +1,16 @@ -#pragma once - -#include "probe.h" +#pragma once + +#include "probe.h" #include <library/cpp/lwtrace/protos/lwtrace.pb.h> - + #include <util/generic/hash.h> #include <functional> namespace NLWTrace { class TSession; - + // Custom action can save any stuff (derived from IResource) in TSession object // IMPORTANT: Derived class will be used from multiple threads! (see example3) class IResource: public TAtomicRefCount<IResource> { @@ -60,7 +60,7 @@ namespace NLWTrace { return Destructive; } }; - + // Factory to produce custom action executors class TCustomActionFactory { public: diff --git a/library/cpp/lwtrace/example1/example_query.tr b/library/cpp/lwtrace/example1/example_query.tr index c898ffe035..a06e2a922d 100644 --- a/library/cpp/lwtrace/example1/example_query.tr +++ b/library/cpp/lwtrace/example1/example_query.tr @@ -1,10 +1,10 @@ -Blocks { - ProbeDesc { - Name: "IterationProbe" - Provider: "LWTRACE_EXAMPLE_PROVIDER" - } - Action { - PrintToStderrAction { } - } -} - +Blocks { + ProbeDesc { + Name: "IterationProbe" + Provider: "LWTRACE_EXAMPLE_PROVIDER" + } + Action { + PrintToStderrAction { } + } +} + diff --git a/library/cpp/lwtrace/example1/lwtrace_example1.cpp b/library/cpp/lwtrace/example1/lwtrace_example1.cpp index b196a49afb..6b32c405ee 100644 --- a/library/cpp/lwtrace/example1/lwtrace_example1.cpp +++ b/library/cpp/lwtrace/example1/lwtrace_example1.cpp @@ -1,39 +1,39 @@ #include <library/cpp/lwtrace/all.h> - + #define LWTRACE_EXAMPLE_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \ - PROBE(IterationProbe, GROUPS(), TYPES(i32, double), NAMES("n", "result")) \ - /**/ - -LWTRACE_DECLARE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) -LWTRACE_DEFINE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) - -void InitLWTrace() { - NLWTrace::StartLwtraceFromEnv(); -} - + PROBE(IterationProbe, GROUPS(), TYPES(i32, double), NAMES("n", "result")) \ + /**/ + +LWTRACE_DECLARE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) +LWTRACE_DEFINE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) + +void InitLWTrace() { + NLWTrace::StartLwtraceFromEnv(); +} + long double Fact(int n) { - if (n < 0) { - ythrow yexception() << "N! is undefined for negative N (" << n << ")"; - } - double result = 1; - for (; n > 1; --n) { - GLOBAL_LWPROBE(LWTRACE_EXAMPLE_PROVIDER, IterationProbe, n, result); - result *= n; - } - return result; -} - -void FactorialCalculator() { - i32 n; - Cout << "Enter a number: "; + if (n < 0) { + ythrow yexception() << "N! is undefined for negative N (" << n << ")"; + } + double result = 1; + for (; n > 1; --n) { + GLOBAL_LWPROBE(LWTRACE_EXAMPLE_PROVIDER, IterationProbe, n, result); + result *= n; + } + return result; +} + +void FactorialCalculator() { + i32 n; + Cout << "Enter a number: "; TString str; - Cin >> n; - double factN = Fact(n); - Cout << n << "! = " << factN << Endl << Endl; -} - -int main() { - InitLWTrace(); - FactorialCalculator(); - return 0; -} + Cin >> n; + double factN = Fact(n); + Cout << n << "! = " << factN << Endl << Endl; +} + +int main() { + InitLWTrace(); + FactorialCalculator(); + return 0; +} diff --git a/library/cpp/lwtrace/example1/start_with_query.sh b/library/cpp/lwtrace/example1/start_with_query.sh index fd6f6cfe33..2b456d7be7 100755 --- a/library/cpp/lwtrace/example1/start_with_query.sh +++ b/library/cpp/lwtrace/example1/start_with_query.sh @@ -1,3 +1,3 @@ -#!/bin/bash -export LWTRACE="example_query.tr" -./lwtrace-example1 +#!/bin/bash +export LWTRACE="example_query.tr" +./lwtrace-example1 diff --git a/library/cpp/lwtrace/example1/ya.make b/library/cpp/lwtrace/example1/ya.make index c79f3c5408..5ae8c4a48e 100644 --- a/library/cpp/lwtrace/example1/ya.make +++ b/library/cpp/lwtrace/example1/ya.make @@ -1,13 +1,13 @@ -PROGRAM(lwtrace-example1) - -OWNER(cthulhu) - -SRCS( - lwtrace_example1.cpp -) - -PEERDIR( +PROGRAM(lwtrace-example1) + +OWNER(cthulhu) + +SRCS( + lwtrace_example1.cpp +) + +PEERDIR( library/cpp/lwtrace -) - -END() +) + +END() diff --git a/library/cpp/lwtrace/example2/destructive.tr b/library/cpp/lwtrace/example2/destructive.tr index 79bd1bb3cc..ad955db018 100644 --- a/library/cpp/lwtrace/example2/destructive.tr +++ b/library/cpp/lwtrace/example2/destructive.tr @@ -1,36 +1,36 @@ -Blocks { - ProbeDesc { Name: "IterationProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } - Action { - SleepAction { - NanoSeconds: 100000000 - } - } -} - -Blocks { - ProbeDesc { Name: "AfterInputProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } - Action { - StatementAction { - Type: ST_MOD - Argument { Variable: "nMod2" } - Argument { Param: "n" } - Argument { Value: "2" } - } - } -} -Blocks { - ProbeDesc { Name: "AfterInputProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } - Predicate { - Operators { - Type: OT_EQ - Argument { Variable: "nMod2" } - Argument { Value: "0" } - } - } - Action { - PrintToStderrAction { } - } - Action { - KillAction { } - } -} +Blocks { + ProbeDesc { Name: "IterationProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } + Action { + SleepAction { + NanoSeconds: 100000000 + } + } +} + +Blocks { + ProbeDesc { Name: "AfterInputProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } + Action { + StatementAction { + Type: ST_MOD + Argument { Variable: "nMod2" } + Argument { Param: "n" } + Argument { Value: "2" } + } + } +} +Blocks { + ProbeDesc { Name: "AfterInputProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } + Predicate { + Operators { + Type: OT_EQ + Argument { Variable: "nMod2" } + Argument { Value: "0" } + } + } + Action { + PrintToStderrAction { } + } + Action { + KillAction { } + } +} diff --git a/library/cpp/lwtrace/example2/example_query.tr b/library/cpp/lwtrace/example2/example_query.tr index e5fee2c0e3..31b5465860 100644 --- a/library/cpp/lwtrace/example2/example_query.tr +++ b/library/cpp/lwtrace/example2/example_query.tr @@ -1,79 +1,79 @@ -Blocks { - ProbeDesc { - Name: "StartupProbe" - Provider: "LWTRACE_EXAMPLE_PROVIDER" - } - Action { - PrintToStderrAction { } - } -} - -Blocks { - ProbeDesc { Name: "IterationProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } - Action { - LogAction { - LogTimestamp: true - MaxRecords: 2 - } - } -} - -Blocks { - ProbeDesc { Name: "ByrefDurationProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } - Action { - LogAction { - LogTimestamp: true - MaxRecords: 1 - } - } - Action { - PrintToStderrAction { } - } -} - -Blocks { - ProbeDesc { Name: "DurationProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } - Action { - LogAction { - LogTimestamp: true - MaxRecords: 1 - } - } - Action { - PrintToStderrAction { } - } -} - -Blocks { - ProbeDesc { Name: "ResultProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } - Action { - PrintToStderrAction { } - } - Action { - LogAction { LogTimestamp: true } - } -} - - -Blocks { - ProbeDesc { Name: "AfterInputProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } - Action { - StatementAction { - Type: ST_MOD - Argument { Variable: "nMod2" } - Argument { Param: "n" } - Argument { Value: "2" } - } - } -} -Blocks { - ProbeDesc { Name: "AfterInputProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } - Predicate { - Operators { - Type: OT_EQ - Argument { Variable: "nMod2" } - Argument { Value: "0" } - } - } - Action { LogAction { } } -} +Blocks { + ProbeDesc { + Name: "StartupProbe" + Provider: "LWTRACE_EXAMPLE_PROVIDER" + } + Action { + PrintToStderrAction { } + } +} + +Blocks { + ProbeDesc { Name: "IterationProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } + Action { + LogAction { + LogTimestamp: true + MaxRecords: 2 + } + } +} + +Blocks { + ProbeDesc { Name: "ByrefDurationProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } + Action { + LogAction { + LogTimestamp: true + MaxRecords: 1 + } + } + Action { + PrintToStderrAction { } + } +} + +Blocks { + ProbeDesc { Name: "DurationProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } + Action { + LogAction { + LogTimestamp: true + MaxRecords: 1 + } + } + Action { + PrintToStderrAction { } + } +} + +Blocks { + ProbeDesc { Name: "ResultProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } + Action { + PrintToStderrAction { } + } + Action { + LogAction { LogTimestamp: true } + } +} + + +Blocks { + ProbeDesc { Name: "AfterInputProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } + Action { + StatementAction { + Type: ST_MOD + Argument { Variable: "nMod2" } + Argument { Param: "n" } + Argument { Value: "2" } + } + } +} +Blocks { + ProbeDesc { Name: "AfterInputProbe" Provider: "LWTRACE_EXAMPLE_PROVIDER" } + Predicate { + Operators { + Type: OT_EQ + Argument { Variable: "nMod2" } + Argument { Value: "0" } + } + } + Action { LogAction { } } +} diff --git a/library/cpp/lwtrace/example2/lwtrace_example2.cpp b/library/cpp/lwtrace/example2/lwtrace_example2.cpp index 6ad4cc96f1..7a4f7a1daf 100644 --- a/library/cpp/lwtrace/example2/lwtrace_example2.cpp +++ b/library/cpp/lwtrace/example2/lwtrace_example2.cpp @@ -1,117 +1,117 @@ #include <library/cpp/lwtrace/control.h> #include <library/cpp/lwtrace/all.h> - + #include <library/cpp/getopt/last_getopt.h> #include <google/protobuf/text_format.h> -#include <util/stream/file.h> - +#include <util/stream/file.h> + #define LWTRACE_EXAMPLE_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \ PROBE(StartupProbe, GROUPS(), TYPES(), NAMES()) \ PROBE(IterationProbe, GROUPS(), TYPES(i64, double), NAMES("n", "result")) \ - PROBE(DurationProbe, GROUPS(), TYPES(ui64, i64, double), NAMES("duration", "n", "result")) \ + PROBE(DurationProbe, GROUPS(), TYPES(ui64, i64, double), NAMES("duration", "n", "result")) \ PROBE(ResultProbe, GROUPS(), TYPES(double), NAMES("factN")) \ PROBE(AfterInputProbe, GROUPS(), TYPES(i32), NAMES("n")) \ - /**/ - -LWTRACE_DECLARE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) -LWTRACE_DEFINE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) - + /**/ + +LWTRACE_DECLARE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) +LWTRACE_DEFINE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) + THolder<NLWTrace::TManager> traceManager; - -struct TConfig { - bool UnsafeLWTrace; + +struct TConfig { + bool UnsafeLWTrace; TString TraceRequestPath; -}; - -void InitLWTrace(TConfig& cfg) { +}; + +void InitLWTrace(TConfig& cfg) { traceManager.Reset(new NLWTrace::TManager(*Singleton<NLWTrace::TProbeRegistry>(), cfg.UnsafeLWTrace)); -} - -void AddLWTraceRequest(TConfig& cfg) { +} + +void AddLWTraceRequest(TConfig& cfg) { TString queryStr = TUnbufferedFileInput(cfg.TraceRequestPath).ReadAll(); - NLWTrace::TQuery query; - google::protobuf::TextFormat::ParseFromString(queryStr, &query); - traceManager->New("TraceRequest1", query); -} - -class TLogReader { -public: + NLWTrace::TQuery query; + google::protobuf::TextFormat::ParseFromString(queryStr, &query); + traceManager->New("TraceRequest1", query); +} + +class TLogReader { +public: void Push(TThread::TId tid, const NLWTrace::TCyclicLog::TItem& item) { - Cout << "tid=" << tid << " probe=" << item.Probe->Event.Name; - if (item.Timestamp != TInstant::Zero()) { - Cout << " time=" << item.Timestamp; - } - if (item.SavedParamsCount > 0) { + Cout << "tid=" << tid << " probe=" << item.Probe->Event.Name; + if (item.Timestamp != TInstant::Zero()) { + Cout << " time=" << item.Timestamp; + } + if (item.SavedParamsCount > 0) { TString paramValues[LWTRACE_MAX_PARAMS]; - item.Probe->Event.Signature.SerializeParams(item.Params, paramValues); - Cout << " params: "; - for (size_t i = 0; i < item.SavedParamsCount; ++i) { - Cout << " " << item.Probe->Event.Signature.ParamNames[i] << "=" << paramValues[i]; - } - } - Cout << Endl; - } -}; - -void DisplayLWTraceLog() { - Cout << "LWTrace log:" << Endl; - TLogReader reader; - traceManager->ReadLog("TraceRequest1", reader); -} - + item.Probe->Event.Signature.SerializeParams(item.Params, paramValues); + Cout << " params: "; + for (size_t i = 0; i < item.SavedParamsCount; ++i) { + Cout << " " << item.Probe->Event.Signature.ParamNames[i] << "=" << paramValues[i]; + } + } + Cout << Endl; + } +}; + +void DisplayLWTraceLog() { + Cout << "LWTrace log:" << Endl; + TLogReader reader; + traceManager->ReadLog("TraceRequest1", reader); +} + long double Fact(i64 n) { - if (n < 0) { - ythrow yexception() << "N! is undefined for negative N (" << n << ")"; - } - double result = 1; - for (; n > 1; --n) { - GLOBAL_LWPROBE(LWTRACE_EXAMPLE_PROVIDER, IterationProbe, n, result); - GLOBAL_LWPROBE_DURATION(LWTRACE_EXAMPLE_PROVIDER, DurationProbe, n, result); - - result *= n; - } - return result; -} - -void FactorialCalculator() { - GLOBAL_LWPROBE(LWTRACE_EXAMPLE_PROVIDER, StartupProbe); - - i32 n; - Cout << "Enter a number: "; + if (n < 0) { + ythrow yexception() << "N! is undefined for negative N (" << n << ")"; + } + double result = 1; + for (; n > 1; --n) { + GLOBAL_LWPROBE(LWTRACE_EXAMPLE_PROVIDER, IterationProbe, n, result); + GLOBAL_LWPROBE_DURATION(LWTRACE_EXAMPLE_PROVIDER, DurationProbe, n, result); + + result *= n; + } + return result; +} + +void FactorialCalculator() { + GLOBAL_LWPROBE(LWTRACE_EXAMPLE_PROVIDER, StartupProbe); + + i32 n; + Cout << "Enter a number: "; TString str; - Cin >> n; - - GLOBAL_LWPROBE(LWTRACE_EXAMPLE_PROVIDER, AfterInputProbe, n); - - double factN = Fact(n); - Cout << n << "! = " << factN << Endl << Endl; - - GLOBAL_LWPROBE(LWTRACE_EXAMPLE_PROVIDER, ResultProbe, factN); -} - -int main(int argc, char** argv) { - TConfig cfg; - using namespace NLastGetopt; - TOpts opts = NLastGetopt::TOpts::Default(); - opts.AddLongOption('u', "unsafe-lwtrace", + Cin >> n; + + GLOBAL_LWPROBE(LWTRACE_EXAMPLE_PROVIDER, AfterInputProbe, n); + + double factN = Fact(n); + Cout << n << "! = " << factN << Endl << Endl; + + GLOBAL_LWPROBE(LWTRACE_EXAMPLE_PROVIDER, ResultProbe, factN); +} + +int main(int argc, char** argv) { + TConfig cfg; + using namespace NLastGetopt; + TOpts opts = NLastGetopt::TOpts::Default(); + opts.AddLongOption('u', "unsafe-lwtrace", "allow destructive LWTrace actions") .OptionalValue(ToString(true)) .DefaultValue(ToString(false)) .StoreResult(&cfg.UnsafeLWTrace); - opts.AddLongOption('f', "trace-request", + opts.AddLongOption('f', "trace-request", "specify a file containing LWTrace request") .DefaultValue("example_query.tr") .StoreResult(&cfg.TraceRequestPath); - opts.AddHelpOption('h'); - TOptsParseResult res(&opts, argc, argv); - - InitLWTrace(cfg); - - AddLWTraceRequest(cfg); - - FactorialCalculator(); - - DisplayLWTraceLog(); - - return 0; -} + opts.AddHelpOption('h'); + TOptsParseResult res(&opts, argc, argv); + + InitLWTrace(cfg); + + AddLWTraceRequest(cfg); + + FactorialCalculator(); + + DisplayLWTraceLog(); + + return 0; +} diff --git a/library/cpp/lwtrace/example2/ya.make b/library/cpp/lwtrace/example2/ya.make index a54d2a9d53..22e34239c8 100644 --- a/library/cpp/lwtrace/example2/ya.make +++ b/library/cpp/lwtrace/example2/ya.make @@ -1,14 +1,14 @@ -PROGRAM(lwtrace-example2) - -OWNER(cthulhu) - -SRCS( - lwtrace_example2.cpp -) - -PEERDIR( +PROGRAM(lwtrace-example2) + +OWNER(cthulhu) + +SRCS( + lwtrace_example2.cpp +) + +PEERDIR( library/cpp/lwtrace library/cpp/getopt -) - -END() +) + +END() diff --git a/library/cpp/lwtrace/example3/example_query.tr b/library/cpp/lwtrace/example3/example_query.tr index bd22441af3..1f841b0932 100644 --- a/library/cpp/lwtrace/example3/example_query.tr +++ b/library/cpp/lwtrace/example3/example_query.tr @@ -1,13 +1,13 @@ -Blocks { - ProbeDesc { - Name: "IterationProbe" - Provider: "LWTRACE_EXAMPLE_PROVIDER" - } - Action { +Blocks { + ProbeDesc { + Name: "IterationProbe" + Provider: "LWTRACE_EXAMPLE_PROVIDER" + } + Action { CustomAction { Name: "MyAction" Opts: "/dev/stdout" } - } -} - + } +} + diff --git a/library/cpp/lwtrace/example3/lwtrace_example3.cpp b/library/cpp/lwtrace/example3/lwtrace_example3.cpp index e4e96fbc66..4493dc0077 100644 --- a/library/cpp/lwtrace/example3/lwtrace_example3.cpp +++ b/library/cpp/lwtrace/example3/lwtrace_example3.cpp @@ -1,43 +1,43 @@ #include <library/cpp/lwtrace/all.h> #include <google/protobuf/text_format.h> #include "my_action.h" - + #define LWTRACE_EXAMPLE_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \ - PROBE(IterationProbe, GROUPS(), TYPES(i32, double), NAMES("n", "result")) \ - /**/ - -LWTRACE_DECLARE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) -LWTRACE_DEFINE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) - -void InitLWTrace() { + PROBE(IterationProbe, GROUPS(), TYPES(i32, double), NAMES("n", "result")) \ + /**/ + +LWTRACE_DECLARE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) +LWTRACE_DEFINE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) + +void InitLWTrace() { NLWTrace::StartLwtraceFromEnv([=](NLWTrace::TManager& mngr) { mngr.RegisterCustomAction<TMyActionExecutor>(); }); -} - +} + long double Fact(int n) { - if (n < 0) { - ythrow yexception() << "N! is undefined for negative N (" << n << ")"; - } - double result = 1; - for (; n > 1; --n) { - GLOBAL_LWPROBE(LWTRACE_EXAMPLE_PROVIDER, IterationProbe, n, result); - result *= n; - } - return result; -} - -void FactorialCalculator() { - i32 n; - Cout << "Enter a number: "; + if (n < 0) { + ythrow yexception() << "N! is undefined for negative N (" << n << ")"; + } + double result = 1; + for (; n > 1; --n) { + GLOBAL_LWPROBE(LWTRACE_EXAMPLE_PROVIDER, IterationProbe, n, result); + result *= n; + } + return result; +} + +void FactorialCalculator() { + i32 n; + Cout << "Enter a number: "; TString str; - Cin >> n; - double factN = Fact(n); - Cout << n << "! = " << factN << Endl << Endl; -} - -int main() { - InitLWTrace(); - FactorialCalculator(); - return 0; -} + Cin >> n; + double factN = Fact(n); + Cout << n << "! = " << factN << Endl << Endl; +} + +int main() { + InitLWTrace(); + FactorialCalculator(); + return 0; +} diff --git a/library/cpp/lwtrace/example3/start_with_query.sh b/library/cpp/lwtrace/example3/start_with_query.sh index f7f58ef929..5cd221856f 100755 --- a/library/cpp/lwtrace/example3/start_with_query.sh +++ b/library/cpp/lwtrace/example3/start_with_query.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/bash echo "Executing program with following trace query:" cat example_query.tr echo -n "Press any key to start program" diff --git a/library/cpp/lwtrace/example3/ya.make b/library/cpp/lwtrace/example3/ya.make index e226b1e041..c5b31586e9 100644 --- a/library/cpp/lwtrace/example3/ya.make +++ b/library/cpp/lwtrace/example3/ya.make @@ -1,13 +1,13 @@ PROGRAM(lwtrace-example3) - + OWNER(serxa) - -SRCS( + +SRCS( lwtrace_example3.cpp -) - -PEERDIR( +) + +PEERDIR( library/cpp/lwtrace -) - -END() +) + +END() diff --git a/library/cpp/lwtrace/example4/example_query.tr b/library/cpp/lwtrace/example4/example_query.tr index 73f28b7123..46cd25ce91 100644 --- a/library/cpp/lwtrace/example4/example_query.tr +++ b/library/cpp/lwtrace/example4/example_query.tr @@ -1,10 +1,10 @@ -Blocks { - ProbeDesc { +Blocks { + ProbeDesc { Name: "BackTrack" - Provider: "LWTRACE_EXAMPLE_PROVIDER" - } - Action { - PrintToStderrAction { } - } -} - + Provider: "LWTRACE_EXAMPLE_PROVIDER" + } + Action { + PrintToStderrAction { } + } +} + diff --git a/library/cpp/lwtrace/example4/lwtrace_example4.cpp b/library/cpp/lwtrace/example4/lwtrace_example4.cpp index b2b23b1096..7b55a07c75 100644 --- a/library/cpp/lwtrace/example4/lwtrace_example4.cpp +++ b/library/cpp/lwtrace/example4/lwtrace_example4.cpp @@ -1,49 +1,49 @@ #include <library/cpp/lwtrace/all.h> - + #define LWTRACE_EXAMPLE_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \ PROBE(BackTrack, GROUPS(), TYPES(NLWTrace::TSymbol), NAMES("frame")) \ - /**/ - -LWTRACE_DECLARE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) -LWTRACE_DEFINE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) - + /**/ + +LWTRACE_DECLARE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) +LWTRACE_DEFINE_PROVIDER(LWTRACE_EXAMPLE_PROVIDER) + LWTRACE_USING(LWTRACE_EXAMPLE_PROVIDER); #define MY_BACKTRACK() LWPROBE(BackTrack, LWTRACE_LOCATION_SYMBOL) -void InitLWTrace() { - NLWTrace::StartLwtraceFromEnv(); -} - +void InitLWTrace() { + NLWTrace::StartLwtraceFromEnv(); +} + long double Fact(int n) { MY_BACKTRACK(); - if (n < 0) { + if (n < 0) { MY_BACKTRACK(); - ythrow yexception() << "N! is undefined for negative N (" << n << ")"; - } - double result = 1; - for (; n > 1; --n) { + ythrow yexception() << "N! is undefined for negative N (" << n << ")"; + } + double result = 1; + for (; n > 1; --n) { MY_BACKTRACK(); - result *= n; - } + result *= n; + } MY_BACKTRACK(); - return result; -} - -void FactorialCalculator() { + return result; +} + +void FactorialCalculator() { MY_BACKTRACK(); - i32 n; - Cout << "Enter a number: "; + i32 n; + Cout << "Enter a number: "; TString str; - Cin >> n; - double factN = Fact(n); - Cout << n << "! = " << factN << Endl << Endl; -} - -int main() { - InitLWTrace(); + Cin >> n; + double factN = Fact(n); + Cout << n << "! = " << factN << Endl << Endl; +} + +int main() { + InitLWTrace(); MY_BACKTRACK(); - FactorialCalculator(); + FactorialCalculator(); MY_BACKTRACK(); - return 0; -} + return 0; +} diff --git a/library/cpp/lwtrace/example4/start_with_query.sh b/library/cpp/lwtrace/example4/start_with_query.sh index 09079b0a31..5bc195c1ae 100755 --- a/library/cpp/lwtrace/example4/start_with_query.sh +++ b/library/cpp/lwtrace/example4/start_with_query.sh @@ -1,3 +1,3 @@ -#!/bin/bash -export LWTRACE="example_query.tr" +#!/bin/bash +export LWTRACE="example_query.tr" ./lwtrace-example4 diff --git a/library/cpp/lwtrace/example4/ya.make b/library/cpp/lwtrace/example4/ya.make index efc9413906..a3004340a8 100644 --- a/library/cpp/lwtrace/example4/ya.make +++ b/library/cpp/lwtrace/example4/ya.make @@ -1,13 +1,13 @@ PROGRAM(lwtrace-example4) - + OWNER(serxa) - -SRCS( + +SRCS( lwtrace_example4.cpp -) - -PEERDIR( +) + +PEERDIR( library/cpp/lwtrace -) - -END() +) + +END() diff --git a/library/cpp/lwtrace/preprocessor.h b/library/cpp/lwtrace/preprocessor.h index c6da7cedd3..40865467b2 100644 --- a/library/cpp/lwtrace/preprocessor.h +++ b/library/cpp/lwtrace/preprocessor.h @@ -34,7 +34,7 @@ // 2. FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, your_p1_value, your_p1_value) // FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, your_p1_another_value, your_p1_another_value) // 3. #undef FOREACH_PARAMTYPE_MACRO -// Type order matters! +// Type order matters! #define FOREACH_PARAMTYPE(MACRO, ...) \ MACRO("i64", i64, I64, ##__VA_ARGS__) \ MACRO("ui64", ui64, Ui64, ##__VA_ARGS__) \ @@ -49,13 +49,13 @@ MACRO(NULL, TNil, Nil, ##__VA_ARGS__) \ /**/ -// Used for math statements +// Used for math statements #define FOR_MATH_PARAMTYPE(MACRO, ...) \ MACRO("i64", i64, I64, ##__VA_ARGS__) \ MACRO("ui64", ui64, Ui64, ##__VA_ARGS__) \ MACRO("check", NLWTrace::TCheck, Check, ##__VA_ARGS__) \ - /**/ - + /**/ + // Use for code generation to handle parameter lists // NOTE: this is the only place to change if more parameters needed #define FOREACH_PARAMNUM(MACRO, ...) \ @@ -78,22 +78,22 @@ MACRO(16, ##__VA_ARGS__) \ /**/ -#define FOREACH_LEFT_TYPE(MACRO, ...) \ +#define FOREACH_LEFT_TYPE(MACRO, ...) \ MACRO(__VA_ARGS__, OT_VARIABLE) \ MACRO(__VA_ARGS__, OT_LITERAL) \ MACRO(__VA_ARGS__, OT_PARAMETER) \ - /**/ - -#define FOREACH_RIGHT_TYPE(MACRO, ...) \ + /**/ + +#define FOREACH_RIGHT_TYPE(MACRO, ...) \ MACRO(__VA_ARGS__, OT_VARIABLE) \ MACRO(__VA_ARGS__, OT_LITERAL) \ MACRO(__VA_ARGS__, OT_PARAMETER) \ - /**/ - -#define FOREACH_DESTINATION_TYPE(MACRO, ...) \ + /**/ + +#define FOREACH_DESTINATION_TYPE(MACRO, ...) \ MACRO(__VA_ARGS__, OT_VARIABLE) \ - /**/ - + /**/ + // Auxilary macros #define LWTRACE_EXPAND(x) x #define LWTRACE_EAT(...) diff --git a/library/cpp/lwtrace/protos/lwtrace.proto b/library/cpp/lwtrace/protos/lwtrace.proto index 29e0fa1fe6..0051095719 100644 --- a/library/cpp/lwtrace/protos/lwtrace.proto +++ b/library/cpp/lwtrace/protos/lwtrace.proto @@ -22,15 +22,15 @@ enum EOperatorType { OT_GE = 5; } -message TArgument { +message TArgument { string Param = 1; bytes Value = 2; string Variable = 3; -} - +} + message TOperator { EOperatorType Type = 1; - repeated TArgument Argument = 8; + repeated TArgument Argument = 8; } message TPredicate { @@ -47,10 +47,10 @@ message TLogAction { message TPrintToStderrAction { } -message TKillAction { +message TKillAction { } -message TSleepAction { +message TSleepAction { uint64 NanoSeconds = 1; } @@ -59,7 +59,7 @@ message TCustomAction { repeated string Opts = 2; } -enum EStatementType { +enum EStatementType { ST_MOV = 0; ST_ADD = 1; ST_SUB = 2; @@ -70,13 +70,13 @@ enum EStatementType { ST_SUB_EQ = 7; ST_INC = 8; ST_DEC = 9; -} - -message TStatementAction { +} + +message TStatementAction { EStatementType Type = 1; - repeated TArgument Argument = 2; -} - + repeated TArgument Argument = 2; +} + message TRunLogShuttleAction { bool Ignore = 1; uint64 ShuttlesCount = 2; @@ -106,7 +106,7 @@ message TAction { message TBlock { TProbeDesc ProbeDesc = 1; TPredicate Predicate = 2; - repeated TAction Action = 3; + repeated TAction Action = 3; } message TQuery { diff --git a/library/cpp/lwtrace/sleep_action.cpp b/library/cpp/lwtrace/sleep_action.cpp index d50ab843f7..74977528db 100644 --- a/library/cpp/lwtrace/sleep_action.cpp +++ b/library/cpp/lwtrace/sleep_action.cpp @@ -1,15 +1,15 @@ #include "sleep_action.h" - -#include "control.h" -#include <util/system/datetime.h> - +#include "control.h" + +#include <util/system/datetime.h> + #include <stdlib.h> -using namespace NLWTrace; -using namespace NLWTrace::NPrivate; - +using namespace NLWTrace; +using namespace NLWTrace::NPrivate; + bool TSleepActionExecutor::DoExecute(TOrbit&, const TParams&) { - NanoSleep(NanoSeconds); - return true; -} + NanoSleep(NanoSeconds); + return true; +} diff --git a/library/cpp/lwtrace/sleep_action.h b/library/cpp/lwtrace/sleep_action.h index 868c4dd492..26f89bd88c 100644 --- a/library/cpp/lwtrace/sleep_action.h +++ b/library/cpp/lwtrace/sleep_action.h @@ -1,13 +1,13 @@ -#pragma once - -#include "probe.h" - +#pragma once + +#include "probe.h" + namespace NLWTrace { namespace NPrivate { class TSleepActionExecutor: public IExecutor { private: ui64 NanoSeconds; - + public: TSleepActionExecutor(const TProbe*, ui64 nanoSeconds) : IExecutor() @@ -16,6 +16,6 @@ namespace NLWTrace { } bool DoExecute(TOrbit& orbit, const TParams& params) override; }; - + } } diff --git a/library/cpp/lwtrace/tests/trace_tests.cpp b/library/cpp/lwtrace/tests/trace_tests.cpp index 41b44ca53a..6762e344a7 100644 --- a/library/cpp/lwtrace/tests/trace_tests.cpp +++ b/library/cpp/lwtrace/tests/trace_tests.cpp @@ -4,7 +4,7 @@ #include <google/protobuf/text_format.h> -#include <util/system/pipe.h> +#include <util/system/pipe.h> #include <util/generic/ymath.h> #include <util/string/printf.h> #include <util/string/vector.h> @@ -461,7 +461,7 @@ namespace NLWTrace { ui64 duration = (t1.NanoSeconds() - t0.NanoSeconds()); Cout << "multiple sleep tested, expected 100000000 ns, measured " << duration << " ns" << Endl; } - + void SleepCheck(const TConfig& cfg) { TProbes p(cfg.UnsafeLWTrace); p.Mngr.New("test-sleep", MakeQuery( @@ -484,7 +484,7 @@ namespace NLWTrace { ui64 duration = (t1.NanoSeconds() - t0.NanoSeconds()) / (ui64)10; Cout << "sleep tested, expected 100000000 ns, measured " << duration << " ns" << Endl; } - + void KillCheckChild(const TConfig& cfg, TPipeHandle& writer) { TProbes p(cfg.UnsafeLWTrace); p.Mngr.New("test-kill", MakeQuery( @@ -506,7 +506,7 @@ namespace NLWTrace { buffer = 1; writer.Write(&buffer, 1); } - + void KillCheckParent(TPipeHandle& reader) { char buffer = -1; reader.Read(&buffer, 1); @@ -518,9 +518,9 @@ namespace NLWTrace { else Cout << "\t\tkill executor tested OK." << Endl; } - + void KillCheck(const TConfig& cfg) { -#ifdef _unix_ +#ifdef _unix_ TPipeHandle reader; TPipeHandle writer; TPipeHandle::Pipe(reader, writer); @@ -538,11 +538,11 @@ namespace NLWTrace { KillCheckParent(reader); reader.Close(); } -#else +#else Cout << "kill action test for windows is not implemented." << Endl; -#endif +#endif } - + void LogIntModFilter(const TConfig& cfg) { TProbes p(cfg.UnsafeLWTrace); p.Mngr.New("test-trace", MakeQuery( @@ -635,7 +635,7 @@ namespace NLWTrace { "}")); Cout << "call to probe with int mod filter (always true, mod 10) and log executors: " << p.IntParamTime(cfg) << Endl; } - + #define FOR_EACH_TEST() \ FOR_EACH_TEST_MACRO(LogIntModFilter) \ FOR_EACH_TEST_MACRO(SleepCheck) \ diff --git a/library/cpp/lwtrace/trace.cpp b/library/cpp/lwtrace/trace.cpp index acacf366f1..3c974c85a0 100644 --- a/library/cpp/lwtrace/trace.cpp +++ b/library/cpp/lwtrace/trace.cpp @@ -1,8 +1,8 @@ #include "all.h" -#include "kill_action.h" +#include "kill_action.h" #include "log_shuttle.h" #include "preprocessor.h" -#include "sleep_action.h" +#include "sleep_action.h" #include "stderr_writer.h" #include "google/protobuf/repeated_field.h" @@ -74,17 +74,17 @@ namespace NLWTrace { return &traceVariables[name]; } return &((*it).second); - } - + } + typedef enum { OT_LITERAL = 0, OT_PARAMETER = 1, OT_VARIABLE = 2 } EOperandType; - + template <class T, EOperandType> class TOperand; - + template <class T> class TOperand<T, OT_LITERAL> { private: @@ -135,7 +135,7 @@ namespace NLWTrace { void Inc() { AtomicIncrement(*Variable); } - + void Dec() { AtomicDecrement(*Variable); } @@ -166,7 +166,7 @@ namespace NLWTrace { void Set(const TCheck& value) { AtomicSet(*Variable, value.Value); } - + void Add(const TCheck& value) { AtomicAdd(*Variable, value.Value); } @@ -178,7 +178,7 @@ namespace NLWTrace { void Inc() { AtomicIncrement(*Variable); } - + void Dec() { AtomicDecrement(*Variable); } @@ -225,23 +225,23 @@ namespace NLWTrace { virtual ~IOperandGetter() { } }; - + template <class T, EOperandType TParam> class TOperandGetter: public IOperandGetter<T> { private: TOperand<T, TParam> Op; - + public: TOperandGetter(const TOperand<T, TParam>& op) : Op(op) { } - + const T Get(const TParams& params) override { return Op.Get(params); } }; - + template <class T> class TReceiver: public TOperand<T, OT_VARIABLE> { public: @@ -250,7 +250,7 @@ namespace NLWTrace { { } }; - + template <class TP, class TPredicate> static bool CmpFunc(TP a, TP b) { return TPredicate()(a, b); @@ -306,26 +306,26 @@ namespace NLWTrace { private: TFunc Func; TReceiver<TP> Receiver; - + bool DoExecute(TOrbit&, const TParams&) override { Func(Receiver); return true; } - + public: TUnaryInplaceStatementExecutor(TReceiver<TP>& receiver) : Receiver(receiver) { } }; - + template <class TP, class TFunc, EOperandType TParam> class TBinaryInplaceStatementExecutor: public IExecutor { private: TFunc Func; TReceiver<TP> Receiver; TOperand<TP, TParam> Param; - + bool DoExecute(TOrbit&, const TParams& params) override { Func(Receiver, Param.Get(params)); return true; @@ -338,14 +338,14 @@ namespace NLWTrace { { } }; - + template <class TP, class TFunc, EOperandType TFirstParam> class TBinaryStatementExecutor: public IExecutor { private: TFunc Func; TReceiver<TP> Receiver; TOperand<TP, TFirstParam> FirstParam; - + bool DoExecute(TOrbit&, const TParams& params) override { Receiver.Set(Func(Receiver.Get(params), FirstParam.Get(params))); return true; @@ -464,9 +464,9 @@ namespace NLWTrace { EOperandType Type; size_t ParamIdx; } TArgumentDescription; - + using TArgumentList = TVector<TArgumentDescription>; - + template <class T> void ParseArguments(const T& op, const TSignature& signature, const TString& exceptionPrefix, size_t expectedArgumentCount, TArgumentList& arguments) { arguments.clear(); @@ -483,10 +483,10 @@ namespace NLWTrace { operand.Type = OT_PARAMETER; operand.ParamIdx = signature.FindParamIndex(arg.GetParam()); if (operand.ParamIdx == size_t(-1)) { - ythrow yexception() << exceptionPrefix + ythrow yexception() << exceptionPrefix << " argument #" << argumentIdx << " param '" << arg.GetParam() << "' doesn't exist"; - } + } if (firstParamIdx == size_t(-1)) { firstParamIdx = operand.ParamIdx; } else { @@ -499,14 +499,14 @@ namespace NLWTrace { ythrow yexception() << exceptionPrefix << " argument #" << argumentIdx << " is empty"; - } + } arguments.push_back(operand); } if (arguments.size() != expectedArgumentCount) { - ythrow yexception() << exceptionPrefix + ythrow yexception() << exceptionPrefix << " incorrect number of arguments (" << arguments.size() << " present, " << expectedArgumentCount << " expected)"; - } + } } template <class TArg1, class TArg2> @@ -569,7 +569,7 @@ namespace NLWTrace { TOperand<t, lt> lhs(traceVariables, var0, val0, arg0.ParamIdx); \ FOREACH_RIGHT_TYPE(FOREACH_OPERAND_TYPE_RT, n, t, v, fn, lt) \ } - + #define FOREACH_PARAMTYPE_MACRO(n, t, v, fn) \ if ((arg0.ParamIdx == size_t(-1) || strcmp(tName0, n) == 0) && (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0)) { \ FOREACH_LEFT_TYPE(FOREACH_OPERAND_TYPE_LT, n, t, v, fn); \ @@ -680,10 +680,10 @@ namespace NLWTrace { << " SleepAction missing parameter 'NanoSeconds'"; } } else { - ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1 + ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1 << " contains destructive SleepAction, but destructive actions are disabled." << " Please, consider using --unsafe-lwtrace command line parameter."; - } + } } else if (action.HasStatementAction()) { const TStatementAction& statement = action.GetStatementAction(); TString exceptionPrefix; @@ -696,24 +696,24 @@ namespace NLWTrace { expectedArgumentCount = 1; } ParseArguments<TStatementAction>(statement, probe->Event.Signature, exceptionPrefix, expectedArgumentCount, arguments); - + TArgumentDescription arg0 = (expectedArgumentCount <= 0) ? TArgumentDescription() : arguments.at(0); TArgumentDescription arg1 = (expectedArgumentCount <= 1) ? TArgumentDescription() : arguments.at(1); TArgumentDescription arg2 = (expectedArgumentCount <= 2) ? TArgumentDescription() : arguments.at(2); - + TString var0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetVariable(); TString var1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetVariable(); TString var2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetVariable(); - + TString val0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetValue(); TString val1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetValue(); TString val2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetValue(); - + const char* tName1 = (expectedArgumentCount <= 1 || arg1.ParamIdx == size_t(-1)) ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx]; const char* tName2 = (expectedArgumentCount <= 2 || arg2.ParamIdx == size_t(-1)) ? nullptr : probe->Event.Signature.ParamTypes[arg2.ParamIdx]; - + if (arg0.Type == OT_VARIABLE) { switch (statement.GetType()) { #define PARSE_UNARY_INPLACE_STATEMENT_MACRO(n, t, v, fn) \ @@ -723,7 +723,7 @@ namespace NLWTrace { actExec.Reset(new TExec(receiver)); \ break; \ } - + #define PARSE_BINARY_INPLACE_STATEMENT_MACRO2(n, t, v, fn, ft) \ if (arg1.Type == ft) { \ typedef TBinaryInplaceStatementExecutor<t, fn<TReceiver<t>, t>, ft> TExec; \ @@ -814,11 +814,11 @@ namespace NLWTrace { } #undef CREATE_OPERAND_GETTER_N #undef TERNARY_ON_TYPE -#undef IMPLEMENT_TERNARY_STATEMENT -#undef PARSE_TERNARY_STATEMENT_MACRO -#undef PARSE_BINARY_STATEMENT_MACRO -#undef PARSE_BINARY_INPLACE_STATEMENT_MACRO -#undef PARSE_UNARY_INPLACE_STATEMENT_MACRO +#undef IMPLEMENT_TERNARY_STATEMENT +#undef PARSE_TERNARY_STATEMENT_MACRO +#undef PARSE_BINARY_STATEMENT_MACRO +#undef PARSE_BINARY_INPLACE_STATEMENT_MACRO +#undef PARSE_UNARY_INPLACE_STATEMENT_MACRO } else { ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1 << " has not supported action '" << action.ShortDebugString() << "'"; @@ -835,7 +835,7 @@ namespace NLWTrace { if (!probe->Attach(exec.Get())) { ythrow yexception() << "block #" << bi + 1 << " cannot be attached to probe '" << probe->Event.Name << "': no free slots"; - } + } Probes.push_back(std::make_pair(probe, exec.Release())); #else Y_UNUSED(bi); diff --git a/library/cpp/lwtrace/ya.make b/library/cpp/lwtrace/ya.make index 2c5f63e268..d9accb3006 100644 --- a/library/cpp/lwtrace/ya.make +++ b/library/cpp/lwtrace/ya.make @@ -15,7 +15,7 @@ SRCS( perf.cpp probes.cpp shuttle.cpp - sleep_action.cpp + sleep_action.cpp start.cpp stderr_writer.cpp symbol.cpp diff --git a/library/cpp/monlib/dynamic_counters/counters.cpp b/library/cpp/monlib/dynamic_counters/counters.cpp index 2a1ca63b67..3635d87d0d 100644 --- a/library/cpp/monlib/dynamic_counters/counters.cpp +++ b/library/cpp/monlib/dynamic_counters/counters.cpp @@ -124,7 +124,7 @@ TIntrusivePtr<TDynamicCounters> TDynamicCounters::GetSubgroup(const TString& nam res = MakeIntrusive<TDynamicCounters>(this); Counters.emplace_hint(it, key, res); } - } + } return res; } @@ -138,7 +138,7 @@ void TDynamicCounters::RemoveSubgroup(const TString& name, const TString& value) auto g = LockForUpdate("RemoveSubgroup", name, value); if (const auto it = Counters.find({name, value}); it != Counters.end() && AsDynamicCounters(it->second)) { Counters.erase(it); - } + } } void TDynamicCounters::ReplaceSubgroup(const TString& name, const TString& value, TIntrusivePtr<TDynamicCounters> subgroup) { diff --git a/library/cpp/monlib/dynamic_counters/counters.h b/library/cpp/monlib/dynamic_counters/counters.h index b5951a2f6c..dc178cfbe0 100644 --- a/library/cpp/monlib/dynamic_counters/counters.h +++ b/library/cpp/monlib/dynamic_counters/counters.h @@ -191,12 +191,12 @@ namespace NMonitoring { struct TDynamicCounters: public TCountableBase { public: using TCounterPtr = TIntrusivePtr<TCounterForPtr>; - using TOnLookupPtr = void (*)(const char *methodName, const TString &name, const TString &value); + using TOnLookupPtr = void (*)(const char *methodName, const TString &name, const TString &value); private: TRWMutex Lock; TCounterPtr LookupCounter; // Counts lookups by name - TOnLookupPtr OnLookup = nullptr; // Called on each lookup if not nullptr, intended for lightweight tracing. + TOnLookupPtr OnLookup = nullptr; // Called on each lookup if not nullptr, intended for lightweight tracing. typedef TIntrusivePtr<TCountableBase> TCountablePtr; @@ -247,11 +247,11 @@ namespace NMonitoring { LookupCounter = lookupCounter; } - void SetOnLookup(TOnLookupPtr onLookup) { + void SetOnLookup(TOnLookupPtr onLookup) { TWriteGuard g(Lock); - OnLookup = onLookup; - } - + OnLookup = onLookup; + } + TWriteGuard LockForUpdate(const char *method, const TString& name, const TString& value) { auto res = TWriteGuard(Lock); if (LookupCounter) { diff --git a/library/cpp/monlib/service/pages/index_mon_page.cpp b/library/cpp/monlib/service/pages/index_mon_page.cpp index 25832cd111..83ff8b529a 100644 --- a/library/cpp/monlib/service/pages/index_mon_page.cpp +++ b/library/cpp/monlib/service/pages/index_mon_page.cpp @@ -84,14 +84,14 @@ void TIndexMonPage::Register(TMonPagePtr page) { } TIndexMonPage* TIndexMonPage::RegisterIndexPage(const TString& path, const TString& title) { - TGuard<TMutex> g(Mtx); - TIndexMonPage* page = VerifyDynamicCast<TIndexMonPage*>(FindPage(path)); - if (page) { - return page; - } - page = new TIndexMonPage(path, title); + TGuard<TMutex> g(Mtx); + TIndexMonPage* page = VerifyDynamicCast<TIndexMonPage*>(FindPage(path)); + if (page) { + return page; + } + page = new TIndexMonPage(path, title); Register(page); - return VerifyDynamicCast<TIndexMonPage*>(page); + return VerifyDynamicCast<TIndexMonPage*>(page); } IMonPage* TIndexMonPage::FindPage(const TString& relativePath) { diff --git a/library/cpp/sliding_window/sliding_window.h b/library/cpp/sliding_window/sliding_window.h index 5d73f2f9a9..180bdf93d0 100644 --- a/library/cpp/sliding_window/sliding_window.h +++ b/library/cpp/sliding_window/sliding_window.h @@ -119,11 +119,11 @@ namespace NSlidingWindow { class TSlidingWindow { public: using TValueType = typename TOperation::TValueType; - using TValueVector = TVector<TValueType>; - using TSizeType = typename TValueVector::size_type; + using TValueVector = TVector<TValueType>; + using TSizeType = typename TValueVector::size_type; public: - TSlidingWindow(const TDuration& length, TSizeType partsNum) + TSlidingWindow(const TDuration& length, TSizeType partsNum) : Mutex() , Buckets(partsNum, TOperation::InitialValue()) // vector of size partsNum initialized with initial value , WindowValue(TOperation::InitialValue()) |