summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzalyalov <[email protected]>2023-06-13 18:00:35 +0300
committerzalyalov <[email protected]>2023-06-13 18:00:35 +0300
commit38186d856319088fde3eab19e04df32dc074d6e5 (patch)
tree6caf78011a422d6fd26cd0be0e7177cd79c6da4e
parent71c0420945120355ed74acf263304e731291035f (diff)
prioritised event processing in hive
initial support for prioritised event processing
-rw-r--r--ydb/core/mind/hive/hive_events.h3
-rw-r--r--ydb/core/mind/hive/hive_impl.cpp122
-rw-r--r--ydb/core/mind/hive/hive_impl.h9
-rw-r--r--ydb/core/protos/counters_hive.proto1
-rw-r--r--ydb/core/util/event_priority_queue.h73
-rw-r--r--ydb/core/util/event_priority_queue_ut.cpp81
-rw-r--r--ydb/core/util/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/util/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/util/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/util/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/util/ut/ya.make1
-rw-r--r--ydb/core/util/ya.make1
12 files changed, 288 insertions, 7 deletions
diff --git a/ydb/core/mind/hive/hive_events.h b/ydb/core/mind/hive/hive_events.h
index b846a32cbe1..73350e6e4c1 100644
--- a/ydb/core/mind/hive/hive_events.h
+++ b/ydb/core/mind/hive/hive_events.h
@@ -24,6 +24,7 @@ struct TEvPrivate {
EvProcessPendingOperations,
EvRestartComplete,
EvBalancerOut,
+ EvProcessIncomingEvent,
EvEnd
};
@@ -81,6 +82,8 @@ struct TEvPrivate {
struct TEvProcessPendingOperations : TEventLocal<TEvProcessPendingOperations, EvProcessPendingOperations> {};
struct TEvBalancerOut : TEventLocal<TEvBalancerOut, EvBalancerOut> {};
+
+ struct TEvProcessIncomingEvent : TEventLocal<TEvProcessIncomingEvent, EvProcessIncomingEvent> {};
};
} // NHive
diff --git a/ydb/core/mind/hive/hive_impl.cpp b/ydb/core/mind/hive/hive_impl.cpp
index 89709464e92..58607765745 100644
--- a/ydb/core/mind/hive/hive_impl.cpp
+++ b/ydb/core/mind/hive/hive_impl.cpp
@@ -1566,6 +1566,13 @@ void THive::UpdateCounterBootQueueSize(ui64 bootQueueSize) {
counter.Set(bootQueueSize);
}
}
+void THive::UpdateCounterEventQueueSize(i64 eventQueueSizeDiff) {
+ if (TabletCounters != nullptr) {
+ auto& counter = TabletCounters->Simple()[NHive::COUNTER_EVENTQUEUE_SIZE];
+ auto newValue = counter.Get() + eventQueueSizeDiff;
+ counter.Set(newValue);
+ }
+}
bool THive::DomainHasNodes(const TSubDomainKey &domainKey) const {
return !DomainsView.IsEmpty(domainKey);
@@ -2495,6 +2502,11 @@ void THive::Handle(TEvHive::TEvReassignOnDecommitGroup::TPtr& ev) {
Execute(CreateReassignGroupsOnDecommit(groupId, std::move(reply)));
}
+void THive::Handle(TEvPrivate::TEvProcessIncomingEvent::TPtr&) {
+ UpdateCounterEventQueueSize(-1);
+ EventQueue.ProcessIncomingEvent();
+}
+
void THive::InitDefaultChannelBind(TChannelBind& bind) {
if (!bind.HasIOPS()) {
bind.SetIOPS(GetDefaultUnitIOPS());
@@ -2528,18 +2540,24 @@ void THive::RequestPoolsInformation() {
}
}
-STFUNC(THive::StateInit) {
+ui32 THive::GetEventPriority(IEventHandle* ev) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvInterconnect::TEvNodesInfo, Handle);
- default:
- StateInitImpl(ev, SelfId());
+ case TEvHive::EvRequestHiveInfo:
+ case TEvHive::EvRequestHiveDomainStats:
+ case TEvHive::EvRequestHiveNodeStats:
+ case TEvHive::EvRequestHiveStorageStats:
+ return 10;
+ default:
+ return 50;
}
}
-STFUNC(THive::StateWork) {
- if (ResponsivenessPinger)
- ResponsivenessPinger->OnAnyEvent();
+void THive::PushProcessIncomingEvent() {
+ Send(SelfId(), new TEvPrivate::TEvProcessIncomingEvent());
+}
+void THive::ProcessEvent(std::unique_ptr<IEventHandle> event) {
+ TAutoPtr ev = event.release();
switch (ev->GetTypeRewrite()) {
hFunc(TEvHive::TEvCreateTablet, Handle);
hFunc(TEvHive::TEvAdoptTablet, Handle);
@@ -2609,6 +2627,96 @@ STFUNC(THive::StateWork) {
hFunc(TEvHive::TEvRequestTabletOwners, Handle);
hFunc(TEvHive::TEvTabletOwnersReply, Handle);
hFunc(TEvPrivate::TEvBalancerOut, Handle);
+ }
+}
+
+void THive::EnqueueIncomingEvent(STATEFN_SIG) {
+ EventQueue.EnqueueIncomingEvent(ev);
+ UpdateCounterEventQueueSize(+1);
+}
+
+STFUNC(THive::StateInit) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvInterconnect::TEvNodesInfo, Handle);
+ default:
+ StateInitImpl(ev, SelfId());
+ }
+}
+
+STFUNC(THive::StateWork) {
+ if (ResponsivenessPinger)
+ ResponsivenessPinger->OnAnyEvent();
+
+ switch (ev->GetTypeRewrite()) {
+ fFunc(TEvHive::TEvCreateTablet::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvAdoptTablet::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvStopTablet::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvBootTablet::EventType, EnqueueIncomingEvent);
+ fFunc(TEvLocal::TEvStatus::EventType, EnqueueIncomingEvent);
+ fFunc(TEvLocal::TEvTabletStatus::EventType, EnqueueIncomingEvent); // from bootqueue
+ fFunc(TEvLocal::TEvRegisterNode::EventType, EnqueueIncomingEvent); // from local
+ fFunc(TEvBlobStorage::TEvControllerSelectGroupsResult::EventType, EnqueueIncomingEvent);
+ fFunc(TEvents::TEvPoisonPill::EventType, EnqueueIncomingEvent);
+ fFunc(TEvTabletPipe::TEvClientConnected::EventType, EnqueueIncomingEvent);
+ fFunc(TEvTabletPipe::TEvClientDestroyed::EventType, EnqueueIncomingEvent);
+ fFunc(TEvTabletPipe::TEvServerConnected::EventType, EnqueueIncomingEvent);
+ fFunc(TEvTabletPipe::TEvServerDisconnected::EventType, EnqueueIncomingEvent);
+ fFunc(TEvPrivate::TEvBootTablets::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvInitMigration::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvQueryMigration::EventType, EnqueueIncomingEvent);
+ fFunc(TEvInterconnect::TEvNodeConnected::EventType, EnqueueIncomingEvent);
+ fFunc(TEvInterconnect::TEvNodeDisconnected::EventType, EnqueueIncomingEvent);
+ fFunc(TEvInterconnect::TEvNodeInfo::EventType, EnqueueIncomingEvent);
+ fFunc(TEvInterconnect::TEvNodesInfo::EventType, EnqueueIncomingEvent);
+ fFunc(TEvents::TEvUndelivered::EventType, EnqueueIncomingEvent);
+ fFunc(TEvPrivate::TEvProcessBootQueue::EventType, EnqueueIncomingEvent);
+ fFunc(TEvPrivate::TEvPostponeProcessBootQueue::EventType, EnqueueIncomingEvent);
+ fFunc(TEvPrivate::TEvProcessPendingOperations::EventType, EnqueueIncomingEvent);
+ fFunc(TEvPrivate::TEvProcessDisconnectNode::EventType, EnqueueIncomingEvent);
+ fFunc(TEvLocal::TEvSyncTablets::EventType, EnqueueIncomingEvent);
+ fFunc(TEvPrivate::TEvKickTablet::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvTabletMetrics::EventType, EnqueueIncomingEvent);
+ fFunc(TEvTabletBase::TEvBlockBlobStorageResult::EventType, EnqueueIncomingEvent);
+ fFunc(TEvTabletBase::TEvDeleteTabletResult::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvReassignTablet::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvInitiateBlockStorage::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvDeleteTablet::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvDeleteOwnerTablets::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvRequestHiveInfo::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvLookupTablet::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvLookupChannelInfo::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvCutTabletHistory::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvDrainNode::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvFillNode::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvInitiateDeleteStorage::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvGetTabletStorageInfo::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvLockTabletExecution::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvUnlockTabletExecution::EventType, EnqueueIncomingEvent);
+ fFunc(TEvPrivate::TEvProcessTabletBalancer::EventType, EnqueueIncomingEvent);
+ fFunc(TEvPrivate::TEvUnlockTabletReconnectTimeout::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvInitiateTabletExternalBoot::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvRequestHiveDomainStats::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvRequestHiveNodeStats::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvRequestHiveStorageStats::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvInvalidateStoragePools::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvReassignOnDecommitGroup::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvRequestTabletIdSequence::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvResponseTabletIdSequence::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvSeizeTablets::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvSeizeTabletsReply::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvReleaseTablets::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvReleaseTabletsReply::EventType, EnqueueIncomingEvent);
+ fFunc(TEvSubDomain::TEvConfigure::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvConfigureHive::EventType, EnqueueIncomingEvent);
+ fFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType, EnqueueIncomingEvent);
+ fFunc(NConsole::TEvConsole::TEvConfigNotificationRequest::EventType, EnqueueIncomingEvent);
+ fFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::EventType, EnqueueIncomingEvent);
+ fFunc(NSysView::TEvSysView::TEvGetTabletIdsRequest::EventType, EnqueueIncomingEvent);
+ fFunc(NSysView::TEvSysView::TEvGetTabletsRequest::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvRequestTabletOwners::EventType, EnqueueIncomingEvent);
+ fFunc(TEvHive::TEvTabletOwnersReply::EventType, EnqueueIncomingEvent);
+ fFunc(TEvPrivate::TEvBalancerOut::EventType, EnqueueIncomingEvent);
+ hFunc(TEvPrivate::TEvProcessIncomingEvent, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
BLOG_W("THive::StateWork unhandled event type: " << ev->GetTypeRewrite()
diff --git a/ydb/core/mind/hive/hive_impl.h b/ydb/core/mind/hive/hive_impl.h
index db6459a14aa..ab4f00e0049 100644
--- a/ydb/core/mind/hive/hive_impl.h
+++ b/ydb/core/mind/hive/hive_impl.h
@@ -27,6 +27,7 @@
#include <ydb/core/sys_view/common/events.h>
#include <ydb/core/cms/console/console.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
+#include <ydb/core/util/event_priority_queue.h>
#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -379,6 +380,7 @@ protected:
bool SpreadNeighbours = true; // spread tablets of the same object across cluster
TSequenceGenerator Sequencer;
TOwnershipKeeper Keeper;
+ TEventPriorityQueue<THive> EventQueue{*this};
struct TPendingCreateTablet {
NKikimrHive::TEvCreateTablet CreateTablet;
@@ -502,6 +504,7 @@ protected:
void Handle(NSysView::TEvSysView::TEvGetTabletsRequest::TPtr& ev);
void Handle(TEvHive::TEvRequestTabletOwners::TPtr& ev);
void Handle(TEvHive::TEvTabletOwnersReply::TPtr& ev);
+ void Handle(TEvPrivate::TEvProcessIncomingEvent::TPtr& ev);
protected:
void RestartPipeTx(ui64 tabletId);
@@ -585,6 +588,7 @@ public:
void UpdateCounterTabletsTotal(i64 tabletsTotalDiff);
void UpdateCounterTabletsAlive(i64 tabletsAliveDiff);
void UpdateCounterBootQueueSize(ui64 bootQueueSize);
+ void UpdateCounterEventQueueSize(i64 eventQueueSizeDiff);
bool DomainHasNodes(const TSubDomainKey &domainKey) const;
void ProcessBootQueue();
void ProcessWaitQueue();
@@ -621,6 +625,10 @@ public:
void UpdateTabletFollowersNumber(TLeaderTabletInfo& tablet, NIceDb::TNiceDb& db, TSideEffects& sideEffects);
TDuration GetBalancerCooldown() const;
+ ui32 GetEventPriority(IEventHandle* ev);
+ void PushProcessIncomingEvent();
+ void ProcessEvent(std::unique_ptr<NActors::IEventHandle> event);
+
TTabletMetricsAggregates DefaultResourceMetricsAggregates;
ui64 MetricsWindowSize = TDuration::Minutes(1).MilliSeconds();
const TTabletMetricsAggregates& GetDefaultResourceMetricsAggregates() const;
@@ -836,6 +844,7 @@ protected:
void InitDefaultChannelBind(TChannelBind& bind);
void RequestPoolsInformation();
void RequestFreeSequence();
+ void EnqueueIncomingEvent(STATEFN_SIG);
bool SeenDomain(TSubDomainKey domain);
void ResolveDomain(TSubDomainKey domain);
diff --git a/ydb/core/protos/counters_hive.proto b/ydb/core/protos/counters_hive.proto
index b60f7f61702..d0b0c3025d3 100644
--- a/ydb/core/protos/counters_hive.proto
+++ b/ydb/core/protos/counters_hive.proto
@@ -22,6 +22,7 @@ enum ESimpleCounters {
COUNTER_BALANCE_USAGE_MAX = 12 [(CounterOpts) = {Name: "BalanceUsageMax"}];
COUNTER_SEQUENCE_FREE = 13 [(CounterOpts) = {Name: "SequenceFree"}];
COUNTER_SEQUENCE_ALLOCATED = 14 [(CounterOpts) = {Name: "SequenceAllocated"}];
+ COUNTER_EVENTQUEUE_SIZE = 15 [(CounterOpts) = {Name: "EventQueueSize"}];
}
enum ECumulativeCounters {
diff --git a/ydb/core/util/event_priority_queue.h b/ydb/core/util/event_priority_queue.h
new file mode 100644
index 00000000000..eb12b5b27e8
--- /dev/null
+++ b/ydb/core/util/event_priority_queue.h
@@ -0,0 +1,73 @@
+#pragma once
+#include <library/cpp/actors/core/events.h>
+
+#include <concepts>
+#include <queue>
+
+namespace NKikimr {
+
+/*
+ * T is expected to provide the following methods:
+ *
+ * ui32 GetEventPriority(IEventHandle*);
+ * void PushProcessIncomingEvent();
+ * void PocessEvent(std::unique_ptr<IEventHandle>);
+ *
+ * This is not implemented as a concept, because the intended usage is storing a
+ * TEventPriorityQueue<TSomeActor> inside TSomeActor, and that would require
+ * checking the concept against incomplete type TSomeActor, and such checks
+ * are not allowed to use any members
+ */
+template<typename T>
+class TEventPriorityQueue {
+private:
+ class TInternalQueue {
+ private:
+ std::map<ui32, std::queue<std::unique_ptr<NActors::IEventHandle>>> Queue_;
+
+ public:
+ bool Empty() const {
+ return Queue_.empty();
+ }
+
+ void Push(ui32 priority, std::unique_ptr<NActors::IEventHandle> ev) {
+ Queue_[priority].push(std::move(ev));
+ }
+
+ std::unique_ptr<NActors::IEventHandle> Pop() {
+ Y_VERIFY(!Empty());
+ auto it = Queue_.begin();
+ auto& miniQueue = it->second;
+ Y_VERIFY(!miniQueue.empty());
+ auto ev = std::move(miniQueue.front());
+ miniQueue.pop();
+ if (miniQueue.empty()) {
+ Queue_.erase(it);
+ }
+ return ev;
+ }
+ };
+
+ TInternalQueue EventQueue_;
+ T& Actor_;
+
+public:
+ explicit TEventPriorityQueue(T& actor) : Actor_(actor) {}
+
+ void EnqueueIncomingEvent(STATEFN_SIG) {
+ const auto priority = Actor_.GetEventPriority(ev.Get());
+ if (EventQueue_.Empty()) {
+ Actor_.PushProcessIncomingEvent();
+ }
+ EventQueue_.Push(priority, std::unique_ptr<NActors::IEventHandle>(ev.Release()));
+ }
+
+ void ProcessIncomingEvent() {
+ auto ev = EventQueue_.Pop();
+ Actor_.ProcessEvent(std::move(ev));
+ if (!EventQueue_.Empty()) {
+ Actor_.PushProcessIncomingEvent();
+ }
+ }
+};
+} // NKikimr
diff --git a/ydb/core/util/event_priority_queue_ut.cpp b/ydb/core/util/event_priority_queue_ut.cpp
new file mode 100644
index 00000000000..931c0298533
--- /dev/null
+++ b/ydb/core/util/event_priority_queue_ut.cpp
@@ -0,0 +1,81 @@
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include "event_priority_queue.h"
+
+namespace NKikimr {
+
+static constexpr ui32 EvPriorityTest = 0;
+
+struct TEvPriorityTest : NActors::TEventLocal<TEvPriorityTest, EvPriorityTest> {
+ ui32 Priority;
+
+ TEvPriorityTest(ui32 priority) : Priority(priority) {
+ }
+};
+
+class PriorityTester {
+private:
+ size_t EventsToProcess_ = 0;
+ std::vector<ui32> ProcessedPriorities_;
+
+public:
+ ui32 GetEventPriority(NActors::IEventHandle* ev) {
+ return ev->Get<TEvPriorityTest>()->Priority;
+ }
+
+ void PushProcessIncomingEvent() {
+ ++EventsToProcess_;
+ }
+
+ void ProcessEvent(std::unique_ptr<NActors::IEventHandle> ev) {
+ switch(ev->GetTypeRewrite()) {
+ hFunc(TEvPriorityTest, Handle);
+ default:
+ Y_FAIL("Unexpected event type");
+ }
+ }
+
+ bool AllEventsDone() {
+ return EventsToProcess_ == 0;
+ }
+
+ const std::vector<ui32>& GetProcessedPriorities() {
+ return ProcessedPriorities_;
+ }
+
+ void Handle(TEvPriorityTest::TPtr& ev) {
+ ProcessedPriorities_.push_back(ev->Get()->Priority);
+ Y_VERIFY(EventsToProcess_ > 0);
+ --EventsToProcess_;
+ }
+
+ TEventPriorityQueue<PriorityTester> Queue{*this};
+};
+
+Y_UNIT_TEST_SUITE(TEventPriorityQueueTest) {
+ Y_UNIT_TEST(TestPriority) {
+ PriorityTester tester;
+ static constexpr size_t NUM_EVENTS = 100;
+ std::vector<ui32> priorities;
+ priorities.reserve(NUM_EVENTS);
+ for (size_t i = 0; i < NUM_EVENTS; i++) {
+ ui32 priority = (i * i) % 5;
+ auto ev = new TEvPriorityTest(priority);
+ auto handle = TAutoPtr(new NActors::IEventHandle({}, {}, ev));
+ tester.Queue.EnqueueIncomingEvent(handle);
+ priorities.push_back(priority);
+ }
+
+ while(!tester.AllEventsDone()) {
+ tester.Queue.ProcessIncomingEvent();
+ }
+
+ std::sort(priorities.begin(), priorities.end());
+ UNIT_ASSERT_VALUES_EQUAL(tester.GetProcessedPriorities(), priorities);
+ }
+};
+
+} // NKikimr
diff --git a/ydb/core/util/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/util/ut/CMakeLists.darwin-x86_64.txt
index 6bd513ff2d9..1356cf5497c 100644
--- a/ydb/core/util/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/util/ut/CMakeLists.darwin-x86_64.txt
@@ -36,6 +36,7 @@ target_sources(ydb-core-util-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/util/cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/circular_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/util/event_priority_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fragmented_buffer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/hazard_ut.cpp
diff --git a/ydb/core/util/ut/CMakeLists.linux-aarch64.txt b/ydb/core/util/ut/CMakeLists.linux-aarch64.txt
index 0ef26e5ac78..ac5e35a9043 100644
--- a/ydb/core/util/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/util/ut/CMakeLists.linux-aarch64.txt
@@ -39,6 +39,7 @@ target_sources(ydb-core-util-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/util/cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/circular_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/util/event_priority_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fragmented_buffer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/hazard_ut.cpp
diff --git a/ydb/core/util/ut/CMakeLists.linux-x86_64.txt b/ydb/core/util/ut/CMakeLists.linux-x86_64.txt
index b9fe35abc05..79ab4968846 100644
--- a/ydb/core/util/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/util/ut/CMakeLists.linux-x86_64.txt
@@ -40,6 +40,7 @@ target_sources(ydb-core-util-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/util/cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/circular_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/util/event_priority_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fragmented_buffer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/hazard_ut.cpp
diff --git a/ydb/core/util/ut/CMakeLists.windows-x86_64.txt b/ydb/core/util/ut/CMakeLists.windows-x86_64.txt
index 0cf0e8298d4..68bfb701a3f 100644
--- a/ydb/core/util/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/util/ut/CMakeLists.windows-x86_64.txt
@@ -29,6 +29,7 @@ target_sources(ydb-core-util-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/util/cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/circular_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/util/event_priority_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fragmented_buffer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/hazard_ut.cpp
diff --git a/ydb/core/util/ut/ya.make b/ydb/core/util/ut/ya.make
index d4982c7bb6a..f8a9e217d38 100644
--- a/ydb/core/util/ut/ya.make
+++ b/ydb/core/util/ut/ya.make
@@ -25,6 +25,7 @@ SRCS(
cache_ut.cpp
circular_queue_ut.cpp
concurrent_rw_hash_ut.cpp
+ event_priority_queue_ut.cpp
fast_tls_ut.cpp
fragmented_buffer_ut.cpp
hazard_ut.cpp
diff --git a/ydb/core/util/ya.make b/ydb/core/util/ya.make
index aa26a5ddcef..9cc9ec248e9 100644
--- a/ydb/core/util/ya.make
+++ b/ydb/core/util/ya.make
@@ -12,6 +12,7 @@ SRCS(
console.h
counted_leaky_bucket.h
defs.h
+ event_priority_queue.h
failure_injection.cpp
failure_injection.h
fast_tls.cpp