summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/interface
diff options
context:
space:
mode:
authormaybenotilya <[email protected]>2026-04-17 11:57:44 +0300
committermaybenotilya <[email protected]>2026-04-17 12:34:55 +0300
commita6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch)
tree087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/interface
parent7bcfb91453e538211ed44e97c57d1104f543f920 (diff)
YT-26179: Add Abort for readers
* Changelog entry Type: feature Component: cpp-sdk Add Abort for table readers commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/interface')
-rw-r--r--yt/cpp/mapreduce/interface/abortable_stream.cpp34
-rw-r--r--yt/cpp/mapreduce/interface/abortable_stream.h33
-rw-r--r--yt/cpp/mapreduce/interface/config.cpp4
-rw-r--r--yt/cpp/mapreduce/interface/config.h12
-rw-r--r--yt/cpp/mapreduce/interface/errors.h7
-rw-r--r--yt/cpp/mapreduce/interface/io-inl.h12
-rw-r--r--yt/cpp/mapreduce/interface/io.cpp10
-rw-r--r--yt/cpp/mapreduce/interface/io.h11
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h10
-rw-r--r--yt/cpp/mapreduce/interface/ya.make1
10 files changed, 128 insertions, 6 deletions
diff --git a/yt/cpp/mapreduce/interface/abortable_stream.cpp b/yt/cpp/mapreduce/interface/abortable_stream.cpp
new file mode 100644
index 00000000000..fc3b8d01f5b
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/abortable_stream.cpp
@@ -0,0 +1,34 @@
+#include "abortable_stream.h"
+
+#include <yt/cpp/mapreduce/interface/errors.h>
+
+#include <util/system/yassert.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void IAbortableInputStream::Abort()
+{
+ Y_ABORT("Unimplemented");
+}
+
+bool IAbortableInputStream::IsAborted() const
+{
+ return false;
+}
+
+bool IAbortableInputStream::IsAbortedError(const std::exception_ptr& error)
+{
+ try {
+ std::rethrow_exception(error);
+ } catch (const TInputStreamAbortedError& ex) {
+ return true;
+ } catch (...) {
+ return false;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/interface/abortable_stream.h b/yt/cpp/mapreduce/interface/abortable_stream.h
new file mode 100644
index 00000000000..40316e1b790
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/abortable_stream.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <util/stream/input.h>
+
+#include <exception>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Input stream that supports aborting the read operation.
+///
+/// Extends @ref IInputStream with the ability to immediately cancel current or future reads.
+class IAbortableInputStream
+ : public IInputStream
+{
+public:
+ ~IAbortableInputStream() override = default;
+
+ /// @brief Immediately abort the stream, cancelling current and future reads.
+ ///
+ /// Some clients have already implemented this interface, so using pure virtual method leads to build errors.
+ virtual void Abort();
+
+ /// @brief Check whether the stream has been aborted.
+ virtual bool IsAborted() const;
+
+ static bool IsAbortedError(const std::exception_ptr& error);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/interface/config.cpp b/yt/cpp/mapreduce/interface/config.cpp
index 9379ab57473..b4edcee4370 100644
--- a/yt/cpp/mapreduce/interface/config.cpp
+++ b/yt/cpp/mapreduce/interface/config.cpp
@@ -457,6 +457,8 @@ void Serialize(const TConfig& config, NYson::IYsonConsumer* consumer)
.Item("api_file_path_options").Value(config.ApiFilePathOptions)
.Item("use_abortable_response").Value(config.UseAbortableResponse)
.Item("enable_debug_metrics").Value(config.EnableDebugMetrics)
+ .Item("use_halting_response").Value(config.UseHaltingResponse)
+ .Item("halting_response_bytes_limit").Value(config.HaltingResponseBytesLimit)
.Item("enable_local_mode_optimization").Value(config.EnableLocalModeOptimization)
.Item("write_stderr_successful_jobs").Value(config.WriteStderrSuccessfulJobs)
.Item("trace_http_requests_mode").Value(::ToString(config.TraceHttpRequestsMode))
@@ -533,6 +535,8 @@ void Deserialize(TConfig& config, const TNode& node)
DESERIALIZE_ITEM("api_file_path_options", config.ApiFilePathOptions);
DESERIALIZE_ITEM("use_abortable_response", config.UseAbortableResponse);
DESERIALIZE_ITEM("enable_debug_metrics", config.EnableDebugMetrics);
+ DESERIALIZE_ITEM("use_halting_response", config.UseHaltingResponse);
+ DESERIALIZE_ITEM("halting_response_bytes_limit", config.HaltingResponseBytesLimit);
DESERIALIZE_ITEM("enable_local_mode_optimization", config.EnableLocalModeOptimization);
DESERIALIZE_ITEM("write_stderr_successful_jobs", config.WriteStderrSuccessfulJobs);
DESERIALIZE_ITEM("trace_http_requests_mode", config.TraceHttpRequestsMode);
diff --git a/yt/cpp/mapreduce/interface/config.h b/yt/cpp/mapreduce/interface/config.h
index 7bf3112c620..b0d77548956 100644
--- a/yt/cpp/mapreduce/interface/config.h
+++ b/yt/cpp/mapreduce/interface/config.h
@@ -239,6 +239,18 @@ struct TConfig
bool UseAbortableResponse = false;
bool EnableDebugMetrics = false;
+ /// @brief Simulate a response that halts (stops sending data) mid-stream.
+ ///
+ /// Testing options, should never be used in user programs.
+ /// When enabled, the HTTP response will be truncated after @ref HaltingResponseBytesLimit bytes.
+ bool UseHaltingResponse = false;
+
+ /// @brief Maximum number of bytes sent before the response is halted.
+ ///
+ /// Testing options, should never be used in user programs.
+ /// Only meaningful when @ref UseHaltingResponse is true.
+ i64 HaltingResponseBytesLimit = 64 * 1024;
+
//
// There is optimization used with local YT that enables to skip binary upload and use real binary path.
// When EnableLocalModeOptimization is set to false this optimization is completely disabled.
diff --git a/yt/cpp/mapreduce/interface/errors.h b/yt/cpp/mapreduce/interface/errors.h
index 6f78ffe14f3..cb8a7b5af90 100644
--- a/yt/cpp/mapreduce/interface/errors.h
+++ b/yt/cpp/mapreduce/interface/errors.h
@@ -293,6 +293,13 @@ private:
TVector<TFailedJobInfo> FailedJobInfo_;
};
+///
+/// @brief Error that is thrown when trying to read from aborted reader.
+///
+class TInputStreamAbortedError
+ : public yexception
+{ };
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/cpp/mapreduce/interface/io-inl.h b/yt/cpp/mapreduce/interface/io-inl.h
index 77078bb5eb5..ac1464f5b98 100644
--- a/yt/cpp/mapreduce/interface/io-inl.h
+++ b/yt/cpp/mapreduce/interface/io-inl.h
@@ -123,6 +123,8 @@ struct IReaderImplBase
virtual i64 GetTabletIndex() const;
virtual bool IsEndOfStream() const;
virtual bool IsRawReaderExhausted() const;
+ virtual void Abort();
+ virtual bool IsAborted() const;
};
struct INodeReaderImpl
@@ -223,6 +225,16 @@ public:
return Reader_->GetTabletIndex();
}
+ void Abort()
+ {
+ Reader_->Abort();
+ }
+
+ bool IsAborted() const
+ {
+ return Reader_->IsAborted();
+ }
+
protected:
template <typename TCacher, typename TCacheGetter>
const auto& DoGetRowCached(TCacher cacher, TCacheGetter cacheGetter) const
diff --git a/yt/cpp/mapreduce/interface/io.cpp b/yt/cpp/mapreduce/interface/io.cpp
index bbd417893c4..d3b4621c235 100644
--- a/yt/cpp/mapreduce/interface/io.cpp
+++ b/yt/cpp/mapreduce/interface/io.cpp
@@ -28,6 +28,16 @@ bool IReaderImplBase::IsRawReaderExhausted() const
Y_ABORT("Unimplemented");
}
+void IReaderImplBase::Abort()
+{
+ Y_ABORT("Unimplemented");
+}
+
+bool IReaderImplBase::IsAborted() const
+{
+ return false;
+}
+
////////////////////////////////////////////////////////////////////////////////
namespace NDetail {
diff --git a/yt/cpp/mapreduce/interface/io.h b/yt/cpp/mapreduce/interface/io.h
index f8faf09663a..c80da6b041c 100644
--- a/yt/cpp/mapreduce/interface/io.h
+++ b/yt/cpp/mapreduce/interface/io.h
@@ -8,6 +8,7 @@
#include "fwd.h"
+#include "abortable_stream.h"
#include "client_method_options.h"
#include "common.h"
#include "distributed_session.h"
@@ -107,7 +108,7 @@ class TIOException
/// Interface representing YT file reader.
class IFileReader
: public TThrRefBase
- , public IInputStream
+ , public IAbortableInputStream
{ };
/// Interface representing YT file writer.
@@ -135,7 +136,7 @@ public:
/// Low-level interface to read YT table with retries.
class TRawTableReader
: public TThrRefBase
- , public IInputStream
+ , public IAbortableInputStream
{
public:
/// @brief Retry table read starting from the specified `rangeIndex` and `rowIndex`.
@@ -279,6 +280,12 @@ public:
/// Returns `true` if job raw input stream was closed and `false` otherwise.
bool IsRawReaderExhausted() const;
+
+ /// @brief Abort the reader, interrupting any in-progress or future reads.
+ void Abort();
+
+ /// @brief Check whether the reader is aborted.
+ bool IsAborted() const;
};
/// @brief Iterator for use in range-based-for.
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index e318c022ce5..9347a95e6e6 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -4,6 +4,8 @@
#include "client_method_options.h"
#include "operation.h"
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
+
#include <yt/cpp/mapreduce/http/context.h>
namespace NYT {
@@ -213,7 +215,7 @@ public:
// Files
- virtual std::unique_ptr<IInputStream> ReadFile(
+ virtual std::unique_ptr<IAbortableInputStream> ReadFile(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFileReaderOptions& options = {}) = 0;
@@ -299,18 +301,18 @@ public:
const TMaybe<TFormat>& format,
const TTableWriterOptions& options = {}) = 0;
- virtual std::unique_ptr<IInputStream> ReadTable(
+ virtual std::unique_ptr<IAbortableInputStream> ReadTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFormat& format,
const TTableReaderOptions& options = {}) = 0;
- virtual std::unique_ptr<IInputStream> ReadTablePartition(
+ virtual std::unique_ptr<IAbortableInputStream> ReadTablePartition(
const TString& cookie,
const TFormat& format,
const TTablePartitionReaderOptions& options = {}) = 0;
- virtual std::unique_ptr<IInputStream> ReadBlobTable(
+ virtual std::unique_ptr<IAbortableInputStream> ReadBlobTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TKey& key,
diff --git a/yt/cpp/mapreduce/interface/ya.make b/yt/cpp/mapreduce/interface/ya.make
index caa68b20f69..5e56861fa86 100644
--- a/yt/cpp/mapreduce/interface/ya.make
+++ b/yt/cpp/mapreduce/interface/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
SRCS(
+ abortable_stream.cpp
batch_request.cpp
client.cpp
client_method_options.cpp