diff options
| author | maybenotilya <[email protected]> | 2026-04-17 11:57:44 +0300 |
|---|---|---|
| committer | maybenotilya <[email protected]> | 2026-04-17 12:34:55 +0300 |
| commit | a6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch) | |
| tree | 087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/client/client_reader.cpp | |
| parent | 7bcfb91453e538211ed44e97c57d1104f543f920 (diff) | |
YT-26179: Add Abort for readers
* Changelog entry
Type: feature
Component: cpp-sdk
Add Abort for table readers
commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/client/client_reader.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/client_reader.cpp | 17 |
1 files changed, 16 insertions, 1 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index 1eade18397a..0bbc6fa1322 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -84,6 +84,11 @@ bool TClientReader::Retry( const TMaybe<ui64>& rowIndex, const std::exception_ptr& error) { + // We always stop retries if reader is aborted + if (IAbortableInputStream::IsAbortedError(error)) { + std::rethrow_exception(error); + } + if (CurrentRequestRetryPolicy_) { TMaybe<TDuration> backoffDuration; try { @@ -124,6 +129,16 @@ void TClientReader::ResetRetries() CurrentRequestRetryPolicy_ = nullptr; } +void TClientReader::Abort() +{ + Input_->Abort(); +} + +bool TClientReader::IsAborted() const +{ + return Input_->IsAborted(); +} + size_t TClientReader::DoRead(void* buf, size_t len) { return Input_->Read(buf, len); @@ -178,7 +193,7 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); } - Input_ = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>( + Input_ = NDetail::RequestWithRetry<std::unique_ptr<IAbortableInputStream>>( CurrentRequestRetryPolicy_, [this, &transactionId] (TMutationId /*mutationId*/) { return RawClient_->ReadTable(transactionId, Path_, Format_, Options_); |
