aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorjolex007 <jolex007@yandex-team.com>2025-02-27 12:33:07 +0300
committerjolex007 <jolex007@yandex-team.com>2025-02-27 12:48:28 +0300
commit08e7cde56ecd933346be66e2d41418a6ea0a0ab3 (patch)
treeada0e97f83aca15c0a94ff2314e7f389f91f3993 /library/cpp
parentd8421ced20cf7734d41d29a3209c87f93485faad (diff)
downloadydb-08e7cde56ecd933346be66e2d41418a6ea0a0ab3.tar.gz
Fix cancellation in unifetcher
commit_hash:909fa7aadbf673448dbc709b19d2088963b40404
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/http/simple/http_client.cpp34
-rw-r--r--library/cpp/http/simple/http_client.h37
-rw-r--r--library/cpp/http/simple/ya.make1
-rw-r--r--library/cpp/threading/cancellation/README.md112
-rw-r--r--library/cpp/threading/cancellation/cancellation_token.cpp1
-rw-r--r--library/cpp/threading/cancellation/cancellation_token.h133
-rw-r--r--library/cpp/threading/cancellation/ya.make11
7 files changed, 306 insertions, 23 deletions
diff --git a/library/cpp/http/simple/http_client.cpp b/library/cpp/http/simple/http_client.cpp
index bac6bdc39e..2be5a14582 100644
--- a/library/cpp/http/simple/http_client.cpp
+++ b/library/cpp/http/simple/http_client.cpp
@@ -25,26 +25,30 @@ TKeepAliveHttpClient::TKeepAliveHttpClient(const TString& host,
TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoGet(const TStringBuf relativeUrl,
IOutputStream* output,
const THeaders& headers,
- THttpHeaders* outHeaders) {
+ THttpHeaders* outHeaders,
+ NThreading::TCancellationToken cancellation) {
return DoRequest(TStringBuf("GET"),
relativeUrl,
{},
output,
headers,
- outHeaders);
+ outHeaders,
+ std::move(cancellation));
}
TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoPost(const TStringBuf relativeUrl,
const TStringBuf body,
IOutputStream* output,
const THeaders& headers,
- THttpHeaders* outHeaders) {
+ THttpHeaders* outHeaders,
+ NThreading::TCancellationToken cancellation) {
return DoRequest(TStringBuf("POST"),
relativeUrl,
body,
output,
headers,
- outHeaders);
+ outHeaders,
+ std::move(cancellation));
}
TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequest(const TStringBuf method,
@@ -52,15 +56,17 @@ TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequest(const TStringBuf
const TStringBuf body,
IOutputStream* output,
const THeaders& inHeaders,
- THttpHeaders* outHeaders) {
+ THttpHeaders* outHeaders,
+ NThreading::TCancellationToken cancellation) {
const TString contentLength = IntToString<10, size_t>(body.size());
- return DoRequestReliable(FormRequest(method, relativeUrl, body, inHeaders, contentLength), output, outHeaders);
+ return DoRequestReliable(FormRequest(method, relativeUrl, body, inHeaders, contentLength), output, outHeaders, std::move(cancellation));
}
TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequestRaw(const TStringBuf raw,
IOutputStream* output,
- THttpHeaders* outHeaders) {
- return DoRequestReliable(raw, output, outHeaders);
+ THttpHeaders* outHeaders,
+ NThreading::TCancellationToken cancellation) {
+ return DoRequestReliable(raw, output, outHeaders, std::move(cancellation));
}
void TKeepAliveHttpClient::DisableVerificationForHttps() {
@@ -189,28 +195,28 @@ void TSimpleHttpClient::EnableVerificationForHttps() {
HttpsVerification = true;
}
-void TSimpleHttpClient::DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers) const {
+void TSimpleHttpClient::DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers, NThreading::TCancellationToken cancellation) const {
TKeepAliveHttpClient cl = CreateClient();
- TKeepAliveHttpClient::THttpCode code = cl.DoGet(relativeUrl, output, headers);
+ TKeepAliveHttpClient::THttpCode code = cl.DoGet(relativeUrl, output, headers, nullptr, std::move(cancellation));
Y_ENSURE(cl.GetHttpInput());
ProcessResponse(relativeUrl, *cl.GetHttpInput(), output, code);
}
-void TSimpleHttpClient::DoPost(const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THashMap<TString, TString>& headers) const {
+void TSimpleHttpClient::DoPost(const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THashMap<TString, TString>& headers, NThreading::TCancellationToken cancellation) const {
TKeepAliveHttpClient cl = CreateClient();
- TKeepAliveHttpClient::THttpCode code = cl.DoPost(relativeUrl, body, output, headers);
+ TKeepAliveHttpClient::THttpCode code = cl.DoPost(relativeUrl, body, output, headers, nullptr, std::move(cancellation));
Y_ENSURE(cl.GetHttpInput());
ProcessResponse(relativeUrl, *cl.GetHttpInput(), output, code);
}
-void TSimpleHttpClient::DoPostRaw(const TStringBuf relativeUrl, const TStringBuf rawRequest, IOutputStream* output) const {
+void TSimpleHttpClient::DoPostRaw(const TStringBuf relativeUrl, const TStringBuf rawRequest, IOutputStream* output, NThreading::TCancellationToken cancellation) const {
TKeepAliveHttpClient cl = CreateClient();
- TKeepAliveHttpClient::THttpCode code = cl.DoRequestRaw(rawRequest, output);
+ TKeepAliveHttpClient::THttpCode code = cl.DoRequestRaw(rawRequest, output, nullptr, std::move(cancellation));
Y_ENSURE(cl.GetHttpInput());
ProcessResponse(relativeUrl, *cl.GetHttpInput(), output, code);
diff --git a/library/cpp/http/simple/http_client.h b/library/cpp/http/simple/http_client.h
index eab92d42da..224be58a24 100644
--- a/library/cpp/http/simple/http_client.h
+++ b/library/cpp/http/simple/http_client.h
@@ -12,6 +12,7 @@
#include <library/cpp/http/io/stream.h>
#include <library/cpp/http/misc/httpcodes.h>
#include <library/cpp/openssl/io/stream.h>
+#include <library/cpp/threading/cancellation/cancellation_token.h>
class TNetworkAddress;
class IOutputStream;
@@ -54,14 +55,16 @@ public:
THttpCode DoGet(const TStringBuf relativeUrl,
IOutputStream* output = nullptr,
const THeaders& headers = THeaders(),
- THttpHeaders* outHeaders = nullptr);
+ THttpHeaders* outHeaders = nullptr,
+ NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default());
// builds post request from headers and body
THttpCode DoPost(const TStringBuf relativeUrl,
const TStringBuf body,
IOutputStream* output = nullptr,
const THeaders& headers = THeaders(),
- THttpHeaders* outHeaders = nullptr);
+ THttpHeaders* outHeaders = nullptr,
+ NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default());
// builds request with any HTTP method from headers and body
THttpCode DoRequest(const TStringBuf method,
@@ -69,12 +72,14 @@ public:
const TStringBuf body,
IOutputStream* output = nullptr,
const THeaders& inHeaders = THeaders(),
- THttpHeaders* outHeaders = nullptr);
+ THttpHeaders* outHeaders = nullptr,
+ NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default());
// requires already well-formed request
THttpCode DoRequestRaw(const TStringBuf raw,
IOutputStream* output = nullptr,
- THttpHeaders* outHeaders = nullptr);
+ THttpHeaders* outHeaders = nullptr,
+ NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default());
void DisableVerificationForHttps();
void SetClientCertificate(const TOpenSslClientIO::TOptions::TClientCert& options);
@@ -93,7 +98,8 @@ private:
template <class T>
THttpCode DoRequestReliable(const T& raw,
IOutputStream* output,
- THttpHeaders* outHeaders);
+ THttpHeaders* outHeaders,
+ NThreading::TCancellationToken cancellation);
TVector<IOutputStream::TPart> FormRequest(TStringBuf method, const TStringBuf relativeUrl,
TStringBuf body,
@@ -166,13 +172,13 @@ public:
void EnableVerificationForHttps();
- void DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers = THeaders()) const;
+ void DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers = THeaders(), NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()) const;
// builds post request from headers and body
- void DoPost(const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THeaders& headers = THeaders()) const;
+ void DoPost(const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THeaders& headers = THeaders(), NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()) const;
// requires already well-formed post request
- void DoPostRaw(const TStringBuf relativeUrl, TStringBuf rawRequest, IOutputStream* output) const;
+ void DoPostRaw(const TStringBuf relativeUrl, TStringBuf rawRequest, IOutputStream* output, NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()) const;
virtual ~TSimpleHttpClient();
@@ -227,6 +233,10 @@ namespace NPrivate {
return HttpIn.Get();
}
+ void Shutdown() {
+ Socket.ShutDown(SHUT_RDWR);
+ }
+
private:
static TNetworkAddress Resolve(const TString& host, ui32 port);
@@ -250,12 +260,18 @@ namespace NPrivate {
template <class T>
TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequestReliable(const T& raw,
IOutputStream* output,
- THttpHeaders* outHeaders) {
+ THttpHeaders* outHeaders,
+ NThreading::TCancellationToken cancellation) {
+
for (int i = 0; i < 2; ++i) {
const bool haveNewConnection = CreateNewConnectionIfNeeded();
const bool couldRetry = !haveNewConnection && i == 0; // Actually old connection could be already closed by server,
// so we should try one more time in this case.
try {
+ cancellation.Future().Subscribe([&](auto&) {
+ Connection->Shutdown();
+ });
+
Connection->Write(raw);
THttpCode code = ReadAndTransferHttp(*Connection->GetHttpInput(), output, outHeaders);
@@ -265,16 +281,19 @@ TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequestReliable(const T&
return code;
} catch (const TSystemError& e) {
Connection.Reset();
+ cancellation.ThrowIfCancellationRequested();
if (!couldRetry || e.Status() != EPIPE) {
throw;
}
} catch (const THttpReadException&) { // Actually old connection is already closed by server
Connection.Reset();
+ cancellation.ThrowIfCancellationRequested();
if (!couldRetry) {
throw;
}
} catch (const std::exception&) {
Connection.Reset();
+ cancellation.ThrowIfCancellationRequested();
throw;
}
}
diff --git a/library/cpp/http/simple/ya.make b/library/cpp/http/simple/ya.make
index 40744675e8..6a4e5775a4 100644
--- a/library/cpp/http/simple/ya.make
+++ b/library/cpp/http/simple/ya.make
@@ -4,6 +4,7 @@ PEERDIR(
library/cpp/http/io
library/cpp/openssl/io
library/cpp/string_utils/url
+ library/cpp/threading/cancellation
)
SRCS(
diff --git a/library/cpp/threading/cancellation/README.md b/library/cpp/threading/cancellation/README.md
new file mode 100644
index 0000000000..8fb8ea616c
--- /dev/null
+++ b/library/cpp/threading/cancellation/README.md
@@ -0,0 +1,112 @@
+The Cancellation library
+========================
+
+Intro
+-----
+
+This small library provides primitives for implementation of a cooperative cancellation of long running or asynchronous operations.
+The design has been copied from the well-known CancellationTokenSource/CancellationToken classes of the .NET Framework
+
+To use the library include `cancellation_token.h`.
+
+Examples
+--------
+
+1. Simple check for cancellation
+
+ ```c++
+ void LongRunningOperation(TCancellationToken token) {
+ ...
+ if (token.IsCancellationRequested()) {
+ return;
+ }
+ ...
+ }
+
+ TCancellationTokenSource source;
+ TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); });
+ thread.Start();
+ ...
+ source.Cancel();
+ thread.Join();
+ ```
+
+2. Exit via an exception
+
+ ```c++
+ void LongRunningOperation(TCancellationToken token) {
+ try {
+ for (;;) {
+ ...
+ token.ThrowIfCancellationRequested();
+ ...
+ }
+ } catch (TOperationCancelledException const&) {
+ return;
+ } catch (...) {
+ Y_ABORT("Never should be there")
+ }
+ }
+
+ TCancellationTokenSource source;
+ TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); });
+ thread.Start();
+ ...
+ source.Cancel();
+ thread.Join();
+ ```
+
+3. Periodic poll with cancellation
+
+ ```c++
+ void LongRunningOperation(TCancellationToken token) {
+ while (!token.Wait(PollInterval)) {
+ ...
+ }
+ }
+
+ TCancellationTokenSource source;
+ TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); });
+ thread.Start();
+ ...
+ source.Cancel();
+ thread.Join();
+ ```
+
+4. Waiting on the future
+
+ ```c++
+ TFuture<void> InnerOperation();
+ TFuture<void> OuterOperation(TCancellationToken token) {
+ return WaitAny(FirstOperation(), token.Future())
+ .Apply([token = std::move(token)](auto&&) {
+ token.ThrowIfCancellationRequested();
+ });
+ }
+
+ TCancellationTokenSource source;
+ auto future = OuterOperation();
+ ...
+ source.Cancel()
+ ...
+ try {
+ auto value = future.ExtractValueSync();
+ } catch (TOperationCancelledException const&) {
+ // cancelled
+ }
+ ```
+
+5. Using default token when no cancellation needed
+
+ ```c++
+ void LongRunningOperation(TCancellationToken token) {
+ ...
+ if (token.IsCancellationRequested()) {
+ return;
+ }
+ ...
+ }
+
+ // We do not want to cancel the operation. So, there is no need to create a cancellation token source
+ LongRunningOperation(TCancellationToken::Default);
+ ```
diff --git a/library/cpp/threading/cancellation/cancellation_token.cpp b/library/cpp/threading/cancellation/cancellation_token.cpp
new file mode 100644
index 0000000000..1a0a19f690
--- /dev/null
+++ b/library/cpp/threading/cancellation/cancellation_token.cpp
@@ -0,0 +1 @@
+#include "cancellation_token.h"
diff --git a/library/cpp/threading/cancellation/cancellation_token.h b/library/cpp/threading/cancellation/cancellation_token.h
new file mode 100644
index 0000000000..7615965b14
--- /dev/null
+++ b/library/cpp/threading/cancellation/cancellation_token.h
@@ -0,0 +1,133 @@
+#pragma once
+
+#include "operation_cancelled_exception.h"
+
+#include <library/cpp/threading/future/future.h>
+
+#include <util/generic/ptr.h>
+#include <util/generic/singleton.h>
+
+namespace NThreading {
+
+class TCancellationTokenSource;
+
+//! A cancellation token could be passed to an async or long running operation to perform a cooperative operation cancel
+class TCancellationToken {
+private:
+ TFuture<void> Future_;
+ TInstant Deadline_ = TInstant::Max();
+
+public:
+ TCancellationToken() = delete;
+ TCancellationToken(const TCancellationToken&) noexcept = default;
+ TCancellationToken(TCancellationToken&&) noexcept = default;
+ TCancellationToken& operator = (const TCancellationToken&) noexcept = default;
+ TCancellationToken& operator = (TCancellationToken&&) noexcept = default;
+
+ //! Shows whether a cancellation has been requested
+ bool IsCancellationRequested() const {
+ return Future_.HasValue();
+ }
+
+ //! Shows whether a cancellation has been requested
+ bool IsDeadlineReached() const {
+ return TInstant::Now() > Deadline_;
+ }
+
+ //! Throws the TOperationCancelledException if a cancellation has been requested
+ void ThrowIfCancellationRequested() const {
+ if (IsCancellationRequested()) {
+ ythrow TOperationCancelledException();
+ }
+ }
+
+ //! Throws the TOperationCancelledException if the Deadline_ has been reached
+ void ThrowIfDeadlineReached() const {
+ if (IsDeadlineReached()) {
+ ythrow TOperationCancelledException();
+ }
+ }
+
+ //! Throws the TOperationCancelledException if a cancellation has been requested
+ // or reached the Deadline_
+ void ThrowIfTokenCancelled() const {
+ ThrowIfCancellationRequested();
+ ThrowIfDeadlineReached();
+ }
+
+ //! Waits for a cancellation
+ bool Wait(TDuration duration) const {
+ return Future_.Wait(duration);
+ }
+
+ bool Wait(TInstant deadline) const {
+ return Future_.Wait(deadline);
+ }
+
+ void Wait() const {
+ return Future_.Wait();
+ }
+
+ //! Returns a future that could be used for waiting for a cancellation
+ TFuture<void> const& Future() const noexcept {
+ return Future_;
+ }
+
+ //! The default cancellation token that cannot be cancelled
+ static TCancellationToken const& Default() {
+ return *SingletonWithPriority<TCancellationToken, 0>(NewPromise());
+ }
+
+ void SetDeadline(TInstant deadline) {
+ Deadline_ = deadline;
+ }
+
+ TInstant GetDeadline() const {
+ return Deadline_;
+ }
+
+private:
+ TCancellationToken(TFuture<void> future)
+ : Future_(std::move(future))
+ {
+ }
+
+private:
+ friend class TCancellationTokenSource;
+
+ Y_DECLARE_SINGLETON_FRIEND();
+};
+
+//! A cancellation token source produces cancellation tokens to be passed to cancellable operations
+class TCancellationTokenSource {
+private:
+ TPromise<void> Promise;
+
+public:
+ TCancellationTokenSource()
+ : Promise(NewPromise())
+ {
+ }
+
+ TCancellationTokenSource(TCancellationTokenSource const&) = delete;
+ TCancellationTokenSource(TCancellationTokenSource&&) = delete;
+ TCancellationTokenSource& operator=(TCancellationTokenSource const&) = delete;
+ TCancellationTokenSource& operator=(TCancellationTokenSource&&) = delete;
+
+ //! Shows whether a cancellation has been requested
+ bool IsCancellationRequested() const noexcept {
+ return Promise.HasValue();
+ }
+
+ //! Produces a cancellation token
+ TCancellationToken Token() const {
+ return TCancellationToken(Promise.GetFuture());
+ }
+
+ //! Propagates a cancel request to all produced tokens
+ void Cancel() noexcept {
+ Promise.TrySetValue();
+ }
+};
+
+}
diff --git a/library/cpp/threading/cancellation/ya.make b/library/cpp/threading/cancellation/ya.make
new file mode 100644
index 0000000000..e9988f0f08
--- /dev/null
+++ b/library/cpp/threading/cancellation/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+PEERDIR(
+ library/cpp/threading/future
+)
+
+SRCS(
+ cancellation_token.cpp
+)
+
+END()