aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-03-25 00:51:45 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-03-25 00:51:45 +0000
commit1e9023462e1ddd414924722bd2b37f05787a287a (patch)
tree3f7f5d8e6e2414d595d383553c27736e00e428a6 /library/cpp
parentb10a177dfbd54929bd4e0bf1f34eca1bb846412b (diff)
parented45bc6ca0a1102bde32e57fcf2255630a0866cc (diff)
downloadydb-1e9023462e1ddd414924722bd2b37f05787a287a.tar.gz
Merge branch 'rightlib' into merge-libs-250325-0050
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/threading/task_scheduler/task_scheduler.cpp56
-rw-r--r--library/cpp/threading/task_scheduler/task_scheduler.h16
-rw-r--r--library/cpp/threading/task_scheduler/task_scheduler_ut.cpp161
3 files changed, 169 insertions, 64 deletions
diff --git a/library/cpp/threading/task_scheduler/task_scheduler.cpp b/library/cpp/threading/task_scheduler/task_scheduler.cpp
index 174dde4bf7..b1d90d24db 100644
--- a/library/cpp/threading/task_scheduler/task_scheduler.cpp
+++ b/library/cpp/threading/task_scheduler/task_scheduler.cpp
@@ -3,11 +3,38 @@
#include <util/system/thread.h>
#include <util/string/cast.h>
#include <util/stream/output.h>
+#include <util/generic/yexception.h>
TTaskScheduler::ITask::~ITask() {}
TTaskScheduler::IRepeatedTask::~IRepeatedTask() {}
+class TFunctionTask final : public TTaskScheduler::ITask {
+public:
+ explicit TFunctionTask(std::function<TInstant()> function)
+ : Function_{std::move(function)}
+ {}
+
+ TInstant Process() override {
+ return Function_();
+ }
+private:
+ std::function<TInstant()> Function_;
+};
+
+class TRepeatedFunctionTask final : public TTaskScheduler::IRepeatedTask {
+public:
+ explicit TRepeatedFunctionTask(std::function<bool()> function)
+ : Function_{std::move(function)}
+ {}
+
+ bool Process() override {
+ return Function_();
+ }
+
+private:
+ std::function<bool()> Function_;
+};
class TTaskScheduler::TWorkerThread
: public ISimpleThread
@@ -147,6 +174,35 @@ bool TTaskScheduler::Add(IRepeatedTaskRef task, TDuration period) {
return Add(t, deadline);
}
+static void ThrowOnTaskLimitReached(bool sucessfullyScheduled) {
+ if (!sucessfullyScheduled) {
+ throw TTaskScheduler::TTaskSchedulerTaskLimitReached{} << "Failed to schedule task";
+ }
+}
+
+bool TTaskScheduler::AddFunc(std::function<TInstant()> function, TInstant expire) {
+ return Add(MakeIntrusive<TFunctionTask>(std::move(function)), expire);
+}
+
+bool TTaskScheduler::AddRepeatedFunc(std::function<bool()> function, TDuration period) {
+ return Add(MakeIntrusive<TRepeatedFunctionTask>(std::move(function)), period);
+}
+
+void TTaskScheduler::SafeAdd(ITaskRef task, TInstant expire) {
+ ThrowOnTaskLimitReached(Add(std::move(task), expire));
+}
+
+void TTaskScheduler::SafeAdd(IRepeatedTaskRef task, TDuration period) {
+ ThrowOnTaskLimitReached(Add(std::move(task), period));
+}
+
+void TTaskScheduler::SafeAddFunc(std::function<TInstant()> function, TInstant expire) {
+ ThrowOnTaskLimitReached(AddFunc(std::move(function), expire));
+}
+
+void TTaskScheduler::SafeAddRepeatedFunc(std::function<bool()> function, TDuration period) {
+ ThrowOnTaskLimitReached(AddRepeatedFunc(std::move(function), period));
+}
const bool debugOutput = false;
diff --git a/library/cpp/threading/task_scheduler/task_scheduler.h b/library/cpp/threading/task_scheduler/task_scheduler.h
index cff057ae43..4a65ea54ca 100644
--- a/library/cpp/threading/task_scheduler/task_scheduler.h
+++ b/library/cpp/threading/task_scheduler/task_scheduler.h
@@ -11,6 +11,8 @@
#include <util/system/condvar.h>
#include <util/system/mutex.h>
+#include <functional>
+
class TTaskScheduler {
public:
class ITask;
@@ -18,6 +20,9 @@ public:
class IRepeatedTask;
using IRepeatedTaskRef = TIntrusivePtr<IRepeatedTask>;
+
+ class TTaskSchedulerTaskLimitReached: public yexception {};
+
public:
explicit TTaskScheduler(size_t threadCount = 1, size_t maxTaskCount = Max<size_t>());
~TTaskScheduler();
@@ -27,8 +32,17 @@ public:
bool Add(ITaskRef task, TInstant expire);
bool Add(IRepeatedTaskRef task, TDuration period);
+ [[nodiscard]] bool AddFunc(std::function<TInstant()> function, TInstant expire);
+ [[nodiscard]] bool AddRepeatedFunc(std::function<bool()> function, TDuration period);
+
+ // Safe versions that throw yexception when task limit is reached
+ void SafeAdd(ITaskRef task, TInstant expire);
+ void SafeAdd(IRepeatedTaskRef task, TDuration period);
+ void SafeAddFunc(std::function<TInstant()> function, TInstant expire);
+ void SafeAddRepeatedFunc(std::function<bool()> function, TDuration period);
size_t GetTaskCount() const;
+
private:
class TWorkerThread;
@@ -44,12 +58,14 @@ private:
using TQueueType = TMultiMap<TInstant, TTaskHolder>;
using TQueueIterator = TQueueType::iterator;
+
private:
void ChangeDebugState(TWorkerThread* thread, const TString& state);
void ChooseFromQueue(TQueueIterator& toWait);
bool Wait(TWorkerThread* thread, TQueueIterator& toWait);
void WorkerFunc(TWorkerThread* thread);
+
private:
bool IsStopped_ = false;
diff --git a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp
index 637f5d6285..7761aca398 100644
--- a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp
+++ b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp
@@ -1,86 +1,119 @@
-#include <algorithm>
-#include <library/cpp/testing/unittest/registar.h>
+#include "task_scheduler.h"
-#include <util/stream/output.h>
#include <library/cpp/deprecated/atomic/atomic.h>
+#include <library/cpp/testing/unittest/registar.h>
+
#include <util/generic/vector.h>
+#include <util/stream/output.h>
+#include <util/system/thread.h>
+
+Y_UNIT_TEST_SUITE(TaskSchedulerTest) {
+ class TCheckTask: public TTaskScheduler::IRepeatedTask {
+ public:
+ TCheckTask(const TDuration& delay)
+ : Start_(Now())
+ , Delay_(delay)
+ {
+ AtomicIncrement(ScheduledTaskCounter_);
+ }
+
+ ~TCheckTask() override {
+ }
+
+ bool Process() override {
+ const TDuration delay = Now() - Start_;
+
+ if (delay < Delay_) {
+ AtomicIncrement(BadTimeoutCounter_);
+ }
-#include "task_scheduler.h"
+ AtomicIncrement(ExecutedTaskCounter_);
-class TTaskSchedulerTest: public TTestBase {
- UNIT_TEST_SUITE(TTaskSchedulerTest);
- UNIT_TEST(Test);
- UNIT_TEST_SUITE_END();
-
- class TCheckTask: public TTaskScheduler::IRepeatedTask {
- public:
- TCheckTask(const TDuration& delay)
- : Start_(Now())
- , Delay_(delay)
- {
- AtomicIncrement(ScheduledTaskCounter_);
- }
+ return false;
+ }
- ~TCheckTask() override {
- }
+ static bool AllTaskExecuted() {
+ return AtomicGet(ScheduledTaskCounter_) == AtomicGet(ExecutedTaskCounter_);
+ }
- bool Process() override {
- const TDuration delay = Now() - Start_;
+ static size_t BadTimeoutCount() {
+ return AtomicGet(BadTimeoutCounter_);
+ }
- if (delay < Delay_) {
- AtomicIncrement(BadTimeoutCounter_);
- }
+ private:
+ TInstant Start_;
+ TDuration Delay_;
+ static inline TAtomic BadTimeoutCounter_ = 0;
+ static inline TAtomic ScheduledTaskCounter_ = 0;
+ static inline TAtomic ExecutedTaskCounter_ = 0;
+ };
- AtomicIncrement(ExecutedTaskCounter_);
+ void ScheduleCheckTask(TTaskScheduler& scheduler, size_t delay) {
+ TDuration d = TDuration::MicroSeconds(delay);
- return false;
- }
+ scheduler.Add(new TCheckTask(d), d);
+ }
- static bool AllTaskExecuted() {
- return AtomicGet(ScheduledTaskCounter_) == AtomicGet(ExecutedTaskCounter_);
- }
+ Y_UNIT_TEST(RepeatedTasks) {
+ TTaskScheduler scheduler;
- static size_t BadTimeoutCount() {
- return AtomicGet(BadTimeoutCounter_);
- }
+ ScheduleCheckTask(scheduler, 200);
+ ScheduleCheckTask(scheduler, 100);
+ ScheduleCheckTask(scheduler, 1000);
+ ScheduleCheckTask(scheduler, 10000);
+ ScheduleCheckTask(scheduler, 5000);
+
+ scheduler.Start();
+
+ usleep(1000000);
+
+ UNIT_ASSERT_EQUAL(TCheckTask::BadTimeoutCount(), 0);
+ UNIT_ASSERT(TCheckTask::AllTaskExecuted());
+ }
+
+ Y_UNIT_TEST(FunctionWrappers) {
+ TTaskScheduler scheduler;
- private:
- TInstant Start_;
- TDuration Delay_;
- static TAtomic BadTimeoutCounter_;
- static TAtomic ScheduledTaskCounter_;
- static TAtomic ExecutedTaskCounter_;
- };
+ std::atomic<size_t> oneshotCount = 0;
+ std::atomic<size_t> repeatedCount = 0;
- public:
- inline void Test() {
- ScheduleCheckTask(200);
- ScheduleCheckTask(100);
- ScheduleCheckTask(1000);
- ScheduleCheckTask(10000);
- ScheduleCheckTask(5000);
+ scheduler.SafeAddFunc([&, now = Now()]() {
+ Y_ABORT_UNLESS(Now() - now < TDuration::MilliSeconds(300));
+ if (oneshotCount.fetch_add(1) == 0) {
+ return Now() + TDuration::MilliSeconds(100);
+ } else {
+ return TInstant::Max();
+ }
+ }, Now() + TDuration::MilliSeconds(100));
- Scheduler_.Start();
+ scheduler.SafeAddRepeatedFunc([&repeatedCount, now = Now()]() mutable -> bool {
+ TDuration delta = Now() - now;
+ Y_ABORT_UNLESS(delta > TDuration::MilliSeconds(50));
+ Y_ABORT_UNLESS(delta < TDuration::MilliSeconds(150));
+ now += delta;
+ return repeatedCount.fetch_add(1) < 3;
+ }, TDuration::MilliSeconds(100));
- usleep(1000000);
+ scheduler.Start();
- UNIT_ASSERT_EQUAL(TCheckTask::BadTimeoutCount(), 0);
- UNIT_ASSERT(TCheckTask::AllTaskExecuted());
- }
+ Sleep(TDuration::Seconds(2));
- private:
- void ScheduleCheckTask(size_t delay) {
- TDuration d = TDuration::MicroSeconds(delay);
+ UNIT_ASSERT_EQUAL(oneshotCount.load(), 2);
+ UNIT_ASSERT_EQUAL(repeatedCount.load(), 4);
- Scheduler_.Add(new TCheckTask(d), d);
- }
+ scheduler.Stop();
+ }
- private:
- TTaskScheduler Scheduler_;
-};
+ Y_UNIT_TEST(TaskLimit) {
+ TTaskScheduler scheduler{1, 2};
+ scheduler.Start();
-TAtomic TTaskSchedulerTest::TCheckTask::BadTimeoutCounter_ = 0;
-TAtomic TTaskSchedulerTest::TCheckTask::ScheduledTaskCounter_ = 0;
-TAtomic TTaskSchedulerTest::TCheckTask::ExecutedTaskCounter_ = 0;
+ auto function = [] { return TInstant::Max(); };
+ TInstant expire = Now() + TDuration::MilliSeconds(100);
-UNIT_TEST_SUITE_REGISTRATION(TTaskSchedulerTest);
+ UNIT_ASSERT(scheduler.AddFunc(function, expire));
+ UNIT_ASSERT_NO_EXCEPTION(scheduler.SafeAddFunc(function, expire));
+ UNIT_ASSERT(!scheduler.AddFunc(function, expire));
+ UNIT_ASSERT_EXCEPTION(scheduler.SafeAddFunc(function, expire), TTaskScheduler::TTaskSchedulerTaskLimitReached);
+ }
+}