diff options
| author | nga <[email protected]> | 2022-02-10 16:48:09 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:48:09 +0300 | 
| commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
| tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/scheduler | |
| parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/scheduler')
| -rw-r--r-- | library/cpp/messagebus/scheduler/scheduler.cpp | 128 | ||||
| -rw-r--r-- | library/cpp/messagebus/scheduler/scheduler.h | 12 | ||||
| -rw-r--r-- | library/cpp/messagebus/scheduler/scheduler_ut.cpp | 58 | ||||
| -rw-r--r-- | library/cpp/messagebus/scheduler/ya.make | 16 | 
4 files changed, 107 insertions, 107 deletions
diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp index 5a5fe528943..8c966da86d0 100644 --- a/library/cpp/messagebus/scheduler/scheduler.cpp +++ b/library/cpp/messagebus/scheduler/scheduler.cpp @@ -5,19 +5,19 @@  #include <util/generic/yexception.h>  //#include "dummy_debugger.h" - -using namespace NBus; -using namespace NBus::NPrivate; +  +using namespace NBus;  +using namespace NBus::NPrivate;   class TScheduleDeadlineCompare {  public:      bool operator()(const IScheduleItemAutoPtr& i1, const IScheduleItemAutoPtr& i2) const noexcept { -        return i1->GetScheduleTime() > i2->GetScheduleTime(); +        return i1->GetScheduleTime() > i2->GetScheduleTime();       }  };  TScheduler::TScheduler() -    : StopThread(false) +    : StopThread(false)       , Thread([&] { this->SchedulerThread(); })  {  } @@ -32,13 +32,13 @@ size_t TScheduler::Size() const {  }  void TScheduler::Stop() { -    { -        TGuard<TLock> guard(Lock); +    {  +        TGuard<TLock> guard(Lock);           Y_VERIFY(!StopThread, "Scheduler already stopped"); -        StopThread = true; -        CondVar.Signal(); -    } -    Thread.Get(); +        StopThread = true;  +        CondVar.Signal();  +    }  +    Thread.Get();       if (!!NextItem) {          NextItem.Destroy(); @@ -50,70 +50,70 @@ void TScheduler::Stop() {  }  void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) { -    TGuard<TLock> lock(Lock); -    if (StopThread) +    TGuard<TLock> lock(Lock);  +    if (StopThread)           return; - -    if (!!NextItem) { -        if (i->GetScheduleTime() < NextItem->GetScheduleTime()) { -            DoSwap(i, NextItem); -        } +  +    if (!!NextItem) {  +        if (i->GetScheduleTime() < NextItem->GetScheduleTime()) {  +            DoSwap(i, NextItem);  +        }       } - +       Items.push_back(i); -    PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); - -    FillNextItem(); - -    CondVar.Signal(); -} - -void TScheduler::FillNextItem() { -    if (!NextItem && !Items.empty()) { -        PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); -        NextItem = Items.back(); -        Items.erase(Items.end() - 1); -    } +    PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());  +  +    FillNextItem();  +  +    CondVar.Signal();   } +void TScheduler::FillNextItem() {  +    if (!NextItem && !Items.empty()) {  +        PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());  +        NextItem = Items.back();  +        Items.erase(Items.end() - 1);  +    }  +}  +   void TScheduler::SchedulerThread() { -    for (;;) { -        IScheduleItemAutoPtr current; - +    for (;;) {  +        IScheduleItemAutoPtr current;  +           { -            TGuard<TLock> guard(Lock); - -            if (StopThread) { -                break; -            } - -            if (!!NextItem) { -                CondVar.WaitD(Lock, NextItem->GetScheduleTime()); -            } else { -                CondVar.WaitI(Lock); +            TGuard<TLock> guard(Lock);  +  +            if (StopThread) {  +                break;               } - -            if (StopThread) { -                break; +  +            if (!!NextItem) {  +                CondVar.WaitD(Lock, NextItem->GetScheduleTime());  +            } else {  +                CondVar.WaitI(Lock);               } - -            // signal comes if either scheduler is to be stopped of there's work to do +  +            if (StopThread) {  +                break;  +            }  +  +            // signal comes if either scheduler is to be stopped of there's work to do               Y_VERIFY(!!NextItem, "state check"); - -            if (TInstant::Now() < NextItem->GetScheduleTime()) { -                // NextItem is updated since WaitD -                continue; -            } - +  +            if (TInstant::Now() < NextItem->GetScheduleTime()) {  +                // NextItem is updated since WaitD  +                continue;  +            }  +               current = NextItem.Release();          } - -        current->Do(); -        current.Destroy(); - -        { -            TGuard<TLock> guard(Lock); -            FillNextItem(); -        } +  +        current->Do();  +        current.Destroy();  +  +        {  +            TGuard<TLock> guard(Lock);  +            FillNextItem();  +        }       }  } diff --git a/library/cpp/messagebus/scheduler/scheduler.h b/library/cpp/messagebus/scheduler/scheduler.h index afcc0de55d5..6114c3cc88b 100644 --- a/library/cpp/messagebus/scheduler/scheduler.h +++ b/library/cpp/messagebus/scheduler/scheduler.h @@ -8,7 +8,7 @@  #include <util/generic/vector.h>  #include <util/system/atomic.h>  #include <util/system/condvar.h> -#include <util/system/mutex.h> +#include <util/system/mutex.h>   #include <util/system/thread.h>  namespace NBus { @@ -18,10 +18,10 @@ namespace NBus {              inline IScheduleItem(TInstant scheduleTime) noexcept;              virtual ~IScheduleItem() {              } - +               virtual void Do() = 0;              inline TInstant GetScheduleTime() const noexcept; - +           private:              TInstant ScheduleTime;          }; @@ -50,16 +50,16 @@ namespace NBus {              TCondVar CondVar;              TObjectCounter<TScheduler> ObjectCounter; - +               bool StopThread;              NThreading::TLegacyFuture<> Thread;          }; - +           inline IScheduleItem::IScheduleItem(TInstant scheduleTime) noexcept              : ScheduleTime(scheduleTime)          {          } - +           inline TInstant IScheduleItem::GetScheduleTime() const noexcept {              return ScheduleTime;          } diff --git a/library/cpp/messagebus/scheduler/scheduler_ut.cpp b/library/cpp/messagebus/scheduler/scheduler_ut.cpp index a5ea641c108..35fcccdd29f 100644 --- a/library/cpp/messagebus/scheduler/scheduler_ut.cpp +++ b/library/cpp/messagebus/scheduler/scheduler_ut.cpp @@ -1,36 +1,36 @@  #include <library/cpp/testing/unittest/registar.h> - -#include "scheduler.h" - +  +#include "scheduler.h"  +   #include <library/cpp/messagebus/misc/test_sync.h> - -using namespace NBus; -using namespace NBus::NPrivate; - +  +using namespace NBus;  +using namespace NBus::NPrivate;  +   Y_UNIT_TEST_SUITE(TSchedulerTests) { -    struct TSimpleScheduleItem: public IScheduleItem { -        TTestSync* const TestSync; - -        TSimpleScheduleItem(TTestSync* testSync) -            : IScheduleItem((TInstant::Now() + TDuration::MilliSeconds(1))) -            , TestSync(testSync) +    struct TSimpleScheduleItem: public IScheduleItem {  +        TTestSync* const TestSync;  +  +        TSimpleScheduleItem(TTestSync* testSync)  +            : IScheduleItem((TInstant::Now() + TDuration::MilliSeconds(1)))  +            , TestSync(testSync)           {          } - +           void Do() override { -            TestSync->WaitForAndIncrement(0); -        } -    }; - +            TestSync->WaitForAndIncrement(0);  +        }  +    };  +       Y_UNIT_TEST(Simple) { -        TTestSync testSync; - -        TScheduler scheduler; - -        scheduler.Schedule(new TSimpleScheduleItem(&testSync)); - -        testSync.WaitForAndIncrement(1); - -        scheduler.Stop(); -    } -} +        TTestSync testSync;  +  +        TScheduler scheduler;  +  +        scheduler.Schedule(new TSimpleScheduleItem(&testSync));  +  +        testSync.WaitForAndIncrement(1);  +  +        scheduler.Stop();  +    }  +}  diff --git a/library/cpp/messagebus/scheduler/ya.make b/library/cpp/messagebus/scheduler/ya.make index dcb7408a203..382804c4085 100644 --- a/library/cpp/messagebus/scheduler/ya.make +++ b/library/cpp/messagebus/scheduler/ya.make @@ -1,13 +1,13 @@ -LIBRARY() - +LIBRARY()  +   OWNER(g:messagebus) - +   PEERDIR(      library/cpp/threading/future  ) -SRCS( -    scheduler.cpp -) - -END() +SRCS(  +    scheduler.cpp  +)  +  +END()   | 
