aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortldr <tldr@yandex-team.com>2022-10-05 22:50:54 +0300
committertldr <tldr@yandex-team.com>2022-10-05 22:50:54 +0300
commit57087fd2e94af7191eef09606ca6a395ab0fe129 (patch)
tree7c06867a9bdafb3127014d2aa1625a43857fcc0f
parent195aa2d16c6b08a6c3e72b9a3ce33b97f9104ba9 (diff)
downloadydb-57087fd2e94af7191eef09606ca6a395ab0fe129.tar.gz
[vcs][library][ydb] move TAsyncSemaphore from ydb/library to library/cpp/threading
Потребовалось для
-rw-r--r--library/cpp/threading/cancellation/operation_cancelled_exception.h11
-rw-r--r--library/cpp/threading/future/CMakeLists.txt1
-rw-r--r--library/cpp/threading/future/async_semaphore.cpp (renamed from ydb/library/yql/utils/threading/async_semaphore.cpp)29
-rw-r--r--library/cpp/threading/future/async_semaphore.h (renamed from ydb/library/yql/utils/threading/async_semaphore.h)11
-rw-r--r--library/cpp/threading/future/async_semaphore_ut.cpp174
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_list.cpp2
-rw-r--r--ydb/library/yql/utils/threading/CMakeLists.txt1
7 files changed, 212 insertions, 17 deletions
diff --git a/library/cpp/threading/cancellation/operation_cancelled_exception.h b/library/cpp/threading/cancellation/operation_cancelled_exception.h
new file mode 100644
index 0000000000..4315fa0a6e
--- /dev/null
+++ b/library/cpp/threading/cancellation/operation_cancelled_exception.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+
+namespace NThreading {
+
+//! The exception to be thrown when an operation has been cancelled
+class TOperationCancelledException : public yexception {
+};
+
+}
diff --git a/library/cpp/threading/future/CMakeLists.txt b/library/cpp/threading/future/CMakeLists.txt
index 47e4c0a859..83daaa0c85 100644
--- a/library/cpp/threading/future/CMakeLists.txt
+++ b/library/cpp/threading/future/CMakeLists.txt
@@ -13,6 +13,7 @@ target_link_libraries(cpp-threading-future PUBLIC
yutil
)
target_sources(cpp-threading-future PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/threading/future/async_semaphore.cpp
${CMAKE_SOURCE_DIR}/library/cpp/threading/future/async.cpp
${CMAKE_SOURCE_DIR}/library/cpp/threading/future/core/future.cpp
${CMAKE_SOURCE_DIR}/library/cpp/threading/future/core/fwd.cpp
diff --git a/ydb/library/yql/utils/threading/async_semaphore.cpp b/library/cpp/threading/future/async_semaphore.cpp
index 1b5cbd32ca..b73b7f3b66 100644
--- a/ydb/library/yql/utils/threading/async_semaphore.cpp
+++ b/library/cpp/threading/future/async_semaphore.cpp
@@ -3,7 +3,9 @@
#include <util/system/guard.h>
#include <util/system/yassert.h>
-namespace NYql {
+#include <library/cpp/threading/cancellation/operation_cancelled_exception.h>
+
+namespace NThreading {
TAsyncSemaphore::TAsyncSemaphore(size_t count)
: Count_(count)
@@ -15,21 +17,28 @@ TAsyncSemaphore::TPtr TAsyncSemaphore::Make(size_t count) {
return TPtr(new TAsyncSemaphore(count));
}
-NThreading::TFuture<TAsyncSemaphore::TPtr> TAsyncSemaphore::AcquireAsync() {
+TFuture<TAsyncSemaphore::TPtr> TAsyncSemaphore::AcquireAsync() {
with_lock(Lock_) {
+ if (Cancelled_) {
+ return MakeErrorFuture<TPtr>(
+ std::make_exception_ptr(TOperationCancelledException()));
+ }
if (Count_) {
--Count_;
- return NThreading::MakeFuture<TAsyncSemaphore::TPtr>(this);
+ return MakeFuture<TAsyncSemaphore::TPtr>(this);
}
- auto promise = NThreading::NewPromise<TAsyncSemaphore::TPtr>();
+ auto promise = NewPromise<TAsyncSemaphore::TPtr>();
Promises_.push_back(promise);
return promise.GetFuture();
}
}
void TAsyncSemaphore::Release() {
- NThreading::TPromise<TPtr> promise;
+ TPromise<TPtr> promise;
with_lock(Lock_) {
+ if (Cancelled_) {
+ return;
+ }
if (Promises_.empty()) {
++Count_;
return;
@@ -42,12 +51,13 @@ void TAsyncSemaphore::Release() {
}
void TAsyncSemaphore::Cancel() {
- std::list<NThreading::TPromise<TPtr>> promises;
+ std::list<TPromise<TPtr>> promises;
with_lock(Lock_) {
+ Cancelled_ = true;
std::swap(Promises_, promises);
}
for (auto& p: promises) {
- p.SetException("Cancelled");
+ p.SetException(std::make_exception_ptr(TOperationCancelledException()));
}
}
@@ -57,9 +67,8 @@ TAsyncSemaphore::TAutoRelease::~TAutoRelease() {
}
}
-std::function<void (const NThreading::TFuture<void>&)> TAsyncSemaphore::TAutoRelease::DeferRelease() {
- return [s = std::move(this->Sem)](const NThreading::TFuture<void>& f) {
- f.GetValue();
+std::function<void (const TFuture<void>&)> TAsyncSemaphore::TAutoRelease::DeferRelease() {
+ return [s = std::move(this->Sem)](const TFuture<void>&) {
s->Release();
};
}
diff --git a/ydb/library/yql/utils/threading/async_semaphore.h b/library/cpp/threading/future/async_semaphore.h
index db0a97d66c..fa4cabad3c 100644
--- a/ydb/library/yql/utils/threading/async_semaphore.h
+++ b/library/cpp/threading/future/async_semaphore.h
@@ -8,7 +8,7 @@
#include <list>
#include <functional>
-namespace NYql {
+namespace NThreading {
class TAsyncSemaphore: public TThrRefBase {
public:
@@ -26,7 +26,7 @@ public:
}
~TAutoRelease();
- std::function<void (const NThreading::TFuture<void>&)> DeferRelease();
+ std::function<void (const TFuture<void>&)> DeferRelease();
private:
TAsyncSemaphore::TPtr Sem;
@@ -34,7 +34,7 @@ public:
static TPtr Make(size_t count);
- NThreading::TFuture<TPtr> AcquireAsync();
+ TFuture<TPtr> AcquireAsync();
void Release();
void Cancel();
@@ -47,8 +47,9 @@ private:
private:
size_t Count_;
+ bool Cancelled_ = false;
TAdaptiveLock Lock_;
- std::list<NThreading::TPromise<TPtr>> Promises_;
+ std::list<TPromise<TPtr>> Promises_;
};
-} // NYql
+} // namespace NThreading
diff --git a/library/cpp/threading/future/async_semaphore_ut.cpp b/library/cpp/threading/future/async_semaphore_ut.cpp
new file mode 100644
index 0000000000..9f8817ea9c
--- /dev/null
+++ b/library/cpp/threading/future/async_semaphore_ut.cpp
@@ -0,0 +1,174 @@
+#include "async_semaphore.h"
+#include "async.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/threading/cancellation/operation_cancelled_exception.h>
+
+#include <util/generic/scope.h>
+#include <util/generic/vector.h>
+#include <util/thread/pool.h>
+
+using namespace NThreading;
+
+Y_UNIT_TEST_SUITE(TSemaphoreAsync) {
+ Y_UNIT_TEST(SimplyAquired) {
+ const size_t MAX_IN_PROGRESS = 5;
+
+ TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false));
+ pool.Start(MAX_IN_PROGRESS * 2);
+
+ TVector<TFuture<size_t>> futures;
+ auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS);
+ for (size_t i = 0; i < 100; ++i) {
+ auto f = semaphore->AcquireAsync()
+ .Apply([&pool, i](const auto& f) -> TFuture<size_t> {
+ return Async([i, semaphore = f.GetValue()] {
+ auto guard = semaphore->MakeAutoRelease();
+ Sleep(TDuration::MilliSeconds(100));
+ return i;
+ }, pool);
+ });
+ futures.push_back(f);
+ }
+
+ for (size_t i = 0; i < 100; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(futures[i].GetValueSync(), i);
+ }
+ }
+
+ Y_UNIT_TEST(AutoReleasedOnException) {
+ auto semaphore = TAsyncSemaphore::Make(1);
+
+ auto lock = semaphore->AcquireAsync();
+ UNIT_ASSERT(lock.HasValue());
+ auto waitingLock = semaphore->AcquireAsync();
+ UNIT_ASSERT(!waitingLock.HasValue() && !waitingLock.HasException());
+
+ auto future = lock.Apply([](const auto& f) {
+ auto guard = f.GetValue()->MakeAutoRelease();
+
+ ythrow yexception() << "oops";
+ });
+
+ UNIT_ASSERT(future.HasException());
+ UNIT_ASSERT(waitingLock.HasValue());
+ }
+
+ Y_UNIT_TEST(LimitsParallelism) {
+ const size_t MAX_IN_PROGRESS = 5;
+
+ TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false));
+ pool.Start(MAX_IN_PROGRESS * 2);
+
+ std::atomic_uint64_t inProgress = 0;
+
+ TVector<TFuture<size_t>> futures;
+ auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS);
+ for (size_t i = 0; i < 100; ++i) {
+ auto f = semaphore->AcquireAsync()
+ .Apply([&, i](const auto&) -> TFuture<size_t> {
+ auto currentInProgress = inProgress.fetch_add(1) + 1;
+
+ UNIT_ASSERT_GT(currentInProgress, 0);
+ UNIT_ASSERT_LE(currentInProgress, MAX_IN_PROGRESS);
+
+ return Async([i] {
+ Sleep(TDuration::MilliSeconds(100));
+ return i;
+ }, pool);
+ });
+ f.IgnoreResult().Subscribe([&](const auto&) {
+ auto currentInProgress = inProgress.fetch_sub(1) - 1;
+
+ UNIT_ASSERT_GE(currentInProgress, 0);
+ UNIT_ASSERT_LE(currentInProgress, MAX_IN_PROGRESS);
+
+ semaphore->Release();
+ });
+ futures.push_back(f);
+ }
+
+ WaitAll(futures).Wait();
+
+ UNIT_ASSERT_EQUAL(inProgress.load(), 0);
+ }
+
+ Y_UNIT_TEST(AcquisitionOrder) {
+ const size_t MAX_IN_PROGRESS = 5;
+
+ TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false));
+ pool.Start(MAX_IN_PROGRESS * 2);
+
+ std::atomic_size_t latestId = 0;
+
+ TVector<TFuture<size_t>> futures;
+ auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS);
+ for (size_t i = 0; i < 100; ++i) {
+ auto f = semaphore->AcquireAsync()
+ .Apply([&](const auto& f) -> size_t {
+ auto guard = f.GetValue()->MakeAutoRelease();
+
+ auto currentId = latestId.fetch_add(1);
+
+ return currentId;
+ });
+ futures.push_back(f);
+ }
+
+ for (size_t i = 0; i < 100; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(futures[i].GetValueSync(), i);
+ }
+ }
+
+ Y_UNIT_TEST(Cancel) {
+ auto semaphore = TAsyncSemaphore::Make(1);
+ auto firstLock = semaphore->AcquireAsync();
+ auto canceledLock = semaphore->AcquireAsync();
+
+ UNIT_ASSERT(firstLock.HasValue());
+
+ UNIT_ASSERT(!canceledLock.HasValue());
+ UNIT_ASSERT(!canceledLock.HasException());
+
+ semaphore->Cancel();
+
+ UNIT_ASSERT_EXCEPTION(canceledLock.TryRethrow(), TOperationCancelledException);
+
+ UNIT_ASSERT_NO_EXCEPTION(firstLock.GetValue()->Release());
+ }
+
+ Y_UNIT_TEST(AcquireAfterCancel) {
+ auto semaphore = TAsyncSemaphore::Make(1);
+
+ semaphore->Cancel();
+
+ auto lock = semaphore->AcquireAsync();
+
+ UNIT_ASSERT_EXCEPTION(lock.TryRethrow(), TOperationCancelledException);
+ }
+
+ Y_UNIT_TEST(AutoReleaseDeferReleaseReleasesOnException) {
+ auto semaphore = TAsyncSemaphore::Make(1);
+
+ auto lock = semaphore->AcquireAsync();
+ UNIT_ASSERT(lock.HasValue());
+ auto waitingLock = semaphore->AcquireAsync();
+ UNIT_ASSERT(!waitingLock.HasValue() && !waitingLock.HasException());
+
+ auto asyncWork = lock.Apply([](const auto& lock) {
+ lock.TryRethrow();
+
+ ythrow yexception() << "oops";
+ });
+
+ asyncWork.Subscribe(semaphore->MakeAutoRelease().DeferRelease());
+
+ UNIT_ASSERT(asyncWork.HasException());
+ UNIT_ASSERT(waitingLock.HasValue());
+ }
+
+ Y_UNIT_TEST(AutoReleaseNotCopyable) {
+ UNIT_ASSERT(!std::is_copy_constructible_v<TAsyncSemaphore::TAutoRelease>);
+ UNIT_ASSERT(!std::is_copy_assignable_v<TAsyncSemaphore::TAutoRelease>);
+ }
+}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
index 24e7b12bfb..38be367bb1 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
@@ -3,7 +3,6 @@
#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
#include <ydb/library/yql/utils/log/log.h>
-#include <ydb/library/yql/utils/threading/async_semaphore.h>
#include <ydb/library/yql/utils/url_builder.h>
#include <ydb/library/yql/utils/yql_panic.h>
@@ -12,6 +11,7 @@
#ifdef THROW
#undef THROW
#endif
+#include <library/cpp/threading/future/async_semaphore.h>
#include <library/cpp/xml/document/xml-document.h>
#include <util/string/builder.h>
diff --git a/ydb/library/yql/utils/threading/CMakeLists.txt b/ydb/library/yql/utils/threading/CMakeLists.txt
index 117ec898ba..114014399d 100644
--- a/ydb/library/yql/utils/threading/CMakeLists.txt
+++ b/ydb/library/yql/utils/threading/CMakeLists.txt
@@ -13,6 +13,5 @@ target_link_libraries(yql-utils-threading PUBLIC
yutil
)
target_sources(yql-utils-threading PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/threading/async_semaphore.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/threading/async_queue.cpp
)