diff options
author | jolex007 <jolex007@yandex-team.com> | 2025-02-27 12:33:07 +0300 |
---|---|---|
committer | jolex007 <jolex007@yandex-team.com> | 2025-02-27 12:48:28 +0300 |
commit | 08e7cde56ecd933346be66e2d41418a6ea0a0ab3 (patch) | |
tree | ada0e97f83aca15c0a94ff2314e7f389f91f3993 /library/cpp | |
parent | d8421ced20cf7734d41d29a3209c87f93485faad (diff) | |
download | ydb-08e7cde56ecd933346be66e2d41418a6ea0a0ab3.tar.gz |
Fix cancellation in unifetcher
commit_hash:909fa7aadbf673448dbc709b19d2088963b40404
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/http/simple/http_client.cpp | 34 | ||||
-rw-r--r-- | library/cpp/http/simple/http_client.h | 37 | ||||
-rw-r--r-- | library/cpp/http/simple/ya.make | 1 | ||||
-rw-r--r-- | library/cpp/threading/cancellation/README.md | 112 | ||||
-rw-r--r-- | library/cpp/threading/cancellation/cancellation_token.cpp | 1 | ||||
-rw-r--r-- | library/cpp/threading/cancellation/cancellation_token.h | 133 | ||||
-rw-r--r-- | library/cpp/threading/cancellation/ya.make | 11 |
7 files changed, 306 insertions, 23 deletions
diff --git a/library/cpp/http/simple/http_client.cpp b/library/cpp/http/simple/http_client.cpp index bac6bdc39e..2be5a14582 100644 --- a/library/cpp/http/simple/http_client.cpp +++ b/library/cpp/http/simple/http_client.cpp @@ -25,26 +25,30 @@ TKeepAliveHttpClient::TKeepAliveHttpClient(const TString& host, TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers, - THttpHeaders* outHeaders) { + THttpHeaders* outHeaders, + NThreading::TCancellationToken cancellation) { return DoRequest(TStringBuf("GET"), relativeUrl, {}, output, headers, - outHeaders); + outHeaders, + std::move(cancellation)); } TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoPost(const TStringBuf relativeUrl, const TStringBuf body, IOutputStream* output, const THeaders& headers, - THttpHeaders* outHeaders) { + THttpHeaders* outHeaders, + NThreading::TCancellationToken cancellation) { return DoRequest(TStringBuf("POST"), relativeUrl, body, output, headers, - outHeaders); + outHeaders, + std::move(cancellation)); } TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequest(const TStringBuf method, @@ -52,15 +56,17 @@ TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequest(const TStringBuf const TStringBuf body, IOutputStream* output, const THeaders& inHeaders, - THttpHeaders* outHeaders) { + THttpHeaders* outHeaders, + NThreading::TCancellationToken cancellation) { const TString contentLength = IntToString<10, size_t>(body.size()); - return DoRequestReliable(FormRequest(method, relativeUrl, body, inHeaders, contentLength), output, outHeaders); + return DoRequestReliable(FormRequest(method, relativeUrl, body, inHeaders, contentLength), output, outHeaders, std::move(cancellation)); } TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequestRaw(const TStringBuf raw, IOutputStream* output, - THttpHeaders* outHeaders) { - return DoRequestReliable(raw, output, outHeaders); + THttpHeaders* outHeaders, + NThreading::TCancellationToken cancellation) { + return DoRequestReliable(raw, output, outHeaders, std::move(cancellation)); } void TKeepAliveHttpClient::DisableVerificationForHttps() { @@ -189,28 +195,28 @@ void TSimpleHttpClient::EnableVerificationForHttps() { HttpsVerification = true; } -void TSimpleHttpClient::DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers) const { +void TSimpleHttpClient::DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers, NThreading::TCancellationToken cancellation) const { TKeepAliveHttpClient cl = CreateClient(); - TKeepAliveHttpClient::THttpCode code = cl.DoGet(relativeUrl, output, headers); + TKeepAliveHttpClient::THttpCode code = cl.DoGet(relativeUrl, output, headers, nullptr, std::move(cancellation)); Y_ENSURE(cl.GetHttpInput()); ProcessResponse(relativeUrl, *cl.GetHttpInput(), output, code); } -void TSimpleHttpClient::DoPost(const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THashMap<TString, TString>& headers) const { +void TSimpleHttpClient::DoPost(const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THashMap<TString, TString>& headers, NThreading::TCancellationToken cancellation) const { TKeepAliveHttpClient cl = CreateClient(); - TKeepAliveHttpClient::THttpCode code = cl.DoPost(relativeUrl, body, output, headers); + TKeepAliveHttpClient::THttpCode code = cl.DoPost(relativeUrl, body, output, headers, nullptr, std::move(cancellation)); Y_ENSURE(cl.GetHttpInput()); ProcessResponse(relativeUrl, *cl.GetHttpInput(), output, code); } -void TSimpleHttpClient::DoPostRaw(const TStringBuf relativeUrl, const TStringBuf rawRequest, IOutputStream* output) const { +void TSimpleHttpClient::DoPostRaw(const TStringBuf relativeUrl, const TStringBuf rawRequest, IOutputStream* output, NThreading::TCancellationToken cancellation) const { TKeepAliveHttpClient cl = CreateClient(); - TKeepAliveHttpClient::THttpCode code = cl.DoRequestRaw(rawRequest, output); + TKeepAliveHttpClient::THttpCode code = cl.DoRequestRaw(rawRequest, output, nullptr, std::move(cancellation)); Y_ENSURE(cl.GetHttpInput()); ProcessResponse(relativeUrl, *cl.GetHttpInput(), output, code); diff --git a/library/cpp/http/simple/http_client.h b/library/cpp/http/simple/http_client.h index eab92d42da..224be58a24 100644 --- a/library/cpp/http/simple/http_client.h +++ b/library/cpp/http/simple/http_client.h @@ -12,6 +12,7 @@ #include <library/cpp/http/io/stream.h> #include <library/cpp/http/misc/httpcodes.h> #include <library/cpp/openssl/io/stream.h> +#include <library/cpp/threading/cancellation/cancellation_token.h> class TNetworkAddress; class IOutputStream; @@ -54,14 +55,16 @@ public: THttpCode DoGet(const TStringBuf relativeUrl, IOutputStream* output = nullptr, const THeaders& headers = THeaders(), - THttpHeaders* outHeaders = nullptr); + THttpHeaders* outHeaders = nullptr, + NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()); // builds post request from headers and body THttpCode DoPost(const TStringBuf relativeUrl, const TStringBuf body, IOutputStream* output = nullptr, const THeaders& headers = THeaders(), - THttpHeaders* outHeaders = nullptr); + THttpHeaders* outHeaders = nullptr, + NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()); // builds request with any HTTP method from headers and body THttpCode DoRequest(const TStringBuf method, @@ -69,12 +72,14 @@ public: const TStringBuf body, IOutputStream* output = nullptr, const THeaders& inHeaders = THeaders(), - THttpHeaders* outHeaders = nullptr); + THttpHeaders* outHeaders = nullptr, + NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()); // requires already well-formed request THttpCode DoRequestRaw(const TStringBuf raw, IOutputStream* output = nullptr, - THttpHeaders* outHeaders = nullptr); + THttpHeaders* outHeaders = nullptr, + NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()); void DisableVerificationForHttps(); void SetClientCertificate(const TOpenSslClientIO::TOptions::TClientCert& options); @@ -93,7 +98,8 @@ private: template <class T> THttpCode DoRequestReliable(const T& raw, IOutputStream* output, - THttpHeaders* outHeaders); + THttpHeaders* outHeaders, + NThreading::TCancellationToken cancellation); TVector<IOutputStream::TPart> FormRequest(TStringBuf method, const TStringBuf relativeUrl, TStringBuf body, @@ -166,13 +172,13 @@ public: void EnableVerificationForHttps(); - void DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers = THeaders()) const; + void DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers = THeaders(), NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()) const; // builds post request from headers and body - void DoPost(const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THeaders& headers = THeaders()) const; + void DoPost(const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THeaders& headers = THeaders(), NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()) const; // requires already well-formed post request - void DoPostRaw(const TStringBuf relativeUrl, TStringBuf rawRequest, IOutputStream* output) const; + void DoPostRaw(const TStringBuf relativeUrl, TStringBuf rawRequest, IOutputStream* output, NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()) const; virtual ~TSimpleHttpClient(); @@ -227,6 +233,10 @@ namespace NPrivate { return HttpIn.Get(); } + void Shutdown() { + Socket.ShutDown(SHUT_RDWR); + } + private: static TNetworkAddress Resolve(const TString& host, ui32 port); @@ -250,12 +260,18 @@ namespace NPrivate { template <class T> TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequestReliable(const T& raw, IOutputStream* output, - THttpHeaders* outHeaders) { + THttpHeaders* outHeaders, + NThreading::TCancellationToken cancellation) { + for (int i = 0; i < 2; ++i) { const bool haveNewConnection = CreateNewConnectionIfNeeded(); const bool couldRetry = !haveNewConnection && i == 0; // Actually old connection could be already closed by server, // so we should try one more time in this case. try { + cancellation.Future().Subscribe([&](auto&) { + Connection->Shutdown(); + }); + Connection->Write(raw); THttpCode code = ReadAndTransferHttp(*Connection->GetHttpInput(), output, outHeaders); @@ -265,16 +281,19 @@ TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequestReliable(const T& return code; } catch (const TSystemError& e) { Connection.Reset(); + cancellation.ThrowIfCancellationRequested(); if (!couldRetry || e.Status() != EPIPE) { throw; } } catch (const THttpReadException&) { // Actually old connection is already closed by server Connection.Reset(); + cancellation.ThrowIfCancellationRequested(); if (!couldRetry) { throw; } } catch (const std::exception&) { Connection.Reset(); + cancellation.ThrowIfCancellationRequested(); throw; } } diff --git a/library/cpp/http/simple/ya.make b/library/cpp/http/simple/ya.make index 40744675e8..6a4e5775a4 100644 --- a/library/cpp/http/simple/ya.make +++ b/library/cpp/http/simple/ya.make @@ -4,6 +4,7 @@ PEERDIR( library/cpp/http/io library/cpp/openssl/io library/cpp/string_utils/url + library/cpp/threading/cancellation ) SRCS( diff --git a/library/cpp/threading/cancellation/README.md b/library/cpp/threading/cancellation/README.md new file mode 100644 index 0000000000..8fb8ea616c --- /dev/null +++ b/library/cpp/threading/cancellation/README.md @@ -0,0 +1,112 @@ +The Cancellation library +======================== + +Intro +----- + +This small library provides primitives for implementation of a cooperative cancellation of long running or asynchronous operations. +The design has been copied from the well-known CancellationTokenSource/CancellationToken classes of the .NET Framework + +To use the library include `cancellation_token.h`. + +Examples +-------- + +1. Simple check for cancellation + + ```c++ + void LongRunningOperation(TCancellationToken token) { + ... + if (token.IsCancellationRequested()) { + return; + } + ... + } + + TCancellationTokenSource source; + TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); }); + thread.Start(); + ... + source.Cancel(); + thread.Join(); + ``` + +2. Exit via an exception + + ```c++ + void LongRunningOperation(TCancellationToken token) { + try { + for (;;) { + ... + token.ThrowIfCancellationRequested(); + ... + } + } catch (TOperationCancelledException const&) { + return; + } catch (...) { + Y_ABORT("Never should be there") + } + } + + TCancellationTokenSource source; + TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); }); + thread.Start(); + ... + source.Cancel(); + thread.Join(); + ``` + +3. Periodic poll with cancellation + + ```c++ + void LongRunningOperation(TCancellationToken token) { + while (!token.Wait(PollInterval)) { + ... + } + } + + TCancellationTokenSource source; + TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); }); + thread.Start(); + ... + source.Cancel(); + thread.Join(); + ``` + +4. Waiting on the future + + ```c++ + TFuture<void> InnerOperation(); + TFuture<void> OuterOperation(TCancellationToken token) { + return WaitAny(FirstOperation(), token.Future()) + .Apply([token = std::move(token)](auto&&) { + token.ThrowIfCancellationRequested(); + }); + } + + TCancellationTokenSource source; + auto future = OuterOperation(); + ... + source.Cancel() + ... + try { + auto value = future.ExtractValueSync(); + } catch (TOperationCancelledException const&) { + // cancelled + } + ``` + +5. Using default token when no cancellation needed + + ```c++ + void LongRunningOperation(TCancellationToken token) { + ... + if (token.IsCancellationRequested()) { + return; + } + ... + } + + // We do not want to cancel the operation. So, there is no need to create a cancellation token source + LongRunningOperation(TCancellationToken::Default); + ``` diff --git a/library/cpp/threading/cancellation/cancellation_token.cpp b/library/cpp/threading/cancellation/cancellation_token.cpp new file mode 100644 index 0000000000..1a0a19f690 --- /dev/null +++ b/library/cpp/threading/cancellation/cancellation_token.cpp @@ -0,0 +1 @@ +#include "cancellation_token.h" diff --git a/library/cpp/threading/cancellation/cancellation_token.h b/library/cpp/threading/cancellation/cancellation_token.h new file mode 100644 index 0000000000..7615965b14 --- /dev/null +++ b/library/cpp/threading/cancellation/cancellation_token.h @@ -0,0 +1,133 @@ +#pragma once + +#include "operation_cancelled_exception.h" + +#include <library/cpp/threading/future/future.h> + +#include <util/generic/ptr.h> +#include <util/generic/singleton.h> + +namespace NThreading { + +class TCancellationTokenSource; + +//! A cancellation token could be passed to an async or long running operation to perform a cooperative operation cancel +class TCancellationToken { +private: + TFuture<void> Future_; + TInstant Deadline_ = TInstant::Max(); + +public: + TCancellationToken() = delete; + TCancellationToken(const TCancellationToken&) noexcept = default; + TCancellationToken(TCancellationToken&&) noexcept = default; + TCancellationToken& operator = (const TCancellationToken&) noexcept = default; + TCancellationToken& operator = (TCancellationToken&&) noexcept = default; + + //! Shows whether a cancellation has been requested + bool IsCancellationRequested() const { + return Future_.HasValue(); + } + + //! Shows whether a cancellation has been requested + bool IsDeadlineReached() const { + return TInstant::Now() > Deadline_; + } + + //! Throws the TOperationCancelledException if a cancellation has been requested + void ThrowIfCancellationRequested() const { + if (IsCancellationRequested()) { + ythrow TOperationCancelledException(); + } + } + + //! Throws the TOperationCancelledException if the Deadline_ has been reached + void ThrowIfDeadlineReached() const { + if (IsDeadlineReached()) { + ythrow TOperationCancelledException(); + } + } + + //! Throws the TOperationCancelledException if a cancellation has been requested + // or reached the Deadline_ + void ThrowIfTokenCancelled() const { + ThrowIfCancellationRequested(); + ThrowIfDeadlineReached(); + } + + //! Waits for a cancellation + bool Wait(TDuration duration) const { + return Future_.Wait(duration); + } + + bool Wait(TInstant deadline) const { + return Future_.Wait(deadline); + } + + void Wait() const { + return Future_.Wait(); + } + + //! Returns a future that could be used for waiting for a cancellation + TFuture<void> const& Future() const noexcept { + return Future_; + } + + //! The default cancellation token that cannot be cancelled + static TCancellationToken const& Default() { + return *SingletonWithPriority<TCancellationToken, 0>(NewPromise()); + } + + void SetDeadline(TInstant deadline) { + Deadline_ = deadline; + } + + TInstant GetDeadline() const { + return Deadline_; + } + +private: + TCancellationToken(TFuture<void> future) + : Future_(std::move(future)) + { + } + +private: + friend class TCancellationTokenSource; + + Y_DECLARE_SINGLETON_FRIEND(); +}; + +//! A cancellation token source produces cancellation tokens to be passed to cancellable operations +class TCancellationTokenSource { +private: + TPromise<void> Promise; + +public: + TCancellationTokenSource() + : Promise(NewPromise()) + { + } + + TCancellationTokenSource(TCancellationTokenSource const&) = delete; + TCancellationTokenSource(TCancellationTokenSource&&) = delete; + TCancellationTokenSource& operator=(TCancellationTokenSource const&) = delete; + TCancellationTokenSource& operator=(TCancellationTokenSource&&) = delete; + + //! Shows whether a cancellation has been requested + bool IsCancellationRequested() const noexcept { + return Promise.HasValue(); + } + + //! Produces a cancellation token + TCancellationToken Token() const { + return TCancellationToken(Promise.GetFuture()); + } + + //! Propagates a cancel request to all produced tokens + void Cancel() noexcept { + Promise.TrySetValue(); + } +}; + +} diff --git a/library/cpp/threading/cancellation/ya.make b/library/cpp/threading/cancellation/ya.make new file mode 100644 index 0000000000..e9988f0f08 --- /dev/null +++ b/library/cpp/threading/cancellation/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +PEERDIR( + library/cpp/threading/future +) + +SRCS( + cancellation_token.cpp +) + +END() |