diff options
| author | maybenotilya <[email protected]> | 2026-04-17 11:57:44 +0300 |
|---|---|---|
| committer | maybenotilya <[email protected]> | 2026-04-17 12:34:55 +0300 |
| commit | a6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch) | |
| tree | 087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/io | |
| parent | 7bcfb91453e538211ed44e97c57d1104f543f920 (diff) | |
YT-26179: Add Abort for readers
* Changelog entry
Type: feature
Component: cpp-sdk
Add Abort for table readers
commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/io')
| -rw-r--r-- | yt/cpp/mapreduce/io/counting_raw_reader.cpp | 10 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/counting_raw_reader.h | 3 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/lenval_table_reader.cpp | 4 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/node_table_reader.cpp | 14 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/node_table_reader.h | 4 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/proto_table_reader.cpp | 20 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/proto_table_reader.h | 4 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/skiff_row_table_reader.cpp | 14 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/skiff_row_table_reader.h | 2 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/skiff_table_reader.cpp | 14 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/skiff_table_reader.h | 3 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/stream_raw_reader.cpp | 10 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/stream_table_reader.h | 17 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/yamr_table_reader.cpp | 10 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/io/yamr_table_reader.h | 2 |
15 files changed, 125 insertions, 6 deletions
diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.cpp b/yt/cpp/mapreduce/io/counting_raw_reader.cpp index c6213e86651..05f3b1cca6b 100644 --- a/yt/cpp/mapreduce/io/counting_raw_reader.cpp +++ b/yt/cpp/mapreduce/io/counting_raw_reader.cpp @@ -28,6 +28,16 @@ size_t TCountingRawTableReader::GetReadByteCount() const return ReadByteCount_; } +void TCountingRawTableReader::Abort() +{ + Reader_->Abort(); +} + +bool TCountingRawTableReader::IsAborted() const +{ + return Reader_->IsAborted(); +} + size_t TCountingRawTableReader::DoRead(void* buf, size_t len) { auto readLen = Reader_->Read(buf, len); diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.h b/yt/cpp/mapreduce/io/counting_raw_reader.h index c3b197d5844..fe78144d76a 100644 --- a/yt/cpp/mapreduce/io/counting_raw_reader.h +++ b/yt/cpp/mapreduce/io/counting_raw_reader.h @@ -22,6 +22,9 @@ public: size_t GetReadByteCount() const; + void Abort() override; + bool IsAborted() const override; + protected: size_t DoRead(void* buf, size_t len) override; diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.cpp b/yt/cpp/mapreduce/io/lenval_table_reader.cpp index d7a1c9754ee..676400b1df3 100644 --- a/yt/cpp/mapreduce/io/lenval_table_reader.cpp +++ b/yt/cpp/mapreduce/io/lenval_table_reader.cpp @@ -2,6 +2,7 @@ #include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/interface/errors.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <util/string/printf.h> @@ -30,6 +31,9 @@ TLenvalTableReader::~TLenvalTableReader() void TLenvalTableReader::CheckValidity() const { + if (Input_.IsAborted()) { + ythrow TInputStreamAbortedError() << "Stream was aborted"; + } if (!IsValid()) { ythrow yexception() << "Iterator is not valid"; } diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp index 558c42b30ee..01eaf5d9462 100644 --- a/yt/cpp/mapreduce/io/node_table_reader.cpp +++ b/yt/cpp/mapreduce/io/node_table_reader.cpp @@ -3,6 +3,7 @@ #include <yt/cpp/mapreduce/common/node_builder.h> #include <yt/cpp/mapreduce/common/wait_proxy.h> +#include <yt/cpp/mapreduce/interface/errors.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <library/cpp/yson/parser.h> @@ -341,6 +342,16 @@ bool TNodeTableReader::IsRawReaderExhausted() const return Finished_; } +void TNodeTableReader::Abort() +{ + Input_.Abort(); +} + +bool TNodeTableReader::IsAborted() const +{ + return Input_.IsAborted(); +} + //////////////////////////////////////////////////////////////////////////////// void TNodeTableReader::PrepareParsing() @@ -368,6 +379,9 @@ void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error void TNodeTableReader::CheckValidity() const { + if (IsAborted()) { + ythrow TInputStreamAbortedError() << "Stream was aborted"; + } if (!Valid_) { ythrow yexception() << "Iterator is not valid"; } diff --git a/yt/cpp/mapreduce/io/node_table_reader.h b/yt/cpp/mapreduce/io/node_table_reader.h index c8e319ce4f6..92af5b111b2 100644 --- a/yt/cpp/mapreduce/io/node_table_reader.h +++ b/yt/cpp/mapreduce/io/node_table_reader.h @@ -11,8 +11,6 @@ #include <util/system/event.h> #include <util/system/thread.h> -#include <atomic> - namespace NYT { class TRawTableReader; @@ -54,6 +52,8 @@ public: TMaybe<size_t> GetReadByteCount() const override; bool IsEndOfStream() const override; bool IsRawReaderExhausted() const override; + void Abort() override; + bool IsAborted() const override; private: void NextImpl(); diff --git a/yt/cpp/mapreduce/io/proto_table_reader.cpp b/yt/cpp/mapreduce/io/proto_table_reader.cpp index 6f79619c810..76e0243b395 100644 --- a/yt/cpp/mapreduce/io/proto_table_reader.cpp +++ b/yt/cpp/mapreduce/io/proto_table_reader.cpp @@ -206,6 +206,16 @@ bool TProtoTableReader::IsRawReaderExhausted() const return NodeReader_->IsRawReaderExhausted(); } +void TProtoTableReader::Abort() +{ + NodeReader_->Abort(); +} + +bool TProtoTableReader::IsAborted() const +{ + return NodeReader_->IsAborted(); +} + //////////////////////////////////////////////////////////////////////////////// TLenvalProtoTableReader::TLenvalProtoTableReader( @@ -291,6 +301,16 @@ bool TLenvalProtoTableReader::IsRawReaderExhausted() const return TLenvalTableReader::IsRawReaderExhausted(); } +void TLenvalProtoTableReader::Abort() +{ + Input_.Abort(); +} + +bool TLenvalProtoTableReader::IsAborted() const +{ + return Input_.IsAborted(); +} + void TLenvalProtoTableReader::SkipRow() { while (true) { diff --git a/yt/cpp/mapreduce/io/proto_table_reader.h b/yt/cpp/mapreduce/io/proto_table_reader.h index bfe4ac56474..fdb1f2fd5f5 100644 --- a/yt/cpp/mapreduce/io/proto_table_reader.h +++ b/yt/cpp/mapreduce/io/proto_table_reader.h @@ -31,6 +31,8 @@ public: TMaybe<size_t> GetReadByteCount() const override; bool IsEndOfStream() const override; bool IsRawReaderExhausted() const override; + void Abort() override; + bool IsAborted() const override; private: std::unique_ptr<TNodeTableReader> NodeReader_; @@ -64,6 +66,8 @@ public: TMaybe<size_t> GetReadByteCount() const override; bool IsEndOfStream() const override; bool IsRawReaderExhausted() const override; + void Abort() override; + bool IsAborted() const override; protected: void SkipRow() override; diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp index f77f2d71307..6de1d222d53 100644 --- a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp +++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp @@ -1,5 +1,6 @@ #include "skiff_row_table_reader.h" +#include <yt/cpp/mapreduce/interface/errors.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/cpp/mapreduce/interface/skiff_row.h> @@ -100,6 +101,9 @@ void TSkiffRowTableReader::SkipRow() } void TSkiffRowTableReader::CheckValidity() const { + if (IsAborted()) { + ythrow TInputStreamAbortedError() << "Stream was aborted"; + } if (!IsValid()) { ythrow yexception() << "Iterator is not valid"; } @@ -230,6 +234,16 @@ bool TSkiffRowTableReader::IsRawReaderExhausted() const { return Finished_; } +void TSkiffRowTableReader::Abort() +{ + Input_.Abort(); +} + +bool TSkiffRowTableReader::IsAborted() const +{ + return Input_.IsAborted(); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.h b/yt/cpp/mapreduce/io/skiff_row_table_reader.h index 1f623570bb9..c8086b48b83 100644 --- a/yt/cpp/mapreduce/io/skiff_row_table_reader.h +++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.h @@ -37,6 +37,8 @@ public: TMaybe<size_t> GetReadByteCount() const override; bool IsEndOfStream() const override; bool IsRawReaderExhausted() const override; + void Abort() override; + bool IsAborted() const override; private: bool Retry(const std::exception_ptr& error); diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp index c96adade41e..94662741b45 100644 --- a/yt/cpp/mapreduce/io/skiff_table_reader.cpp +++ b/yt/cpp/mapreduce/io/skiff_table_reader.cpp @@ -1,5 +1,6 @@ #include "skiff_table_reader.h" +#include <yt/cpp/mapreduce/interface/errors.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <library/cpp/yson/node/node_io.h> @@ -155,6 +156,16 @@ bool TSkiffTableReader::IsRawReaderExhausted() const return Finished_; } +void TSkiffTableReader::Abort() +{ + Input_.Abort(); +} + +bool TSkiffTableReader::IsAborted() const +{ + return Input_.IsAborted(); +} + //////////////////////////////////////////////////////////////////////////////// TVector<TSkiffTableReader::TSkiffTableSchema> TSkiffTableReader::CreateSkiffTableSchemas( @@ -288,6 +299,9 @@ void TSkiffTableReader::ReadRow() void TSkiffTableReader::EnsureValidity() const { + if (IsAborted()) { + ythrow TInputStreamAbortedError() << "Stream was aborted"; + } Y_ENSURE(Valid_, "Iterator is not valid"); } diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.h b/yt/cpp/mapreduce/io/skiff_table_reader.h index c7614776f41..ee01332beaf 100644 --- a/yt/cpp/mapreduce/io/skiff_table_reader.h +++ b/yt/cpp/mapreduce/io/skiff_table_reader.h @@ -35,6 +35,9 @@ public: TMaybe<size_t> GetReadByteCount() const override; bool IsRawReaderExhausted() const override; + void Abort() override; + bool IsAborted() const override; + private: struct TSkiffTableSchema; diff --git a/yt/cpp/mapreduce/io/stream_raw_reader.cpp b/yt/cpp/mapreduce/io/stream_raw_reader.cpp index ec19b67d0b0..f69eb9f034f 100644 --- a/yt/cpp/mapreduce/io/stream_raw_reader.cpp +++ b/yt/cpp/mapreduce/io/stream_raw_reader.cpp @@ -5,6 +5,10 @@ #include "skiff_table_reader.h" #include "yamr_table_reader.h" +#include <yt/cpp/mapreduce/common/abortable_stream.h> + +#include <yt/yt/core/concurrency/async_stream_helpers.h> + #include <util/system/env.h> #include <util/string/type.h> @@ -33,6 +37,12 @@ namespace NDetail { //////////////////////////////////////////////////////////////////////////////// +TInputStreamProxy::TInputStreamProxy(IInputStream* stream) + : Stream_(CreateAbortableInputStreamAdapter(NConcurrency::CreateAsyncAdapter(stream))) +{ } + +//////////////////////////////////////////////////////////////////////////////// + ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader( IInputStream* stream, const TTableReaderOptions& /* options */, diff --git a/yt/cpp/mapreduce/io/stream_table_reader.h b/yt/cpp/mapreduce/io/stream_table_reader.h index a7694a0a709..826d0fc269a 100644 --- a/yt/cpp/mapreduce/io/stream_table_reader.h +++ b/yt/cpp/mapreduce/io/stream_table_reader.h @@ -1,5 +1,6 @@ #pragma once +#include <yt/cpp/mapreduce/interface/abortable_stream.h> #include <yt/cpp/mapreduce/interface/io.h> namespace NYT { @@ -11,9 +12,7 @@ class TInputStreamProxy : public TRawTableReader { public: - TInputStreamProxy(IInputStream* stream) - : Stream_(stream) - { } + explicit TInputStreamProxy(IInputStream* stream); bool Retry( const TMaybe<ui32>& /*rangeIndex*/, @@ -31,6 +30,16 @@ public: return false; } + void Abort() override + { + Stream_->Abort(); + } + + bool IsAborted() const override + { + return Stream_->IsAborted(); + } + protected: size_t DoRead(void* buf, size_t len) override { @@ -38,7 +47,7 @@ protected: } private: - IInputStream* Stream_; + std::unique_ptr<IAbortableInputStream> Stream_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.cpp b/yt/cpp/mapreduce/io/yamr_table_reader.cpp index 3ae6b8a8aeb..e9ebab520cc 100644 --- a/yt/cpp/mapreduce/io/yamr_table_reader.cpp +++ b/yt/cpp/mapreduce/io/yamr_table_reader.cpp @@ -78,6 +78,16 @@ bool TYaMRTableReader::IsRawReaderExhausted() const return TLenvalTableReader::IsRawReaderExhausted(); } +void TYaMRTableReader::Abort() +{ + Input_.Abort(); +} + +bool TYaMRTableReader::IsAborted() const +{ + return Input_.IsAborted(); +} + void TYaMRTableReader::ReadField(TString* result, i32 length) { result->resize(length); diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.h b/yt/cpp/mapreduce/io/yamr_table_reader.h index 39fdecfa71a..a2a6b97ebcb 100644 --- a/yt/cpp/mapreduce/io/yamr_table_reader.h +++ b/yt/cpp/mapreduce/io/yamr_table_reader.h @@ -30,6 +30,8 @@ public: TMaybe<size_t> GetReadByteCount() const override; bool IsEndOfStream() const override; bool IsRawReaderExhausted() const override; + void Abort() override; + bool IsAborted() const override; private: void ReadField(TString* result, i32 length); |
