diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-03-25 00:51:45 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-03-25 00:51:45 +0000 |
commit | 1e9023462e1ddd414924722bd2b37f05787a287a (patch) | |
tree | 3f7f5d8e6e2414d595d383553c27736e00e428a6 /library/cpp | |
parent | b10a177dfbd54929bd4e0bf1f34eca1bb846412b (diff) | |
parent | ed45bc6ca0a1102bde32e57fcf2255630a0866cc (diff) | |
download | ydb-1e9023462e1ddd414924722bd2b37f05787a287a.tar.gz |
Merge branch 'rightlib' into merge-libs-250325-0050
Diffstat (limited to 'library/cpp')
3 files changed, 169 insertions, 64 deletions
diff --git a/library/cpp/threading/task_scheduler/task_scheduler.cpp b/library/cpp/threading/task_scheduler/task_scheduler.cpp index 174dde4bf7..b1d90d24db 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler.cpp +++ b/library/cpp/threading/task_scheduler/task_scheduler.cpp @@ -3,11 +3,38 @@ #include <util/system/thread.h> #include <util/string/cast.h> #include <util/stream/output.h> +#include <util/generic/yexception.h> TTaskScheduler::ITask::~ITask() {} TTaskScheduler::IRepeatedTask::~IRepeatedTask() {} +class TFunctionTask final : public TTaskScheduler::ITask { +public: + explicit TFunctionTask(std::function<TInstant()> function) + : Function_{std::move(function)} + {} + + TInstant Process() override { + return Function_(); + } +private: + std::function<TInstant()> Function_; +}; + +class TRepeatedFunctionTask final : public TTaskScheduler::IRepeatedTask { +public: + explicit TRepeatedFunctionTask(std::function<bool()> function) + : Function_{std::move(function)} + {} + + bool Process() override { + return Function_(); + } + +private: + std::function<bool()> Function_; +}; class TTaskScheduler::TWorkerThread : public ISimpleThread @@ -147,6 +174,35 @@ bool TTaskScheduler::Add(IRepeatedTaskRef task, TDuration period) { return Add(t, deadline); } +static void ThrowOnTaskLimitReached(bool sucessfullyScheduled) { + if (!sucessfullyScheduled) { + throw TTaskScheduler::TTaskSchedulerTaskLimitReached{} << "Failed to schedule task"; + } +} + +bool TTaskScheduler::AddFunc(std::function<TInstant()> function, TInstant expire) { + return Add(MakeIntrusive<TFunctionTask>(std::move(function)), expire); +} + +bool TTaskScheduler::AddRepeatedFunc(std::function<bool()> function, TDuration period) { + return Add(MakeIntrusive<TRepeatedFunctionTask>(std::move(function)), period); +} + +void TTaskScheduler::SafeAdd(ITaskRef task, TInstant expire) { + ThrowOnTaskLimitReached(Add(std::move(task), expire)); +} + +void TTaskScheduler::SafeAdd(IRepeatedTaskRef task, TDuration period) { + ThrowOnTaskLimitReached(Add(std::move(task), period)); +} + +void TTaskScheduler::SafeAddFunc(std::function<TInstant()> function, TInstant expire) { + ThrowOnTaskLimitReached(AddFunc(std::move(function), expire)); +} + +void TTaskScheduler::SafeAddRepeatedFunc(std::function<bool()> function, TDuration period) { + ThrowOnTaskLimitReached(AddRepeatedFunc(std::move(function), period)); +} const bool debugOutput = false; diff --git a/library/cpp/threading/task_scheduler/task_scheduler.h b/library/cpp/threading/task_scheduler/task_scheduler.h index cff057ae43..4a65ea54ca 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler.h +++ b/library/cpp/threading/task_scheduler/task_scheduler.h @@ -11,6 +11,8 @@ #include <util/system/condvar.h> #include <util/system/mutex.h> +#include <functional> + class TTaskScheduler { public: class ITask; @@ -18,6 +20,9 @@ public: class IRepeatedTask; using IRepeatedTaskRef = TIntrusivePtr<IRepeatedTask>; + + class TTaskSchedulerTaskLimitReached: public yexception {}; + public: explicit TTaskScheduler(size_t threadCount = 1, size_t maxTaskCount = Max<size_t>()); ~TTaskScheduler(); @@ -27,8 +32,17 @@ public: bool Add(ITaskRef task, TInstant expire); bool Add(IRepeatedTaskRef task, TDuration period); + [[nodiscard]] bool AddFunc(std::function<TInstant()> function, TInstant expire); + [[nodiscard]] bool AddRepeatedFunc(std::function<bool()> function, TDuration period); + + // Safe versions that throw yexception when task limit is reached + void SafeAdd(ITaskRef task, TInstant expire); + void SafeAdd(IRepeatedTaskRef task, TDuration period); + void SafeAddFunc(std::function<TInstant()> function, TInstant expire); + void SafeAddRepeatedFunc(std::function<bool()> function, TDuration period); size_t GetTaskCount() const; + private: class TWorkerThread; @@ -44,12 +58,14 @@ private: using TQueueType = TMultiMap<TInstant, TTaskHolder>; using TQueueIterator = TQueueType::iterator; + private: void ChangeDebugState(TWorkerThread* thread, const TString& state); void ChooseFromQueue(TQueueIterator& toWait); bool Wait(TWorkerThread* thread, TQueueIterator& toWait); void WorkerFunc(TWorkerThread* thread); + private: bool IsStopped_ = false; diff --git a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp index 637f5d6285..7761aca398 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp +++ b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp @@ -1,86 +1,119 @@ -#include <algorithm> -#include <library/cpp/testing/unittest/registar.h> +#include "task_scheduler.h" -#include <util/stream/output.h> #include <library/cpp/deprecated/atomic/atomic.h> +#include <library/cpp/testing/unittest/registar.h> + #include <util/generic/vector.h> +#include <util/stream/output.h> +#include <util/system/thread.h> + +Y_UNIT_TEST_SUITE(TaskSchedulerTest) { + class TCheckTask: public TTaskScheduler::IRepeatedTask { + public: + TCheckTask(const TDuration& delay) + : Start_(Now()) + , Delay_(delay) + { + AtomicIncrement(ScheduledTaskCounter_); + } + + ~TCheckTask() override { + } + + bool Process() override { + const TDuration delay = Now() - Start_; + + if (delay < Delay_) { + AtomicIncrement(BadTimeoutCounter_); + } -#include "task_scheduler.h" + AtomicIncrement(ExecutedTaskCounter_); -class TTaskSchedulerTest: public TTestBase { - UNIT_TEST_SUITE(TTaskSchedulerTest); - UNIT_TEST(Test); - UNIT_TEST_SUITE_END(); - - class TCheckTask: public TTaskScheduler::IRepeatedTask { - public: - TCheckTask(const TDuration& delay) - : Start_(Now()) - , Delay_(delay) - { - AtomicIncrement(ScheduledTaskCounter_); - } + return false; + } - ~TCheckTask() override { - } + static bool AllTaskExecuted() { + return AtomicGet(ScheduledTaskCounter_) == AtomicGet(ExecutedTaskCounter_); + } - bool Process() override { - const TDuration delay = Now() - Start_; + static size_t BadTimeoutCount() { + return AtomicGet(BadTimeoutCounter_); + } - if (delay < Delay_) { - AtomicIncrement(BadTimeoutCounter_); - } + private: + TInstant Start_; + TDuration Delay_; + static inline TAtomic BadTimeoutCounter_ = 0; + static inline TAtomic ScheduledTaskCounter_ = 0; + static inline TAtomic ExecutedTaskCounter_ = 0; + }; - AtomicIncrement(ExecutedTaskCounter_); + void ScheduleCheckTask(TTaskScheduler& scheduler, size_t delay) { + TDuration d = TDuration::MicroSeconds(delay); - return false; - } + scheduler.Add(new TCheckTask(d), d); + } - static bool AllTaskExecuted() { - return AtomicGet(ScheduledTaskCounter_) == AtomicGet(ExecutedTaskCounter_); - } + Y_UNIT_TEST(RepeatedTasks) { + TTaskScheduler scheduler; - static size_t BadTimeoutCount() { - return AtomicGet(BadTimeoutCounter_); - } + ScheduleCheckTask(scheduler, 200); + ScheduleCheckTask(scheduler, 100); + ScheduleCheckTask(scheduler, 1000); + ScheduleCheckTask(scheduler, 10000); + ScheduleCheckTask(scheduler, 5000); + + scheduler.Start(); + + usleep(1000000); + + UNIT_ASSERT_EQUAL(TCheckTask::BadTimeoutCount(), 0); + UNIT_ASSERT(TCheckTask::AllTaskExecuted()); + } + + Y_UNIT_TEST(FunctionWrappers) { + TTaskScheduler scheduler; - private: - TInstant Start_; - TDuration Delay_; - static TAtomic BadTimeoutCounter_; - static TAtomic ScheduledTaskCounter_; - static TAtomic ExecutedTaskCounter_; - }; + std::atomic<size_t> oneshotCount = 0; + std::atomic<size_t> repeatedCount = 0; - public: - inline void Test() { - ScheduleCheckTask(200); - ScheduleCheckTask(100); - ScheduleCheckTask(1000); - ScheduleCheckTask(10000); - ScheduleCheckTask(5000); + scheduler.SafeAddFunc([&, now = Now()]() { + Y_ABORT_UNLESS(Now() - now < TDuration::MilliSeconds(300)); + if (oneshotCount.fetch_add(1) == 0) { + return Now() + TDuration::MilliSeconds(100); + } else { + return TInstant::Max(); + } + }, Now() + TDuration::MilliSeconds(100)); - Scheduler_.Start(); + scheduler.SafeAddRepeatedFunc([&repeatedCount, now = Now()]() mutable -> bool { + TDuration delta = Now() - now; + Y_ABORT_UNLESS(delta > TDuration::MilliSeconds(50)); + Y_ABORT_UNLESS(delta < TDuration::MilliSeconds(150)); + now += delta; + return repeatedCount.fetch_add(1) < 3; + }, TDuration::MilliSeconds(100)); - usleep(1000000); + scheduler.Start(); - UNIT_ASSERT_EQUAL(TCheckTask::BadTimeoutCount(), 0); - UNIT_ASSERT(TCheckTask::AllTaskExecuted()); - } + Sleep(TDuration::Seconds(2)); - private: - void ScheduleCheckTask(size_t delay) { - TDuration d = TDuration::MicroSeconds(delay); + UNIT_ASSERT_EQUAL(oneshotCount.load(), 2); + UNIT_ASSERT_EQUAL(repeatedCount.load(), 4); - Scheduler_.Add(new TCheckTask(d), d); - } + scheduler.Stop(); + } - private: - TTaskScheduler Scheduler_; -}; + Y_UNIT_TEST(TaskLimit) { + TTaskScheduler scheduler{1, 2}; + scheduler.Start(); -TAtomic TTaskSchedulerTest::TCheckTask::BadTimeoutCounter_ = 0; -TAtomic TTaskSchedulerTest::TCheckTask::ScheduledTaskCounter_ = 0; -TAtomic TTaskSchedulerTest::TCheckTask::ExecutedTaskCounter_ = 0; + auto function = [] { return TInstant::Max(); }; + TInstant expire = Now() + TDuration::MilliSeconds(100); -UNIT_TEST_SUITE_REGISTRATION(TTaskSchedulerTest); + UNIT_ASSERT(scheduler.AddFunc(function, expire)); + UNIT_ASSERT_NO_EXCEPTION(scheduler.SafeAddFunc(function, expire)); + UNIT_ASSERT(!scheduler.AddFunc(function, expire)); + UNIT_ASSERT_EXCEPTION(scheduler.SafeAddFunc(function, expire), TTaskScheduler::TTaskSchedulerTaskLimitReached); + } +} |