aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-07-29 14:55:30 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-07-29 15:09:48 +0300
commit80adf9ab238b01ed48926dca38bf7eefd39c823d (patch)
tree0565154984fed73dc32aaf76c402f81c54ac46f6
parent5d639bd83a0f3642a1c2b605bcac23a25a071920 (diff)
downloadydb-80adf9ab238b01ed48926dca38bf7eefd39c823d.tar.gz
Intermediate changes
-rw-r--r--library/cpp/threading/future/README.md2
-rw-r--r--library/cpp/threading/future/benchmark/coroutine_traits.cpp82
-rw-r--r--library/cpp/threading/future/benchmark/ya.make11
-rw-r--r--library/cpp/threading/future/core/coroutine_traits.h99
-rw-r--r--library/cpp/threading/future/ya.make1
5 files changed, 195 insertions, 0 deletions
diff --git a/library/cpp/threading/future/README.md b/library/cpp/threading/future/README.md
new file mode 100644
index 0000000000..9b9929c822
--- /dev/null
+++ b/library/cpp/threading/future/README.md
@@ -0,0 +1,2 @@
+See:
+https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency
diff --git a/library/cpp/threading/future/benchmark/coroutine_traits.cpp b/library/cpp/threading/future/benchmark/coroutine_traits.cpp
new file mode 100644
index 0000000000..93528bfac0
--- /dev/null
+++ b/library/cpp/threading/future/benchmark/coroutine_traits.cpp
@@ -0,0 +1,82 @@
+#include <library/cpp/threading/future/future.h>
+#include <library/cpp/threading/future/core/coroutine_traits.h>
+
+#include <benchmark/benchmark.h>
+
+class TContext {
+public:
+ TContext()
+ : NextInputPromise_(NThreading::NewPromise<bool>())
+ {}
+ ~TContext() {
+ UpdateNextInput(false);
+ }
+
+ NThreading::TFuture<bool> NextInput() {
+ return NextInputPromise_.GetFuture();
+ }
+
+ void UpdateNextInput(bool hasInput = true) {
+ auto prevNextInputPromise = NextInputPromise_;
+ NextInputPromise_ = NThreading::NewPromise<bool>();
+ prevNextInputPromise.SetValue(hasInput);
+ }
+
+private:
+ NThreading::TPromise<bool> NextInputPromise_;
+};
+
+static void TestPureFutureChainSubscribe(benchmark::State& state) {
+ TContext context;
+ size_t cnt = 0;
+ std::function<void(const NThreading::TFuture<bool>&)> processInput = [&context, &cnt, &processInput](const NThreading::TFuture<bool>& hasInput) {
+ if (hasInput.GetValue()) {
+ benchmark::DoNotOptimize(++cnt);
+ context.NextInput().Subscribe(processInput);
+ }
+ };
+
+ processInput(NThreading::MakeFuture<bool>(true));
+ for (auto _ : state) {
+ context.UpdateNextInput();
+ }
+ context.UpdateNextInput(false);
+}
+
+static void TestPureFutureChainApply(benchmark::State& state) {
+ TContext context;
+ size_t cnt = 0;
+ std::function<void(const NThreading::TFuture<bool>&)> processInput = [&context, &cnt, &processInput](const NThreading::TFuture<bool>& hasInput) {
+ if (hasInput.GetValue()) {
+ benchmark::DoNotOptimize(++cnt);
+ context.NextInput().Apply(processInput);
+ }
+ };
+
+ processInput(NThreading::MakeFuture<bool>(true));
+ for (auto _ : state) {
+ context.UpdateNextInput();
+ }
+ context.UpdateNextInput(false);
+}
+
+static void TestCoroFutureChain(benchmark::State& state) {
+ TContext context;
+ size_t cnt = 0;
+ auto coroutine = [&context, &cnt]() -> NThreading::TFuture<void> {
+ while (co_await context.NextInput()) {
+ benchmark::DoNotOptimize(++cnt);
+ }
+ };
+
+ auto coroutineFuture = coroutine();
+ for (auto _ : state) {
+ context.UpdateNextInput();
+ }
+ context.UpdateNextInput(false);
+ coroutineFuture.GetValueSync();
+}
+
+BENCHMARK(TestPureFutureChainSubscribe);
+BENCHMARK(TestPureFutureChainApply);
+BENCHMARK(TestCoroFutureChain);
diff --git a/library/cpp/threading/future/benchmark/ya.make b/library/cpp/threading/future/benchmark/ya.make
new file mode 100644
index 0000000000..f71f52acbb
--- /dev/null
+++ b/library/cpp/threading/future/benchmark/ya.make
@@ -0,0 +1,11 @@
+G_BENCHMARK()
+
+PEERDIR(
+ library/cpp/threading/future
+)
+
+SRCS(
+ coroutine_traits.cpp
+)
+
+END()
diff --git a/library/cpp/threading/future/core/coroutine_traits.h b/library/cpp/threading/future/core/coroutine_traits.h
new file mode 100644
index 0000000000..5871e22a35
--- /dev/null
+++ b/library/cpp/threading/future/core/coroutine_traits.h
@@ -0,0 +1,99 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+
+#include <coroutine>
+
+template<typename... Args>
+struct std::coroutine_traits<NThreading::TFuture<void>, Args...> {
+ struct promise_type {
+
+ NThreading::TFuture<void> get_return_object() {
+ return Promise_.GetFuture();
+ }
+
+ std::suspend_never initial_suspend() { return {}; }
+ std::suspend_never final_suspend() noexcept { return {}; }
+
+ void unhandled_exception() {
+ Promise_.SetException(std::current_exception());
+ }
+
+ void return_void() {
+ Promise_.SetValue();
+ }
+
+ private:
+ NThreading::TPromise<void> Promise_ = NThreading::NewPromise();
+ };
+};
+
+template<typename T, typename... Args>
+struct std::coroutine_traits<NThreading::TFuture<T>, Args...> {
+ struct promise_type {
+ NThreading::TFuture<T> get_return_object() {
+ return Promise_.GetFuture();
+ }
+
+ std::suspend_never initial_suspend() { return {}; }
+ std::suspend_never final_suspend() noexcept { return {}; }
+
+ void unhandled_exception() {
+ Promise_.SetException(std::current_exception());
+ }
+
+ void return_value(auto&& val) {
+ Promise_.SetValue(std::forward<decltype(val)>(val));
+ }
+
+ private:
+ NThreading::TPromise<T> Promise_ = NThreading::NewPromise<T>();
+ };
+};
+
+template<typename T>
+struct TFutureAwaitable {
+ NThreading::TFuture<T> Future;
+
+ TFutureAwaitable(NThreading::TFuture<T> future)
+ : Future{future}
+ {
+ }
+
+ bool await_ready() const noexcept {
+ return Future.HasValue() || Future.HasException();
+ }
+
+ void await_suspend(auto h) noexcept {
+ /*
+ * This library assumes that resume never throws an exception.
+ * This assumption is made due to the fact that the users of these library in most cases do not need to write their own coroutine handlers,
+ * and all coroutine handlers provided by the library do not throw exception from resume.
+ *
+ * WARNING: do not change subscribe to apply or something other here, creating an extra future state degrades performance.
+ */
+ Future.NoexceptSubscribe(
+ [h](auto) mutable noexcept {
+ h();
+ }
+ );
+ }
+
+ decltype(auto) await_resume() {
+ return Future.GetValue();
+ }
+};
+
+template<typename T>
+auto operator co_await(const NThreading::TFuture<T>& future) {
+ return TFutureAwaitable{future};
+}
+
+namespace NThreading {
+
+ template<typename T>
+ auto AsAwaitable(const NThreading::TFuture<T>& fut) {
+ return operator co_await(fut);
+ }
+
+} // namespace NThreading
diff --git a/library/cpp/threading/future/ya.make b/library/cpp/threading/future/ya.make
index bc52ca5c18..1d98569906 100644
--- a/library/cpp/threading/future/ya.make
+++ b/library/cpp/threading/future/ya.make
@@ -15,6 +15,7 @@ SRCS(
END()
RECURSE(
+ benchmark
mt_ut
perf
ut