diff options
| author | maybenotilya <[email protected]> | 2026-04-17 11:57:44 +0300 |
|---|---|---|
| committer | maybenotilya <[email protected]> | 2026-04-17 12:34:55 +0300 |
| commit | a6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch) | |
| tree | 087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/rpc_client | |
| parent | 7bcfb91453e538211ed44e97c57d1104f543f920 (diff) | |
YT-26179: Add Abort for readers
* Changelog entry
Type: feature
Component: cpp-sdk
Add Abort for table readers
commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/rpc_client')
| -rw-r--r-- | yt/cpp/mapreduce/rpc_client/raw_client.cpp | 65 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/rpc_client/raw_client.h | 8 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/rpc_client/ya.make | 1 |
3 files changed, 49 insertions, 25 deletions
diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.cpp b/yt/cpp/mapreduce/rpc_client/raw_client.cpp index 530fd58cb02..f5643b1f6a5 100644 --- a/yt/cpp/mapreduce/rpc_client/raw_client.cpp +++ b/yt/cpp/mapreduce/rpc_client/raw_client.cpp @@ -5,8 +5,12 @@ #include "rpc_parameters_serialization.h" #include "wrap_rpc_error.h" +#include <yt/cpp/mapreduce/common/abortable_stream.h> +#include <yt/cpp/mapreduce/common/halting_stream.h> #include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/interface/abortable_stream.h> +#include <yt/cpp/mapreduce/interface/config.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/yt/client/api/file_reader.h> @@ -205,15 +209,25 @@ void WriteInChunks(const void* buf, size_t len, const TWriteFn& writeFn) //////////////////////////////////////////////////////////////////////////////// class TSyncRpcInputStream - : public IInputStream + : public IAbortableInputStream { public: - explicit TSyncRpcInputStream(std::unique_ptr<IInputStream> stream) + explicit TSyncRpcInputStream(std::unique_ptr<IAbortableInputStream> stream) : Underlying_(std::move(stream)) { } + void Abort() override + { + Underlying_->Abort(); + } + + bool IsAborted() const override + { + return Underlying_->IsAborted(); + } + private: - const std::unique_ptr<IInputStream> Underlying_; + const std::unique_ptr<IAbortableInputStream> Underlying_; size_t DoRead(void* buf, size_t len) override { @@ -861,10 +875,15 @@ class TRpcResponseStream : public IFileReader { public: - TRpcResponseStream(std::unique_ptr<IInputStream> stream) + TRpcResponseStream(std::unique_ptr<IAbortableInputStream> stream) : Underlying_(std::move(stream)) { } + void Abort() override + { + Underlying_->Abort(); + } + private: size_t DoRead(void *buf, size_t len) override { @@ -877,11 +896,11 @@ private: } private: - std::unique_ptr<IInputStream> Underlying_; + std::unique_ptr<IAbortableInputStream> Underlying_; }; class TFixedStringStream - : public IInputStream + : public IAbortableInputStream { public: TFixedStringStream(TSharedRef data) @@ -915,7 +934,7 @@ IFileReaderPtr TRpcRawClient::GetJobInput( { auto future = Clients_.Heavy->GetJobInput(NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId))); auto result = WaitAndProcess(future); - auto stream = std::make_unique<TSyncRpcInputStream>(CreateSyncAdapter(CreateCopyingAdapter(result))); + auto stream = std::make_unique<TSyncRpcInputStream>(CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result))); return MakeIntrusive<TRpcResponseStream>(std::move(stream)); } @@ -928,7 +947,7 @@ IFileReaderPtr TRpcRawClient::GetJobFailContext( NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)), NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId))); auto result = WaitAndProcess(future); - std::unique_ptr<IInputStream> stream(new TFixedStringStream(std::move(result))); + std::unique_ptr<IAbortableInputStream> stream(new TFixedStringStream(std::move(result))); return MakeIntrusive<TRpcResponseStream>(std::move(stream)); } @@ -941,7 +960,7 @@ IFileReaderPtr TRpcRawClient::GetJobStderr( NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)), NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId))); auto result = WaitAndProcess(future); - std::unique_ptr<IInputStream> stream(new TFixedStringStream(std::move(result.Data))); + std::unique_ptr<IAbortableInputStream> stream(new TFixedStringStream(std::move(result.Data))); return MakeIntrusive<TRpcResponseStream>(std::move(stream)); } @@ -955,19 +974,19 @@ IFileReaderPtr TRpcRawClient::GetJobTrace( NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)), SerializeOptionsForGetJobTrace(options)); auto result = WaitAndProcess(future); - auto stream = CreateSyncAdapter(CreateCopyingAdapter(result)); + auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result)); return MakeIntrusive<TRpcResponseStream>(std::move(stream)); } -std::unique_ptr<IInputStream> TRpcRawClient::ReadFile( +std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadFile( const TTransactionId& transactionId, const TRichYPath& path, const TFileReaderOptions& options) { auto future = Clients_.Heavy->CreateFileReader(path.Path_, SerializeOptionsForReadFile(transactionId, options)); auto reader = WaitAndProcess(future); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(reader)); - return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter)); + auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(reader)); + return std::make_unique<TSyncRpcInputStream>(std::move(stream)); } class TRpcWriteFileRequestStream @@ -1245,7 +1264,7 @@ std::unique_ptr<IOutputStream> TRpcRawClient::WriteTable( return std::make_unique<TSyncRpcOutputStream>(std::move(rowStream)); } -std::unique_ptr<IInputStream> TRpcRawClient::ReadTable( +std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadTable( const TTransactionId& transactionId, const TRichYPath& path, const TFormat& format, @@ -1259,11 +1278,15 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadTable( auto formatStream = WaitAndProcess(future); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream))); - return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter)); + auto asyncStream = CreateCopyingAdapter(std::move(formatStream)); + if (TConfig::Get()->UseHaltingResponse) { + asyncStream = NYT::NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit); + } + auto stream = CreateAbortableInputStreamAdapter(std::move(asyncStream)); + return std::make_unique<TSyncRpcInputStream>(std::move(stream)); } -std::unique_ptr<IInputStream> TRpcRawClient::ReadTablePartition( +std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadTablePartition( const TString& cookie, const TFormat& format, const TTablePartitionReaderOptions& options) @@ -1276,11 +1299,11 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadTablePartition( auto formatStream = WaitAndProcess(future); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream))); + auto syncAdapter = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(std::move(formatStream))); return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter)); } -std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable( +std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadBlobTable( const TTransactionId& transactionId, const TRichYPath& path, const TKey& key, @@ -1331,8 +1354,8 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable( options.StartPartIndex_, options.Offset_, options.PartSize_); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(blobReader)); - return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter)); + auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(blobReader)); + return std::make_unique<TSyncRpcInputStream>(std::move(stream)); } void TRpcRawClient::AlterTableReplica( diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.h b/yt/cpp/mapreduce/rpc_client/raw_client.h index 07fbb19758c..0eea19df078 100644 --- a/yt/cpp/mapreduce/rpc_client/raw_client.h +++ b/yt/cpp/mapreduce/rpc_client/raw_client.h @@ -211,7 +211,7 @@ public: // Files - std::unique_ptr<IInputStream> ReadFile( + std::unique_ptr<IAbortableInputStream> ReadFile( const TTransactionId& transactionId, const TRichYPath& path, const TFileReaderOptions& options = {}) override; @@ -291,18 +291,18 @@ public: const TMaybe<TFormat>& format, const TTableWriterOptions& options = {}) override; - std::unique_ptr<IInputStream> ReadTable( + std::unique_ptr<IAbortableInputStream> ReadTable( const TTransactionId& transactionId, const TRichYPath& path, const TFormat& format, const TTableReaderOptions& options = {}) override; - std::unique_ptr<IInputStream> ReadTablePartition( + std::unique_ptr<IAbortableInputStream> ReadTablePartition( const TString& cookie, const TFormat& format, const TTablePartitionReaderOptions& options = {}) override; - std::unique_ptr<IInputStream> ReadBlobTable( + std::unique_ptr<IAbortableInputStream> ReadBlobTable( const TTransactionId& transactionId, const TRichYPath& path, const TKey& key, diff --git a/yt/cpp/mapreduce/rpc_client/ya.make b/yt/cpp/mapreduce/rpc_client/ya.make index 115dc550ff7..37fd90543a8 100644 --- a/yt/cpp/mapreduce/rpc_client/ya.make +++ b/yt/cpp/mapreduce/rpc_client/ya.make @@ -12,6 +12,7 @@ SRCS( PEERDIR( library/cpp/yson/node + yt/cpp/mapreduce/common yt/cpp/mapreduce/interface yt/yt/client ) |
