From a6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf Mon Sep 17 00:00:00 2001 From: maybenotilya Date: Fri, 17 Apr 2026 11:57:44 +0300 Subject: YT-26179: Add Abort for readers * Changelog entry Type: feature Component: cpp-sdk Add Abort for table readers commit_hash:be5026883c90d18333f20468946f1e5a5b71839b --- yt/cpp/mapreduce/rpc_client/raw_client.cpp | 65 ++++++++++++++++++++---------- 1 file changed, 44 insertions(+), 21 deletions(-) (limited to 'yt/cpp/mapreduce/rpc_client/raw_client.cpp') diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.cpp b/yt/cpp/mapreduce/rpc_client/raw_client.cpp index 530fd58cb02..f5643b1f6a5 100644 --- a/yt/cpp/mapreduce/rpc_client/raw_client.cpp +++ b/yt/cpp/mapreduce/rpc_client/raw_client.cpp @@ -5,8 +5,12 @@ #include "rpc_parameters_serialization.h" #include "wrap_rpc_error.h" +#include +#include #include +#include +#include #include #include @@ -205,15 +209,25 @@ void WriteInChunks(const void* buf, size_t len, const TWriteFn& writeFn) //////////////////////////////////////////////////////////////////////////////// class TSyncRpcInputStream - : public IInputStream + : public IAbortableInputStream { public: - explicit TSyncRpcInputStream(std::unique_ptr stream) + explicit TSyncRpcInputStream(std::unique_ptr stream) : Underlying_(std::move(stream)) { } + void Abort() override + { + Underlying_->Abort(); + } + + bool IsAborted() const override + { + return Underlying_->IsAborted(); + } + private: - const std::unique_ptr Underlying_; + const std::unique_ptr Underlying_; size_t DoRead(void* buf, size_t len) override { @@ -861,10 +875,15 @@ class TRpcResponseStream : public IFileReader { public: - TRpcResponseStream(std::unique_ptr stream) + TRpcResponseStream(std::unique_ptr stream) : Underlying_(std::move(stream)) { } + void Abort() override + { + Underlying_->Abort(); + } + private: size_t DoRead(void *buf, size_t len) override { @@ -877,11 +896,11 @@ private: } private: - std::unique_ptr Underlying_; + std::unique_ptr Underlying_; }; class TFixedStringStream - : public IInputStream + : public IAbortableInputStream { public: TFixedStringStream(TSharedRef data) @@ -915,7 +934,7 @@ IFileReaderPtr TRpcRawClient::GetJobInput( { auto future = Clients_.Heavy->GetJobInput(NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId))); auto result = WaitAndProcess(future); - auto stream = std::make_unique(CreateSyncAdapter(CreateCopyingAdapter(result))); + auto stream = std::make_unique(CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result))); return MakeIntrusive(std::move(stream)); } @@ -928,7 +947,7 @@ IFileReaderPtr TRpcRawClient::GetJobFailContext( NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)), NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId))); auto result = WaitAndProcess(future); - std::unique_ptr stream(new TFixedStringStream(std::move(result))); + std::unique_ptr stream(new TFixedStringStream(std::move(result))); return MakeIntrusive(std::move(stream)); } @@ -941,7 +960,7 @@ IFileReaderPtr TRpcRawClient::GetJobStderr( NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)), NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId))); auto result = WaitAndProcess(future); - std::unique_ptr stream(new TFixedStringStream(std::move(result.Data))); + std::unique_ptr stream(new TFixedStringStream(std::move(result.Data))); return MakeIntrusive(std::move(stream)); } @@ -955,19 +974,19 @@ IFileReaderPtr TRpcRawClient::GetJobTrace( NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)), SerializeOptionsForGetJobTrace(options)); auto result = WaitAndProcess(future); - auto stream = CreateSyncAdapter(CreateCopyingAdapter(result)); + auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result)); return MakeIntrusive(std::move(stream)); } -std::unique_ptr TRpcRawClient::ReadFile( +std::unique_ptr TRpcRawClient::ReadFile( const TTransactionId& transactionId, const TRichYPath& path, const TFileReaderOptions& options) { auto future = Clients_.Heavy->CreateFileReader(path.Path_, SerializeOptionsForReadFile(transactionId, options)); auto reader = WaitAndProcess(future); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(reader)); - return std::make_unique(std::move(syncAdapter)); + auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(reader)); + return std::make_unique(std::move(stream)); } class TRpcWriteFileRequestStream @@ -1245,7 +1264,7 @@ std::unique_ptr TRpcRawClient::WriteTable( return std::make_unique(std::move(rowStream)); } -std::unique_ptr TRpcRawClient::ReadTable( +std::unique_ptr TRpcRawClient::ReadTable( const TTransactionId& transactionId, const TRichYPath& path, const TFormat& format, @@ -1259,11 +1278,15 @@ std::unique_ptr TRpcRawClient::ReadTable( auto formatStream = WaitAndProcess(future); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream))); - return std::make_unique(std::move(syncAdapter)); + auto asyncStream = CreateCopyingAdapter(std::move(formatStream)); + if (TConfig::Get()->UseHaltingResponse) { + asyncStream = NYT::NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit); + } + auto stream = CreateAbortableInputStreamAdapter(std::move(asyncStream)); + return std::make_unique(std::move(stream)); } -std::unique_ptr TRpcRawClient::ReadTablePartition( +std::unique_ptr TRpcRawClient::ReadTablePartition( const TString& cookie, const TFormat& format, const TTablePartitionReaderOptions& options) @@ -1276,11 +1299,11 @@ std::unique_ptr TRpcRawClient::ReadTablePartition( auto formatStream = WaitAndProcess(future); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream))); + auto syncAdapter = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(std::move(formatStream))); return std::make_unique(std::move(syncAdapter)); } -std::unique_ptr TRpcRawClient::ReadBlobTable( +std::unique_ptr TRpcRawClient::ReadBlobTable( const TTransactionId& transactionId, const TRichYPath& path, const TKey& key, @@ -1331,8 +1354,8 @@ std::unique_ptr TRpcRawClient::ReadBlobTable( options.StartPartIndex_, options.Offset_, options.PartSize_); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(blobReader)); - return std::make_unique(std::move(syncAdapter)); + auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(blobReader)); + return std::make_unique(std::move(stream)); } void TRpcRawClient::AlterTableReplica( -- cgit v1.3