diff options
| author | maybenotilya <[email protected]> | 2026-04-17 11:57:44 +0300 |
|---|---|---|
| committer | maybenotilya <[email protected]> | 2026-04-17 12:34:55 +0300 |
| commit | a6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch) | |
| tree | 087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/client | |
| parent | 7bcfb91453e538211ed44e97c57d1104f543f920 (diff) | |
YT-26179: Add Abort for readers
* Changelog entry
Type: feature
Component: cpp-sdk
Add Abort for table readers
commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/client')
| -rw-r--r-- | yt/cpp/mapreduce/client/client_reader.cpp | 17 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/client_reader.h | 7 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/file_reader.cpp | 14 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/file_reader.h | 14 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/partition_reader.cpp | 16 |
5 files changed, 55 insertions, 13 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index 1eade18397a..0bbc6fa1322 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -84,6 +84,11 @@ bool TClientReader::Retry( const TMaybe<ui64>& rowIndex, const std::exception_ptr& error) { + // We always stop retries if reader is aborted + if (IAbortableInputStream::IsAbortedError(error)) { + std::rethrow_exception(error); + } + if (CurrentRequestRetryPolicy_) { TMaybe<TDuration> backoffDuration; try { @@ -124,6 +129,16 @@ void TClientReader::ResetRetries() CurrentRequestRetryPolicy_ = nullptr; } +void TClientReader::Abort() +{ + Input_->Abort(); +} + +bool TClientReader::IsAborted() const +{ + return Input_->IsAborted(); +} + size_t TClientReader::DoRead(void* buf, size_t len) { return Input_->Read(buf, len); @@ -178,7 +193,7 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); } - Input_ = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>( + Input_ = NDetail::RequestWithRetry<std::unique_ptr<IAbortableInputStream>>( CurrentRequestRetryPolicy_, [this, &transactionId] (TMutationId /*mutationId*/) { return RawClient_->ReadTable(transactionId, Path_, Format_, Options_); diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h index 9f3160c9c7d..19a9f1ad173 100644 --- a/yt/cpp/mapreduce/client/client_reader.h +++ b/yt/cpp/mapreduce/client/client_reader.h @@ -2,6 +2,8 @@ #include <yt/cpp/mapreduce/common/fwd.h> +#include <yt/cpp/mapreduce/interface/abortable_stream.h> + #include <yt/cpp/mapreduce/http/context.h> #include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/http/http.h> @@ -37,6 +39,9 @@ public: bool HasRangeIndices() const override { return true; } + void Abort() override; + bool IsAborted() const override; + protected: size_t DoRead(void* buf, size_t len) override; @@ -53,7 +58,7 @@ private: std::unique_ptr<TPingableTransaction> ReadTransaction_; - std::unique_ptr<IInputStream> Input_; + std::unique_ptr<IAbortableInputStream> Input_; IRequestRetryPolicyPtr CurrentRequestRetryPolicy_; diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp index 6163dd55331..ffdd2c9ac30 100644 --- a/yt/cpp/mapreduce/client/file_reader.cpp +++ b/yt/cpp/mapreduce/client/file_reader.cpp @@ -50,6 +50,16 @@ TStreamReaderBase::TStreamReaderBase( TStartTransactionOptions())) { } +void TStreamReaderBase::Abort() +{ + Input_->Abort(); +} + +bool TStreamReaderBase::IsAborted() const +{ + return Input_->IsAborted(); +} + TStreamReaderBase::~TStreamReaderBase() = default; TYPath TStreamReaderBase::Snapshot(const TYPath& path) @@ -98,7 +108,7 @@ TFileReader::TFileReader( Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_); } -std::unique_ptr<IInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes) +std::unique_ptr<IAbortableInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes) { const ui64 currentOffset = StartOffset_ + readBytes; @@ -130,7 +140,7 @@ TBlobTableReader::TBlobTableReader( Path_ = TStreamReaderBase::Snapshot(path); } -std::unique_ptr<IInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes) +std::unique_ptr<IAbortableInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes) { const i64 currentOffset = StartOffset_ + readBytes; const i64 startPartIndex = currentOffset / Options_.PartSize_; diff --git a/yt/cpp/mapreduce/client/file_reader.h b/yt/cpp/mapreduce/client/file_reader.h index d3efe90f26a..6c7bcc8935e 100644 --- a/yt/cpp/mapreduce/client/file_reader.h +++ b/yt/cpp/mapreduce/client/file_reader.h @@ -2,13 +2,12 @@ #include <yt/cpp/mapreduce/common/fwd.h> +#include <yt/cpp/mapreduce/interface/abortable_stream.h> #include <yt/cpp/mapreduce/interface/io.h> #include <yt/cpp/mapreduce/http/context.h> #include <yt/cpp/mapreduce/http/requests.h> -class IInputStream; - namespace NYT { class TPingableTransaction; @@ -27,6 +26,9 @@ public: const TClientContext& context, const TTransactionId& transactionId); + void Abort() override; + bool IsAborted() const override; + ~TStreamReaderBase(); protected: @@ -37,13 +39,13 @@ protected: private: size_t DoRead(void* buf, size_t len) override; - virtual std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) = 0; + virtual std::unique_ptr<IAbortableInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) = 0; private: const IClientRetryPolicyPtr ClientRetryPolicy_; TFileReaderOptions FileReaderOptions_; - std::unique_ptr<IInputStream> Input_; + std::unique_ptr<IAbortableInputStream> Input_; std::unique_ptr<TPingableTransaction> ReadTransaction_; @@ -66,7 +68,7 @@ public: const TFileReaderOptions& options = {}); private: - std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override; + std::unique_ptr<IAbortableInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override; private: const ui64 StartOffset_; @@ -93,7 +95,7 @@ public: const TBlobTableReaderOptions& options = {}); private: - std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override; + std::unique_ptr<IAbortableInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override; private: const ui64 StartOffset_; diff --git a/yt/cpp/mapreduce/client/partition_reader.cpp b/yt/cpp/mapreduce/client/partition_reader.cpp index a2ca7eb9758..d849040d854 100644 --- a/yt/cpp/mapreduce/client/partition_reader.cpp +++ b/yt/cpp/mapreduce/client/partition_reader.cpp @@ -12,7 +12,7 @@ class TPartitionTableReader : public TRawTableReader { public: - TPartitionTableReader(std::unique_ptr<IInputStream> input) + TPartitionTableReader(std::unique_ptr<IAbortableInputStream> input) : Input_(std::move(input)) { } @@ -32,6 +32,16 @@ public: return false; } + void Abort() override + { + Input_->Abort(); + } + + bool IsAborted() const override + { + return Input_->IsAborted(); + } + protected: size_t DoRead(void* buf, size_t len) override { @@ -39,7 +49,7 @@ protected: } private: - std::unique_ptr<IInputStream> Input_; + std::unique_ptr<IAbortableInputStream> Input_; }; //////////////////////////////////////////////////////////////////////////////// @@ -52,7 +62,7 @@ TRawTableReaderPtr CreateTablePartitionReader( const TTablePartitionReaderOptions& options) { - auto stream = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>( + auto stream = NDetail::RequestWithRetry<std::unique_ptr<IAbortableInputStream>>( retryPolicy, [&] (TMutationId /*mutationId*/) { return rawClient->ReadTablePartition(cookie, format, options); |
