summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client
diff options
context:
space:
mode:
authormaybenotilya <[email protected]>2026-04-17 11:57:44 +0300
committermaybenotilya <[email protected]>2026-04-17 12:34:55 +0300
commita6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch)
tree087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/client
parent7bcfb91453e538211ed44e97c57d1104f543f920 (diff)
YT-26179: Add Abort for readers
* Changelog entry Type: feature Component: cpp-sdk Add Abort for table readers commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/client')
-rw-r--r--yt/cpp/mapreduce/client/client_reader.cpp17
-rw-r--r--yt/cpp/mapreduce/client/client_reader.h7
-rw-r--r--yt/cpp/mapreduce/client/file_reader.cpp14
-rw-r--r--yt/cpp/mapreduce/client/file_reader.h14
-rw-r--r--yt/cpp/mapreduce/client/partition_reader.cpp16
5 files changed, 55 insertions, 13 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp
index 1eade18397a..0bbc6fa1322 100644
--- a/yt/cpp/mapreduce/client/client_reader.cpp
+++ b/yt/cpp/mapreduce/client/client_reader.cpp
@@ -84,6 +84,11 @@ bool TClientReader::Retry(
const TMaybe<ui64>& rowIndex,
const std::exception_ptr& error)
{
+ // We always stop retries if reader is aborted
+ if (IAbortableInputStream::IsAbortedError(error)) {
+ std::rethrow_exception(error);
+ }
+
if (CurrentRequestRetryPolicy_) {
TMaybe<TDuration> backoffDuration;
try {
@@ -124,6 +129,16 @@ void TClientReader::ResetRetries()
CurrentRequestRetryPolicy_ = nullptr;
}
+void TClientReader::Abort()
+{
+ Input_->Abort();
+}
+
+bool TClientReader::IsAborted() const
+{
+ return Input_->IsAborted();
+}
+
size_t TClientReader::DoRead(void* buf, size_t len)
{
return Input_->Read(buf, len);
@@ -178,7 +193,7 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u
ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex));
}
- Input_ = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>(
+ Input_ = NDetail::RequestWithRetry<std::unique_ptr<IAbortableInputStream>>(
CurrentRequestRetryPolicy_,
[this, &transactionId] (TMutationId /*mutationId*/) {
return RawClient_->ReadTable(transactionId, Path_, Format_, Options_);
diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h
index 9f3160c9c7d..19a9f1ad173 100644
--- a/yt/cpp/mapreduce/client/client_reader.h
+++ b/yt/cpp/mapreduce/client/client_reader.h
@@ -2,6 +2,8 @@
#include <yt/cpp/mapreduce/common/fwd.h>
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
+
#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/http.h>
@@ -37,6 +39,9 @@ public:
bool HasRangeIndices() const override { return true; }
+ void Abort() override;
+ bool IsAborted() const override;
+
protected:
size_t DoRead(void* buf, size_t len) override;
@@ -53,7 +58,7 @@ private:
std::unique_ptr<TPingableTransaction> ReadTransaction_;
- std::unique_ptr<IInputStream> Input_;
+ std::unique_ptr<IAbortableInputStream> Input_;
IRequestRetryPolicyPtr CurrentRequestRetryPolicy_;
diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp
index 6163dd55331..ffdd2c9ac30 100644
--- a/yt/cpp/mapreduce/client/file_reader.cpp
+++ b/yt/cpp/mapreduce/client/file_reader.cpp
@@ -50,6 +50,16 @@ TStreamReaderBase::TStreamReaderBase(
TStartTransactionOptions()))
{ }
+void TStreamReaderBase::Abort()
+{
+ Input_->Abort();
+}
+
+bool TStreamReaderBase::IsAborted() const
+{
+ return Input_->IsAborted();
+}
+
TStreamReaderBase::~TStreamReaderBase() = default;
TYPath TStreamReaderBase::Snapshot(const TYPath& path)
@@ -98,7 +108,7 @@ TFileReader::TFileReader(
Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_);
}
-std::unique_ptr<IInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes)
+std::unique_ptr<IAbortableInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes)
{
const ui64 currentOffset = StartOffset_ + readBytes;
@@ -130,7 +140,7 @@ TBlobTableReader::TBlobTableReader(
Path_ = TStreamReaderBase::Snapshot(path);
}
-std::unique_ptr<IInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes)
+std::unique_ptr<IAbortableInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes)
{
const i64 currentOffset = StartOffset_ + readBytes;
const i64 startPartIndex = currentOffset / Options_.PartSize_;
diff --git a/yt/cpp/mapreduce/client/file_reader.h b/yt/cpp/mapreduce/client/file_reader.h
index d3efe90f26a..6c7bcc8935e 100644
--- a/yt/cpp/mapreduce/client/file_reader.h
+++ b/yt/cpp/mapreduce/client/file_reader.h
@@ -2,13 +2,12 @@
#include <yt/cpp/mapreduce/common/fwd.h>
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
#include <yt/cpp/mapreduce/interface/io.h>
#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/requests.h>
-class IInputStream;
-
namespace NYT {
class TPingableTransaction;
@@ -27,6 +26,9 @@ public:
const TClientContext& context,
const TTransactionId& transactionId);
+ void Abort() override;
+ bool IsAborted() const override;
+
~TStreamReaderBase();
protected:
@@ -37,13 +39,13 @@ protected:
private:
size_t DoRead(void* buf, size_t len) override;
- virtual std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) = 0;
+ virtual std::unique_ptr<IAbortableInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) = 0;
private:
const IClientRetryPolicyPtr ClientRetryPolicy_;
TFileReaderOptions FileReaderOptions_;
- std::unique_ptr<IInputStream> Input_;
+ std::unique_ptr<IAbortableInputStream> Input_;
std::unique_ptr<TPingableTransaction> ReadTransaction_;
@@ -66,7 +68,7 @@ public:
const TFileReaderOptions& options = {});
private:
- std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override;
+ std::unique_ptr<IAbortableInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override;
private:
const ui64 StartOffset_;
@@ -93,7 +95,7 @@ public:
const TBlobTableReaderOptions& options = {});
private:
- std::unique_ptr<IInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override;
+ std::unique_ptr<IAbortableInputStream> Request(const TTransactionId& transactionId, ui64 readBytes) override;
private:
const ui64 StartOffset_;
diff --git a/yt/cpp/mapreduce/client/partition_reader.cpp b/yt/cpp/mapreduce/client/partition_reader.cpp
index a2ca7eb9758..d849040d854 100644
--- a/yt/cpp/mapreduce/client/partition_reader.cpp
+++ b/yt/cpp/mapreduce/client/partition_reader.cpp
@@ -12,7 +12,7 @@ class TPartitionTableReader
: public TRawTableReader
{
public:
- TPartitionTableReader(std::unique_ptr<IInputStream> input)
+ TPartitionTableReader(std::unique_ptr<IAbortableInputStream> input)
: Input_(std::move(input))
{ }
@@ -32,6 +32,16 @@ public:
return false;
}
+ void Abort() override
+ {
+ Input_->Abort();
+ }
+
+ bool IsAborted() const override
+ {
+ return Input_->IsAborted();
+ }
+
protected:
size_t DoRead(void* buf, size_t len) override
{
@@ -39,7 +49,7 @@ protected:
}
private:
- std::unique_ptr<IInputStream> Input_;
+ std::unique_ptr<IAbortableInputStream> Input_;
};
////////////////////////////////////////////////////////////////////////////////
@@ -52,7 +62,7 @@ TRawTableReaderPtr CreateTablePartitionReader(
const TTablePartitionReaderOptions& options)
{
- auto stream = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>(
+ auto stream = NDetail::RequestWithRetry<std::unique_ptr<IAbortableInputStream>>(
retryPolicy,
[&] (TMutationId /*mutationId*/) {
return rawClient->ReadTablePartition(cookie, format, options);