summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/rpc_client
diff options
context:
space:
mode:
authormaybenotilya <[email protected]>2026-04-17 11:57:44 +0300
committermaybenotilya <[email protected]>2026-04-17 12:34:55 +0300
commita6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch)
tree087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/rpc_client
parent7bcfb91453e538211ed44e97c57d1104f543f920 (diff)
YT-26179: Add Abort for readers
* Changelog entry Type: feature Component: cpp-sdk Add Abort for table readers commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/rpc_client')
-rw-r--r--yt/cpp/mapreduce/rpc_client/raw_client.cpp65
-rw-r--r--yt/cpp/mapreduce/rpc_client/raw_client.h8
-rw-r--r--yt/cpp/mapreduce/rpc_client/ya.make1
3 files changed, 49 insertions, 25 deletions
diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.cpp b/yt/cpp/mapreduce/rpc_client/raw_client.cpp
index 530fd58cb02..f5643b1f6a5 100644
--- a/yt/cpp/mapreduce/rpc_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/rpc_client/raw_client.cpp
@@ -5,8 +5,12 @@
#include "rpc_parameters_serialization.h"
#include "wrap_rpc_error.h"
+#include <yt/cpp/mapreduce/common/abortable_stream.h>
+#include <yt/cpp/mapreduce/common/halting_stream.h>
#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
+#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/yt/client/api/file_reader.h>
@@ -205,15 +209,25 @@ void WriteInChunks(const void* buf, size_t len, const TWriteFn& writeFn)
////////////////////////////////////////////////////////////////////////////////
class TSyncRpcInputStream
- : public IInputStream
+ : public IAbortableInputStream
{
public:
- explicit TSyncRpcInputStream(std::unique_ptr<IInputStream> stream)
+ explicit TSyncRpcInputStream(std::unique_ptr<IAbortableInputStream> stream)
: Underlying_(std::move(stream))
{ }
+ void Abort() override
+ {
+ Underlying_->Abort();
+ }
+
+ bool IsAborted() const override
+ {
+ return Underlying_->IsAborted();
+ }
+
private:
- const std::unique_ptr<IInputStream> Underlying_;
+ const std::unique_ptr<IAbortableInputStream> Underlying_;
size_t DoRead(void* buf, size_t len) override
{
@@ -861,10 +875,15 @@ class TRpcResponseStream
: public IFileReader
{
public:
- TRpcResponseStream(std::unique_ptr<IInputStream> stream)
+ TRpcResponseStream(std::unique_ptr<IAbortableInputStream> stream)
: Underlying_(std::move(stream))
{ }
+ void Abort() override
+ {
+ Underlying_->Abort();
+ }
+
private:
size_t DoRead(void *buf, size_t len) override
{
@@ -877,11 +896,11 @@ private:
}
private:
- std::unique_ptr<IInputStream> Underlying_;
+ std::unique_ptr<IAbortableInputStream> Underlying_;
};
class TFixedStringStream
- : public IInputStream
+ : public IAbortableInputStream
{
public:
TFixedStringStream(TSharedRef data)
@@ -915,7 +934,7 @@ IFileReaderPtr TRpcRawClient::GetJobInput(
{
auto future = Clients_.Heavy->GetJobInput(NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)));
auto result = WaitAndProcess(future);
- auto stream = std::make_unique<TSyncRpcInputStream>(CreateSyncAdapter(CreateCopyingAdapter(result)));
+ auto stream = std::make_unique<TSyncRpcInputStream>(CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result)));
return MakeIntrusive<TRpcResponseStream>(std::move(stream));
}
@@ -928,7 +947,7 @@ IFileReaderPtr TRpcRawClient::GetJobFailContext(
NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)),
NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)));
auto result = WaitAndProcess(future);
- std::unique_ptr<IInputStream> stream(new TFixedStringStream(std::move(result)));
+ std::unique_ptr<IAbortableInputStream> stream(new TFixedStringStream(std::move(result)));
return MakeIntrusive<TRpcResponseStream>(std::move(stream));
}
@@ -941,7 +960,7 @@ IFileReaderPtr TRpcRawClient::GetJobStderr(
NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)),
NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)));
auto result = WaitAndProcess(future);
- std::unique_ptr<IInputStream> stream(new TFixedStringStream(std::move(result.Data)));
+ std::unique_ptr<IAbortableInputStream> stream(new TFixedStringStream(std::move(result.Data)));
return MakeIntrusive<TRpcResponseStream>(std::move(stream));
}
@@ -955,19 +974,19 @@ IFileReaderPtr TRpcRawClient::GetJobTrace(
NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)),
SerializeOptionsForGetJobTrace(options));
auto result = WaitAndProcess(future);
- auto stream = CreateSyncAdapter(CreateCopyingAdapter(result));
+ auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result));
return MakeIntrusive<TRpcResponseStream>(std::move(stream));
}
-std::unique_ptr<IInputStream> TRpcRawClient::ReadFile(
+std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadFile(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFileReaderOptions& options)
{
auto future = Clients_.Heavy->CreateFileReader(path.Path_, SerializeOptionsForReadFile(transactionId, options));
auto reader = WaitAndProcess(future);
- auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(reader));
- return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter));
+ auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(reader));
+ return std::make_unique<TSyncRpcInputStream>(std::move(stream));
}
class TRpcWriteFileRequestStream
@@ -1245,7 +1264,7 @@ std::unique_ptr<IOutputStream> TRpcRawClient::WriteTable(
return std::make_unique<TSyncRpcOutputStream>(std::move(rowStream));
}
-std::unique_ptr<IInputStream> TRpcRawClient::ReadTable(
+std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFormat& format,
@@ -1259,11 +1278,15 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadTable(
auto formatStream = WaitAndProcess(future);
- auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream)));
- return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter));
+ auto asyncStream = CreateCopyingAdapter(std::move(formatStream));
+ if (TConfig::Get()->UseHaltingResponse) {
+ asyncStream = NYT::NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit);
+ }
+ auto stream = CreateAbortableInputStreamAdapter(std::move(asyncStream));
+ return std::make_unique<TSyncRpcInputStream>(std::move(stream));
}
-std::unique_ptr<IInputStream> TRpcRawClient::ReadTablePartition(
+std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadTablePartition(
const TString& cookie,
const TFormat& format,
const TTablePartitionReaderOptions& options)
@@ -1276,11 +1299,11 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadTablePartition(
auto formatStream = WaitAndProcess(future);
- auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream)));
+ auto syncAdapter = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(std::move(formatStream)));
return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter));
}
-std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable(
+std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadBlobTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TKey& key,
@@ -1331,8 +1354,8 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable(
options.StartPartIndex_,
options.Offset_,
options.PartSize_);
- auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(blobReader));
- return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter));
+ auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(blobReader));
+ return std::make_unique<TSyncRpcInputStream>(std::move(stream));
}
void TRpcRawClient::AlterTableReplica(
diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.h b/yt/cpp/mapreduce/rpc_client/raw_client.h
index 07fbb19758c..0eea19df078 100644
--- a/yt/cpp/mapreduce/rpc_client/raw_client.h
+++ b/yt/cpp/mapreduce/rpc_client/raw_client.h
@@ -211,7 +211,7 @@ public:
// Files
- std::unique_ptr<IInputStream> ReadFile(
+ std::unique_ptr<IAbortableInputStream> ReadFile(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFileReaderOptions& options = {}) override;
@@ -291,18 +291,18 @@ public:
const TMaybe<TFormat>& format,
const TTableWriterOptions& options = {}) override;
- std::unique_ptr<IInputStream> ReadTable(
+ std::unique_ptr<IAbortableInputStream> ReadTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFormat& format,
const TTableReaderOptions& options = {}) override;
- std::unique_ptr<IInputStream> ReadTablePartition(
+ std::unique_ptr<IAbortableInputStream> ReadTablePartition(
const TString& cookie,
const TFormat& format,
const TTablePartitionReaderOptions& options = {}) override;
- std::unique_ptr<IInputStream> ReadBlobTable(
+ std::unique_ptr<IAbortableInputStream> ReadBlobTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TKey& key,
diff --git a/yt/cpp/mapreduce/rpc_client/ya.make b/yt/cpp/mapreduce/rpc_client/ya.make
index 115dc550ff7..37fd90543a8 100644
--- a/yt/cpp/mapreduce/rpc_client/ya.make
+++ b/yt/cpp/mapreduce/rpc_client/ya.make
@@ -12,6 +12,7 @@ SRCS(
PEERDIR(
library/cpp/yson/node
+ yt/cpp/mapreduce/common
yt/cpp/mapreduce/interface
yt/yt/client
)