diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-07-29 14:55:30 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-07-29 15:09:48 +0300 |
commit | 80adf9ab238b01ed48926dca38bf7eefd39c823d (patch) | |
tree | 0565154984fed73dc32aaf76c402f81c54ac46f6 /library/cpp/threading/future | |
parent | 5d639bd83a0f3642a1c2b605bcac23a25a071920 (diff) | |
download | ydb-80adf9ab238b01ed48926dca38bf7eefd39c823d.tar.gz |
Intermediate changes
Diffstat (limited to 'library/cpp/threading/future')
-rw-r--r-- | library/cpp/threading/future/README.md | 2 | ||||
-rw-r--r-- | library/cpp/threading/future/benchmark/coroutine_traits.cpp | 82 | ||||
-rw-r--r-- | library/cpp/threading/future/benchmark/ya.make | 11 | ||||
-rw-r--r-- | library/cpp/threading/future/core/coroutine_traits.h | 99 | ||||
-rw-r--r-- | library/cpp/threading/future/ya.make | 1 |
5 files changed, 195 insertions, 0 deletions
diff --git a/library/cpp/threading/future/README.md b/library/cpp/threading/future/README.md new file mode 100644 index 0000000000..9b9929c822 --- /dev/null +++ b/library/cpp/threading/future/README.md @@ -0,0 +1,2 @@ +See: +https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency diff --git a/library/cpp/threading/future/benchmark/coroutine_traits.cpp b/library/cpp/threading/future/benchmark/coroutine_traits.cpp new file mode 100644 index 0000000000..93528bfac0 --- /dev/null +++ b/library/cpp/threading/future/benchmark/coroutine_traits.cpp @@ -0,0 +1,82 @@ +#include <library/cpp/threading/future/future.h> +#include <library/cpp/threading/future/core/coroutine_traits.h> + +#include <benchmark/benchmark.h> + +class TContext { +public: + TContext() + : NextInputPromise_(NThreading::NewPromise<bool>()) + {} + ~TContext() { + UpdateNextInput(false); + } + + NThreading::TFuture<bool> NextInput() { + return NextInputPromise_.GetFuture(); + } + + void UpdateNextInput(bool hasInput = true) { + auto prevNextInputPromise = NextInputPromise_; + NextInputPromise_ = NThreading::NewPromise<bool>(); + prevNextInputPromise.SetValue(hasInput); + } + +private: + NThreading::TPromise<bool> NextInputPromise_; +}; + +static void TestPureFutureChainSubscribe(benchmark::State& state) { + TContext context; + size_t cnt = 0; + std::function<void(const NThreading::TFuture<bool>&)> processInput = [&context, &cnt, &processInput](const NThreading::TFuture<bool>& hasInput) { + if (hasInput.GetValue()) { + benchmark::DoNotOptimize(++cnt); + context.NextInput().Subscribe(processInput); + } + }; + + processInput(NThreading::MakeFuture<bool>(true)); + for (auto _ : state) { + context.UpdateNextInput(); + } + context.UpdateNextInput(false); +} + +static void TestPureFutureChainApply(benchmark::State& state) { + TContext context; + size_t cnt = 0; + std::function<void(const NThreading::TFuture<bool>&)> processInput = [&context, &cnt, &processInput](const NThreading::TFuture<bool>& hasInput) { + if (hasInput.GetValue()) { + benchmark::DoNotOptimize(++cnt); + context.NextInput().Apply(processInput); + } + }; + + processInput(NThreading::MakeFuture<bool>(true)); + for (auto _ : state) { + context.UpdateNextInput(); + } + context.UpdateNextInput(false); +} + +static void TestCoroFutureChain(benchmark::State& state) { + TContext context; + size_t cnt = 0; + auto coroutine = [&context, &cnt]() -> NThreading::TFuture<void> { + while (co_await context.NextInput()) { + benchmark::DoNotOptimize(++cnt); + } + }; + + auto coroutineFuture = coroutine(); + for (auto _ : state) { + context.UpdateNextInput(); + } + context.UpdateNextInput(false); + coroutineFuture.GetValueSync(); +} + +BENCHMARK(TestPureFutureChainSubscribe); +BENCHMARK(TestPureFutureChainApply); +BENCHMARK(TestCoroFutureChain); diff --git a/library/cpp/threading/future/benchmark/ya.make b/library/cpp/threading/future/benchmark/ya.make new file mode 100644 index 0000000000..f71f52acbb --- /dev/null +++ b/library/cpp/threading/future/benchmark/ya.make @@ -0,0 +1,11 @@ +G_BENCHMARK() + +PEERDIR( + library/cpp/threading/future +) + +SRCS( + coroutine_traits.cpp +) + +END() diff --git a/library/cpp/threading/future/core/coroutine_traits.h b/library/cpp/threading/future/core/coroutine_traits.h new file mode 100644 index 0000000000..5871e22a35 --- /dev/null +++ b/library/cpp/threading/future/core/coroutine_traits.h @@ -0,0 +1,99 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> + +#include <coroutine> + +template<typename... Args> +struct std::coroutine_traits<NThreading::TFuture<void>, Args...> { + struct promise_type { + + NThreading::TFuture<void> get_return_object() { + return Promise_.GetFuture(); + } + + std::suspend_never initial_suspend() { return {}; } + std::suspend_never final_suspend() noexcept { return {}; } + + void unhandled_exception() { + Promise_.SetException(std::current_exception()); + } + + void return_void() { + Promise_.SetValue(); + } + + private: + NThreading::TPromise<void> Promise_ = NThreading::NewPromise(); + }; +}; + +template<typename T, typename... Args> +struct std::coroutine_traits<NThreading::TFuture<T>, Args...> { + struct promise_type { + NThreading::TFuture<T> get_return_object() { + return Promise_.GetFuture(); + } + + std::suspend_never initial_suspend() { return {}; } + std::suspend_never final_suspend() noexcept { return {}; } + + void unhandled_exception() { + Promise_.SetException(std::current_exception()); + } + + void return_value(auto&& val) { + Promise_.SetValue(std::forward<decltype(val)>(val)); + } + + private: + NThreading::TPromise<T> Promise_ = NThreading::NewPromise<T>(); + }; +}; + +template<typename T> +struct TFutureAwaitable { + NThreading::TFuture<T> Future; + + TFutureAwaitable(NThreading::TFuture<T> future) + : Future{future} + { + } + + bool await_ready() const noexcept { + return Future.HasValue() || Future.HasException(); + } + + void await_suspend(auto h) noexcept { + /* + * This library assumes that resume never throws an exception. + * This assumption is made due to the fact that the users of these library in most cases do not need to write their own coroutine handlers, + * and all coroutine handlers provided by the library do not throw exception from resume. + * + * WARNING: do not change subscribe to apply or something other here, creating an extra future state degrades performance. + */ + Future.NoexceptSubscribe( + [h](auto) mutable noexcept { + h(); + } + ); + } + + decltype(auto) await_resume() { + return Future.GetValue(); + } +}; + +template<typename T> +auto operator co_await(const NThreading::TFuture<T>& future) { + return TFutureAwaitable{future}; +} + +namespace NThreading { + + template<typename T> + auto AsAwaitable(const NThreading::TFuture<T>& fut) { + return operator co_await(fut); + } + +} // namespace NThreading diff --git a/library/cpp/threading/future/ya.make b/library/cpp/threading/future/ya.make index bc52ca5c18..1d98569906 100644 --- a/library/cpp/threading/future/ya.make +++ b/library/cpp/threading/future/ya.make @@ -15,6 +15,7 @@ SRCS( END() RECURSE( + benchmark mt_ut perf ut |