aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/threading/future/async_semaphore_ut.cpp
blob: 9f8817ea9c1dfaef26411c2d6df03e5c279d563c (plain) (tree)












































































































































































                                                                                       
#include "async_semaphore.h"
#include "async.h"

#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/threading/cancellation/operation_cancelled_exception.h>

#include <util/generic/scope.h>
#include <util/generic/vector.h>
#include <util/thread/pool.h>

using namespace NThreading;

Y_UNIT_TEST_SUITE(TSemaphoreAsync) {
    Y_UNIT_TEST(SimplyAquired) {
        const size_t MAX_IN_PROGRESS = 5;

        TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false));
        pool.Start(MAX_IN_PROGRESS * 2);

        TVector<TFuture<size_t>> futures;
        auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS);
        for (size_t i = 0; i < 100; ++i) {
            auto f = semaphore->AcquireAsync()
                .Apply([&pool, i](const auto& f) -> TFuture<size_t> {
                    return Async([i, semaphore = f.GetValue()] {
                        auto guard = semaphore->MakeAutoRelease();
                        Sleep(TDuration::MilliSeconds(100));
                        return i;
                    }, pool);
                });
            futures.push_back(f);
        }

        for (size_t i = 0; i < 100; ++i) {
            UNIT_ASSERT_VALUES_EQUAL(futures[i].GetValueSync(), i);
        }
    }

    Y_UNIT_TEST(AutoReleasedOnException) {
        auto semaphore = TAsyncSemaphore::Make(1);

        auto lock = semaphore->AcquireAsync();
        UNIT_ASSERT(lock.HasValue());
        auto waitingLock = semaphore->AcquireAsync();
        UNIT_ASSERT(!waitingLock.HasValue() && !waitingLock.HasException());

        auto future = lock.Apply([](const auto& f) {
            auto guard = f.GetValue()->MakeAutoRelease();

            ythrow yexception() << "oops";
        });

        UNIT_ASSERT(future.HasException());
        UNIT_ASSERT(waitingLock.HasValue());
    }

    Y_UNIT_TEST(LimitsParallelism) {
        const size_t MAX_IN_PROGRESS = 5;

        TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false));
        pool.Start(MAX_IN_PROGRESS * 2);

        std::atomic_uint64_t inProgress = 0;

        TVector<TFuture<size_t>> futures;
        auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS);
        for (size_t i = 0; i < 100; ++i) {
            auto f = semaphore->AcquireAsync()
                .Apply([&, i](const auto&) -> TFuture<size_t> {
                    auto currentInProgress = inProgress.fetch_add(1) + 1;

                    UNIT_ASSERT_GT(currentInProgress, 0);
                    UNIT_ASSERT_LE(currentInProgress, MAX_IN_PROGRESS);

                    return Async([i] {
                        Sleep(TDuration::MilliSeconds(100));
                        return i;
                    }, pool);
                });
            f.IgnoreResult().Subscribe([&](const auto&) {
                auto currentInProgress = inProgress.fetch_sub(1) - 1;

                UNIT_ASSERT_GE(currentInProgress, 0);
                UNIT_ASSERT_LE(currentInProgress, MAX_IN_PROGRESS);

                semaphore->Release();
            });
            futures.push_back(f);
        }

        WaitAll(futures).Wait();

        UNIT_ASSERT_EQUAL(inProgress.load(), 0);
    }

    Y_UNIT_TEST(AcquisitionOrder) {
        const size_t MAX_IN_PROGRESS = 5;

        TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false));
        pool.Start(MAX_IN_PROGRESS * 2);

        std::atomic_size_t latestId = 0;

        TVector<TFuture<size_t>> futures;
        auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS);
        for (size_t i = 0; i < 100; ++i) {
            auto f = semaphore->AcquireAsync()
                .Apply([&](const auto& f) -> size_t {
                    auto guard = f.GetValue()->MakeAutoRelease();

                    auto currentId = latestId.fetch_add(1);

                    return currentId;
                });
            futures.push_back(f);
        }

        for (size_t i = 0; i < 100; ++i) {
            UNIT_ASSERT_VALUES_EQUAL(futures[i].GetValueSync(), i);
        }
    }

    Y_UNIT_TEST(Cancel) {
        auto semaphore = TAsyncSemaphore::Make(1);
        auto firstLock = semaphore->AcquireAsync();
        auto canceledLock = semaphore->AcquireAsync();

        UNIT_ASSERT(firstLock.HasValue());

        UNIT_ASSERT(!canceledLock.HasValue());
        UNIT_ASSERT(!canceledLock.HasException());

        semaphore->Cancel();

        UNIT_ASSERT_EXCEPTION(canceledLock.TryRethrow(), TOperationCancelledException);

        UNIT_ASSERT_NO_EXCEPTION(firstLock.GetValue()->Release());
    }

    Y_UNIT_TEST(AcquireAfterCancel) {
        auto semaphore = TAsyncSemaphore::Make(1);

        semaphore->Cancel();

        auto lock = semaphore->AcquireAsync();

        UNIT_ASSERT_EXCEPTION(lock.TryRethrow(), TOperationCancelledException);
    }

    Y_UNIT_TEST(AutoReleaseDeferReleaseReleasesOnException) {
        auto semaphore = TAsyncSemaphore::Make(1);

        auto lock = semaphore->AcquireAsync();
        UNIT_ASSERT(lock.HasValue());
        auto waitingLock = semaphore->AcquireAsync();
        UNIT_ASSERT(!waitingLock.HasValue() && !waitingLock.HasException());

        auto asyncWork = lock.Apply([](const auto& lock) {
            lock.TryRethrow();

            ythrow yexception() << "oops";
        });

        asyncWork.Subscribe(semaphore->MakeAutoRelease().DeferRelease());

        UNIT_ASSERT(asyncWork.HasException());
        UNIT_ASSERT(waitingLock.HasValue());
    }

    Y_UNIT_TEST(AutoReleaseNotCopyable) {
        UNIT_ASSERT(!std::is_copy_constructible_v<TAsyncSemaphore::TAutoRelease>);
        UNIT_ASSERT(!std::is_copy_assignable_v<TAsyncSemaphore::TAutoRelease>);
    }
}