summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io
diff options
context:
space:
mode:
authormaybenotilya <[email protected]>2026-04-17 11:57:44 +0300
committermaybenotilya <[email protected]>2026-04-17 12:34:55 +0300
commita6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch)
tree087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/io
parent7bcfb91453e538211ed44e97c57d1104f543f920 (diff)
YT-26179: Add Abort for readers
* Changelog entry Type: feature Component: cpp-sdk Add Abort for table readers commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/io')
-rw-r--r--yt/cpp/mapreduce/io/counting_raw_reader.cpp10
-rw-r--r--yt/cpp/mapreduce/io/counting_raw_reader.h3
-rw-r--r--yt/cpp/mapreduce/io/lenval_table_reader.cpp4
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.cpp14
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.h4
-rw-r--r--yt/cpp/mapreduce/io/proto_table_reader.cpp20
-rw-r--r--yt/cpp/mapreduce/io/proto_table_reader.h4
-rw-r--r--yt/cpp/mapreduce/io/skiff_row_table_reader.cpp14
-rw-r--r--yt/cpp/mapreduce/io/skiff_row_table_reader.h2
-rw-r--r--yt/cpp/mapreduce/io/skiff_table_reader.cpp14
-rw-r--r--yt/cpp/mapreduce/io/skiff_table_reader.h3
-rw-r--r--yt/cpp/mapreduce/io/stream_raw_reader.cpp10
-rw-r--r--yt/cpp/mapreduce/io/stream_table_reader.h17
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_reader.cpp10
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_reader.h2
15 files changed, 125 insertions, 6 deletions
diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.cpp b/yt/cpp/mapreduce/io/counting_raw_reader.cpp
index c6213e86651..05f3b1cca6b 100644
--- a/yt/cpp/mapreduce/io/counting_raw_reader.cpp
+++ b/yt/cpp/mapreduce/io/counting_raw_reader.cpp
@@ -28,6 +28,16 @@ size_t TCountingRawTableReader::GetReadByteCount() const
return ReadByteCount_;
}
+void TCountingRawTableReader::Abort()
+{
+ Reader_->Abort();
+}
+
+bool TCountingRawTableReader::IsAborted() const
+{
+ return Reader_->IsAborted();
+}
+
size_t TCountingRawTableReader::DoRead(void* buf, size_t len)
{
auto readLen = Reader_->Read(buf, len);
diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.h b/yt/cpp/mapreduce/io/counting_raw_reader.h
index c3b197d5844..fe78144d76a 100644
--- a/yt/cpp/mapreduce/io/counting_raw_reader.h
+++ b/yt/cpp/mapreduce/io/counting_raw_reader.h
@@ -22,6 +22,9 @@ public:
size_t GetReadByteCount() const;
+ void Abort() override;
+ bool IsAborted() const override;
+
protected:
size_t DoRead(void* buf, size_t len) override;
diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.cpp b/yt/cpp/mapreduce/io/lenval_table_reader.cpp
index d7a1c9754ee..676400b1df3 100644
--- a/yt/cpp/mapreduce/io/lenval_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/lenval_table_reader.cpp
@@ -2,6 +2,7 @@
#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <util/string/printf.h>
@@ -30,6 +31,9 @@ TLenvalTableReader::~TLenvalTableReader()
void TLenvalTableReader::CheckValidity() const
{
+ if (Input_.IsAborted()) {
+ ythrow TInputStreamAbortedError() << "Stream was aborted";
+ }
if (!IsValid()) {
ythrow yexception() << "Iterator is not valid";
}
diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp
index 558c42b30ee..01eaf5d9462 100644
--- a/yt/cpp/mapreduce/io/node_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/node_table_reader.cpp
@@ -3,6 +3,7 @@
#include <yt/cpp/mapreduce/common/node_builder.h>
#include <yt/cpp/mapreduce/common/wait_proxy.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <library/cpp/yson/parser.h>
@@ -341,6 +342,16 @@ bool TNodeTableReader::IsRawReaderExhausted() const
return Finished_;
}
+void TNodeTableReader::Abort()
+{
+ Input_.Abort();
+}
+
+bool TNodeTableReader::IsAborted() const
+{
+ return Input_.IsAborted();
+}
+
////////////////////////////////////////////////////////////////////////////////
void TNodeTableReader::PrepareParsing()
@@ -368,6 +379,9 @@ void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error
void TNodeTableReader::CheckValidity() const
{
+ if (IsAborted()) {
+ ythrow TInputStreamAbortedError() << "Stream was aborted";
+ }
if (!Valid_) {
ythrow yexception() << "Iterator is not valid";
}
diff --git a/yt/cpp/mapreduce/io/node_table_reader.h b/yt/cpp/mapreduce/io/node_table_reader.h
index c8e319ce4f6..92af5b111b2 100644
--- a/yt/cpp/mapreduce/io/node_table_reader.h
+++ b/yt/cpp/mapreduce/io/node_table_reader.h
@@ -11,8 +11,6 @@
#include <util/system/event.h>
#include <util/system/thread.h>
-#include <atomic>
-
namespace NYT {
class TRawTableReader;
@@ -54,6 +52,8 @@ public:
TMaybe<size_t> GetReadByteCount() const override;
bool IsEndOfStream() const override;
bool IsRawReaderExhausted() const override;
+ void Abort() override;
+ bool IsAborted() const override;
private:
void NextImpl();
diff --git a/yt/cpp/mapreduce/io/proto_table_reader.cpp b/yt/cpp/mapreduce/io/proto_table_reader.cpp
index 6f79619c810..76e0243b395 100644
--- a/yt/cpp/mapreduce/io/proto_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/proto_table_reader.cpp
@@ -206,6 +206,16 @@ bool TProtoTableReader::IsRawReaderExhausted() const
return NodeReader_->IsRawReaderExhausted();
}
+void TProtoTableReader::Abort()
+{
+ NodeReader_->Abort();
+}
+
+bool TProtoTableReader::IsAborted() const
+{
+ return NodeReader_->IsAborted();
+}
+
////////////////////////////////////////////////////////////////////////////////
TLenvalProtoTableReader::TLenvalProtoTableReader(
@@ -291,6 +301,16 @@ bool TLenvalProtoTableReader::IsRawReaderExhausted() const
return TLenvalTableReader::IsRawReaderExhausted();
}
+void TLenvalProtoTableReader::Abort()
+{
+ Input_.Abort();
+}
+
+bool TLenvalProtoTableReader::IsAborted() const
+{
+ return Input_.IsAborted();
+}
+
void TLenvalProtoTableReader::SkipRow()
{
while (true) {
diff --git a/yt/cpp/mapreduce/io/proto_table_reader.h b/yt/cpp/mapreduce/io/proto_table_reader.h
index bfe4ac56474..fdb1f2fd5f5 100644
--- a/yt/cpp/mapreduce/io/proto_table_reader.h
+++ b/yt/cpp/mapreduce/io/proto_table_reader.h
@@ -31,6 +31,8 @@ public:
TMaybe<size_t> GetReadByteCount() const override;
bool IsEndOfStream() const override;
bool IsRawReaderExhausted() const override;
+ void Abort() override;
+ bool IsAborted() const override;
private:
std::unique_ptr<TNodeTableReader> NodeReader_;
@@ -64,6 +66,8 @@ public:
TMaybe<size_t> GetReadByteCount() const override;
bool IsEndOfStream() const override;
bool IsRawReaderExhausted() const override;
+ void Abort() override;
+ bool IsAborted() const override;
protected:
void SkipRow() override;
diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
index f77f2d71307..6de1d222d53 100644
--- a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
@@ -1,5 +1,6 @@
#include "skiff_row_table_reader.h"
+#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/cpp/mapreduce/interface/skiff_row.h>
@@ -100,6 +101,9 @@ void TSkiffRowTableReader::SkipRow()
}
void TSkiffRowTableReader::CheckValidity() const {
+ if (IsAborted()) {
+ ythrow TInputStreamAbortedError() << "Stream was aborted";
+ }
if (!IsValid()) {
ythrow yexception() << "Iterator is not valid";
}
@@ -230,6 +234,16 @@ bool TSkiffRowTableReader::IsRawReaderExhausted() const {
return Finished_;
}
+void TSkiffRowTableReader::Abort()
+{
+ Input_.Abort();
+}
+
+bool TSkiffRowTableReader::IsAborted() const
+{
+ return Input_.IsAborted();
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.h b/yt/cpp/mapreduce/io/skiff_row_table_reader.h
index 1f623570bb9..c8086b48b83 100644
--- a/yt/cpp/mapreduce/io/skiff_row_table_reader.h
+++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.h
@@ -37,6 +37,8 @@ public:
TMaybe<size_t> GetReadByteCount() const override;
bool IsEndOfStream() const override;
bool IsRawReaderExhausted() const override;
+ void Abort() override;
+ bool IsAborted() const override;
private:
bool Retry(const std::exception_ptr& error);
diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
index c96adade41e..94662741b45 100644
--- a/yt/cpp/mapreduce/io/skiff_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
@@ -1,5 +1,6 @@
#include "skiff_table_reader.h"
+#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <library/cpp/yson/node/node_io.h>
@@ -155,6 +156,16 @@ bool TSkiffTableReader::IsRawReaderExhausted() const
return Finished_;
}
+void TSkiffTableReader::Abort()
+{
+ Input_.Abort();
+}
+
+bool TSkiffTableReader::IsAborted() const
+{
+ return Input_.IsAborted();
+}
+
////////////////////////////////////////////////////////////////////////////////
TVector<TSkiffTableReader::TSkiffTableSchema> TSkiffTableReader::CreateSkiffTableSchemas(
@@ -288,6 +299,9 @@ void TSkiffTableReader::ReadRow()
void TSkiffTableReader::EnsureValidity() const
{
+ if (IsAborted()) {
+ ythrow TInputStreamAbortedError() << "Stream was aborted";
+ }
Y_ENSURE(Valid_, "Iterator is not valid");
}
diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.h b/yt/cpp/mapreduce/io/skiff_table_reader.h
index c7614776f41..ee01332beaf 100644
--- a/yt/cpp/mapreduce/io/skiff_table_reader.h
+++ b/yt/cpp/mapreduce/io/skiff_table_reader.h
@@ -35,6 +35,9 @@ public:
TMaybe<size_t> GetReadByteCount() const override;
bool IsRawReaderExhausted() const override;
+ void Abort() override;
+ bool IsAborted() const override;
+
private:
struct TSkiffTableSchema;
diff --git a/yt/cpp/mapreduce/io/stream_raw_reader.cpp b/yt/cpp/mapreduce/io/stream_raw_reader.cpp
index ec19b67d0b0..f69eb9f034f 100644
--- a/yt/cpp/mapreduce/io/stream_raw_reader.cpp
+++ b/yt/cpp/mapreduce/io/stream_raw_reader.cpp
@@ -5,6 +5,10 @@
#include "skiff_table_reader.h"
#include "yamr_table_reader.h"
+#include <yt/cpp/mapreduce/common/abortable_stream.h>
+
+#include <yt/yt/core/concurrency/async_stream_helpers.h>
+
#include <util/system/env.h>
#include <util/string/type.h>
@@ -33,6 +37,12 @@ namespace NDetail {
////////////////////////////////////////////////////////////////////////////////
+TInputStreamProxy::TInputStreamProxy(IInputStream* stream)
+ : Stream_(CreateAbortableInputStreamAdapter(NConcurrency::CreateAsyncAdapter(stream)))
+{ }
+
+////////////////////////////////////////////////////////////////////////////////
+
::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
IInputStream* stream,
const TTableReaderOptions& /* options */,
diff --git a/yt/cpp/mapreduce/io/stream_table_reader.h b/yt/cpp/mapreduce/io/stream_table_reader.h
index a7694a0a709..826d0fc269a 100644
--- a/yt/cpp/mapreduce/io/stream_table_reader.h
+++ b/yt/cpp/mapreduce/io/stream_table_reader.h
@@ -1,5 +1,6 @@
#pragma once
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
#include <yt/cpp/mapreduce/interface/io.h>
namespace NYT {
@@ -11,9 +12,7 @@ class TInputStreamProxy
: public TRawTableReader
{
public:
- TInputStreamProxy(IInputStream* stream)
- : Stream_(stream)
- { }
+ explicit TInputStreamProxy(IInputStream* stream);
bool Retry(
const TMaybe<ui32>& /*rangeIndex*/,
@@ -31,6 +30,16 @@ public:
return false;
}
+ void Abort() override
+ {
+ Stream_->Abort();
+ }
+
+ bool IsAborted() const override
+ {
+ return Stream_->IsAborted();
+ }
+
protected:
size_t DoRead(void* buf, size_t len) override
{
@@ -38,7 +47,7 @@ protected:
}
private:
- IInputStream* Stream_;
+ std::unique_ptr<IAbortableInputStream> Stream_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.cpp b/yt/cpp/mapreduce/io/yamr_table_reader.cpp
index 3ae6b8a8aeb..e9ebab520cc 100644
--- a/yt/cpp/mapreduce/io/yamr_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/yamr_table_reader.cpp
@@ -78,6 +78,16 @@ bool TYaMRTableReader::IsRawReaderExhausted() const
return TLenvalTableReader::IsRawReaderExhausted();
}
+void TYaMRTableReader::Abort()
+{
+ Input_.Abort();
+}
+
+bool TYaMRTableReader::IsAborted() const
+{
+ return Input_.IsAborted();
+}
+
void TYaMRTableReader::ReadField(TString* result, i32 length)
{
result->resize(length);
diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.h b/yt/cpp/mapreduce/io/yamr_table_reader.h
index 39fdecfa71a..a2a6b97ebcb 100644
--- a/yt/cpp/mapreduce/io/yamr_table_reader.h
+++ b/yt/cpp/mapreduce/io/yamr_table_reader.h
@@ -30,6 +30,8 @@ public:
TMaybe<size_t> GetReadByteCount() const override;
bool IsEndOfStream() const override;
bool IsRawReaderExhausted() const override;
+ void Abort() override;
+ bool IsAborted() const override;
private:
void ReadField(TString* result, i32 length);