diff options
Diffstat (limited to 'yt/cpp/mapreduce/interface')
| -rw-r--r-- | yt/cpp/mapreduce/interface/abortable_stream.cpp | 34 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/abortable_stream.h | 33 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/config.cpp | 4 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/config.h | 12 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/errors.h | 7 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/io-inl.h | 12 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/io.cpp | 10 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/io.h | 11 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/raw_client.h | 10 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/ya.make | 1 |
10 files changed, 128 insertions, 6 deletions
diff --git a/yt/cpp/mapreduce/interface/abortable_stream.cpp b/yt/cpp/mapreduce/interface/abortable_stream.cpp new file mode 100644 index 00000000000..fc3b8d01f5b --- /dev/null +++ b/yt/cpp/mapreduce/interface/abortable_stream.cpp @@ -0,0 +1,34 @@ +#include "abortable_stream.h" + +#include <yt/cpp/mapreduce/interface/errors.h> + +#include <util/system/yassert.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +void IAbortableInputStream::Abort() +{ + Y_ABORT("Unimplemented"); +} + +bool IAbortableInputStream::IsAborted() const +{ + return false; +} + +bool IAbortableInputStream::IsAbortedError(const std::exception_ptr& error) +{ + try { + std::rethrow_exception(error); + } catch (const TInputStreamAbortedError& ex) { + return true; + } catch (...) { + return false; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/interface/abortable_stream.h b/yt/cpp/mapreduce/interface/abortable_stream.h new file mode 100644 index 00000000000..40316e1b790 --- /dev/null +++ b/yt/cpp/mapreduce/interface/abortable_stream.h @@ -0,0 +1,33 @@ +#pragma once + +#include <util/stream/input.h> + +#include <exception> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +/// @brief Input stream that supports aborting the read operation. +/// +/// Extends @ref IInputStream with the ability to immediately cancel current or future reads. +class IAbortableInputStream + : public IInputStream +{ +public: + ~IAbortableInputStream() override = default; + + /// @brief Immediately abort the stream, cancelling current and future reads. + /// + /// Some clients have already implemented this interface, so using pure virtual method leads to build errors. + virtual void Abort(); + + /// @brief Check whether the stream has been aborted. + virtual bool IsAborted() const; + + static bool IsAbortedError(const std::exception_ptr& error); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/interface/config.cpp b/yt/cpp/mapreduce/interface/config.cpp index 9379ab57473..b4edcee4370 100644 --- a/yt/cpp/mapreduce/interface/config.cpp +++ b/yt/cpp/mapreduce/interface/config.cpp @@ -457,6 +457,8 @@ void Serialize(const TConfig& config, NYson::IYsonConsumer* consumer) .Item("api_file_path_options").Value(config.ApiFilePathOptions) .Item("use_abortable_response").Value(config.UseAbortableResponse) .Item("enable_debug_metrics").Value(config.EnableDebugMetrics) + .Item("use_halting_response").Value(config.UseHaltingResponse) + .Item("halting_response_bytes_limit").Value(config.HaltingResponseBytesLimit) .Item("enable_local_mode_optimization").Value(config.EnableLocalModeOptimization) .Item("write_stderr_successful_jobs").Value(config.WriteStderrSuccessfulJobs) .Item("trace_http_requests_mode").Value(::ToString(config.TraceHttpRequestsMode)) @@ -533,6 +535,8 @@ void Deserialize(TConfig& config, const TNode& node) DESERIALIZE_ITEM("api_file_path_options", config.ApiFilePathOptions); DESERIALIZE_ITEM("use_abortable_response", config.UseAbortableResponse); DESERIALIZE_ITEM("enable_debug_metrics", config.EnableDebugMetrics); + DESERIALIZE_ITEM("use_halting_response", config.UseHaltingResponse); + DESERIALIZE_ITEM("halting_response_bytes_limit", config.HaltingResponseBytesLimit); DESERIALIZE_ITEM("enable_local_mode_optimization", config.EnableLocalModeOptimization); DESERIALIZE_ITEM("write_stderr_successful_jobs", config.WriteStderrSuccessfulJobs); DESERIALIZE_ITEM("trace_http_requests_mode", config.TraceHttpRequestsMode); diff --git a/yt/cpp/mapreduce/interface/config.h b/yt/cpp/mapreduce/interface/config.h index 7bf3112c620..b0d77548956 100644 --- a/yt/cpp/mapreduce/interface/config.h +++ b/yt/cpp/mapreduce/interface/config.h @@ -239,6 +239,18 @@ struct TConfig bool UseAbortableResponse = false; bool EnableDebugMetrics = false; + /// @brief Simulate a response that halts (stops sending data) mid-stream. + /// + /// Testing options, should never be used in user programs. + /// When enabled, the HTTP response will be truncated after @ref HaltingResponseBytesLimit bytes. + bool UseHaltingResponse = false; + + /// @brief Maximum number of bytes sent before the response is halted. + /// + /// Testing options, should never be used in user programs. + /// Only meaningful when @ref UseHaltingResponse is true. + i64 HaltingResponseBytesLimit = 64 * 1024; + // // There is optimization used with local YT that enables to skip binary upload and use real binary path. // When EnableLocalModeOptimization is set to false this optimization is completely disabled. diff --git a/yt/cpp/mapreduce/interface/errors.h b/yt/cpp/mapreduce/interface/errors.h index 6f78ffe14f3..cb8a7b5af90 100644 --- a/yt/cpp/mapreduce/interface/errors.h +++ b/yt/cpp/mapreduce/interface/errors.h @@ -293,6 +293,13 @@ private: TVector<TFailedJobInfo> FailedJobInfo_; }; +/// +/// @brief Error that is thrown when trying to read from aborted reader. +/// +class TInputStreamAbortedError + : public yexception +{ }; + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/cpp/mapreduce/interface/io-inl.h b/yt/cpp/mapreduce/interface/io-inl.h index 77078bb5eb5..ac1464f5b98 100644 --- a/yt/cpp/mapreduce/interface/io-inl.h +++ b/yt/cpp/mapreduce/interface/io-inl.h @@ -123,6 +123,8 @@ struct IReaderImplBase virtual i64 GetTabletIndex() const; virtual bool IsEndOfStream() const; virtual bool IsRawReaderExhausted() const; + virtual void Abort(); + virtual bool IsAborted() const; }; struct INodeReaderImpl @@ -223,6 +225,16 @@ public: return Reader_->GetTabletIndex(); } + void Abort() + { + Reader_->Abort(); + } + + bool IsAborted() const + { + return Reader_->IsAborted(); + } + protected: template <typename TCacher, typename TCacheGetter> const auto& DoGetRowCached(TCacher cacher, TCacheGetter cacheGetter) const diff --git a/yt/cpp/mapreduce/interface/io.cpp b/yt/cpp/mapreduce/interface/io.cpp index bbd417893c4..d3b4621c235 100644 --- a/yt/cpp/mapreduce/interface/io.cpp +++ b/yt/cpp/mapreduce/interface/io.cpp @@ -28,6 +28,16 @@ bool IReaderImplBase::IsRawReaderExhausted() const Y_ABORT("Unimplemented"); } +void IReaderImplBase::Abort() +{ + Y_ABORT("Unimplemented"); +} + +bool IReaderImplBase::IsAborted() const +{ + return false; +} + //////////////////////////////////////////////////////////////////////////////// namespace NDetail { diff --git a/yt/cpp/mapreduce/interface/io.h b/yt/cpp/mapreduce/interface/io.h index f8faf09663a..c80da6b041c 100644 --- a/yt/cpp/mapreduce/interface/io.h +++ b/yt/cpp/mapreduce/interface/io.h @@ -8,6 +8,7 @@ #include "fwd.h" +#include "abortable_stream.h" #include "client_method_options.h" #include "common.h" #include "distributed_session.h" @@ -107,7 +108,7 @@ class TIOException /// Interface representing YT file reader. class IFileReader : public TThrRefBase - , public IInputStream + , public IAbortableInputStream { }; /// Interface representing YT file writer. @@ -135,7 +136,7 @@ public: /// Low-level interface to read YT table with retries. class TRawTableReader : public TThrRefBase - , public IInputStream + , public IAbortableInputStream { public: /// @brief Retry table read starting from the specified `rangeIndex` and `rowIndex`. @@ -279,6 +280,12 @@ public: /// Returns `true` if job raw input stream was closed and `false` otherwise. bool IsRawReaderExhausted() const; + + /// @brief Abort the reader, interrupting any in-progress or future reads. + void Abort(); + + /// @brief Check whether the reader is aborted. + bool IsAborted() const; }; /// @brief Iterator for use in range-based-for. diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index e318c022ce5..9347a95e6e6 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -4,6 +4,8 @@ #include "client_method_options.h" #include "operation.h" +#include <yt/cpp/mapreduce/interface/abortable_stream.h> + #include <yt/cpp/mapreduce/http/context.h> namespace NYT { @@ -213,7 +215,7 @@ public: // Files - virtual std::unique_ptr<IInputStream> ReadFile( + virtual std::unique_ptr<IAbortableInputStream> ReadFile( const TTransactionId& transactionId, const TRichYPath& path, const TFileReaderOptions& options = {}) = 0; @@ -299,18 +301,18 @@ public: const TMaybe<TFormat>& format, const TTableWriterOptions& options = {}) = 0; - virtual std::unique_ptr<IInputStream> ReadTable( + virtual std::unique_ptr<IAbortableInputStream> ReadTable( const TTransactionId& transactionId, const TRichYPath& path, const TFormat& format, const TTableReaderOptions& options = {}) = 0; - virtual std::unique_ptr<IInputStream> ReadTablePartition( + virtual std::unique_ptr<IAbortableInputStream> ReadTablePartition( const TString& cookie, const TFormat& format, const TTablePartitionReaderOptions& options = {}) = 0; - virtual std::unique_ptr<IInputStream> ReadBlobTable( + virtual std::unique_ptr<IAbortableInputStream> ReadBlobTable( const TTransactionId& transactionId, const TRichYPath& path, const TKey& key, diff --git a/yt/cpp/mapreduce/interface/ya.make b/yt/cpp/mapreduce/interface/ya.make index caa68b20f69..5e56861fa86 100644 --- a/yt/cpp/mapreduce/interface/ya.make +++ b/yt/cpp/mapreduce/interface/ya.make @@ -3,6 +3,7 @@ LIBRARY() INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) SRCS( + abortable_stream.cpp batch_request.cpp client.cpp client_method_options.cpp |
