diff options
| author | maybenotilya <[email protected]> | 2026-04-17 11:57:44 +0300 |
|---|---|---|
| committer | maybenotilya <[email protected]> | 2026-04-17 12:34:55 +0300 |
| commit | a6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch) | |
| tree | 087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/rpc_client/raw_client.cpp | |
| parent | 7bcfb91453e538211ed44e97c57d1104f543f920 (diff) | |
YT-26179: Add Abort for readers
* Changelog entry
Type: feature
Component: cpp-sdk
Add Abort for table readers
commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/rpc_client/raw_client.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/rpc_client/raw_client.cpp | 65 |
1 files changed, 44 insertions, 21 deletions
diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.cpp b/yt/cpp/mapreduce/rpc_client/raw_client.cpp index 530fd58cb02..f5643b1f6a5 100644 --- a/yt/cpp/mapreduce/rpc_client/raw_client.cpp +++ b/yt/cpp/mapreduce/rpc_client/raw_client.cpp @@ -5,8 +5,12 @@ #include "rpc_parameters_serialization.h" #include "wrap_rpc_error.h" +#include <yt/cpp/mapreduce/common/abortable_stream.h> +#include <yt/cpp/mapreduce/common/halting_stream.h> #include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/interface/abortable_stream.h> +#include <yt/cpp/mapreduce/interface/config.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/yt/client/api/file_reader.h> @@ -205,15 +209,25 @@ void WriteInChunks(const void* buf, size_t len, const TWriteFn& writeFn) //////////////////////////////////////////////////////////////////////////////// class TSyncRpcInputStream - : public IInputStream + : public IAbortableInputStream { public: - explicit TSyncRpcInputStream(std::unique_ptr<IInputStream> stream) + explicit TSyncRpcInputStream(std::unique_ptr<IAbortableInputStream> stream) : Underlying_(std::move(stream)) { } + void Abort() override + { + Underlying_->Abort(); + } + + bool IsAborted() const override + { + return Underlying_->IsAborted(); + } + private: - const std::unique_ptr<IInputStream> Underlying_; + const std::unique_ptr<IAbortableInputStream> Underlying_; size_t DoRead(void* buf, size_t len) override { @@ -861,10 +875,15 @@ class TRpcResponseStream : public IFileReader { public: - TRpcResponseStream(std::unique_ptr<IInputStream> stream) + TRpcResponseStream(std::unique_ptr<IAbortableInputStream> stream) : Underlying_(std::move(stream)) { } + void Abort() override + { + Underlying_->Abort(); + } + private: size_t DoRead(void *buf, size_t len) override { @@ -877,11 +896,11 @@ private: } private: - std::unique_ptr<IInputStream> Underlying_; + std::unique_ptr<IAbortableInputStream> Underlying_; }; class TFixedStringStream - : public IInputStream + : public IAbortableInputStream { public: TFixedStringStream(TSharedRef data) @@ -915,7 +934,7 @@ IFileReaderPtr TRpcRawClient::GetJobInput( { auto future = Clients_.Heavy->GetJobInput(NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId))); auto result = WaitAndProcess(future); - auto stream = std::make_unique<TSyncRpcInputStream>(CreateSyncAdapter(CreateCopyingAdapter(result))); + auto stream = std::make_unique<TSyncRpcInputStream>(CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result))); return MakeIntrusive<TRpcResponseStream>(std::move(stream)); } @@ -928,7 +947,7 @@ IFileReaderPtr TRpcRawClient::GetJobFailContext( NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)), NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId))); auto result = WaitAndProcess(future); - std::unique_ptr<IInputStream> stream(new TFixedStringStream(std::move(result))); + std::unique_ptr<IAbortableInputStream> stream(new TFixedStringStream(std::move(result))); return MakeIntrusive<TRpcResponseStream>(std::move(stream)); } @@ -941,7 +960,7 @@ IFileReaderPtr TRpcRawClient::GetJobStderr( NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)), NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId))); auto result = WaitAndProcess(future); - std::unique_ptr<IInputStream> stream(new TFixedStringStream(std::move(result.Data))); + std::unique_ptr<IAbortableInputStream> stream(new TFixedStringStream(std::move(result.Data))); return MakeIntrusive<TRpcResponseStream>(std::move(stream)); } @@ -955,19 +974,19 @@ IFileReaderPtr TRpcRawClient::GetJobTrace( NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)), SerializeOptionsForGetJobTrace(options)); auto result = WaitAndProcess(future); - auto stream = CreateSyncAdapter(CreateCopyingAdapter(result)); + auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result)); return MakeIntrusive<TRpcResponseStream>(std::move(stream)); } -std::unique_ptr<IInputStream> TRpcRawClient::ReadFile( +std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadFile( const TTransactionId& transactionId, const TRichYPath& path, const TFileReaderOptions& options) { auto future = Clients_.Heavy->CreateFileReader(path.Path_, SerializeOptionsForReadFile(transactionId, options)); auto reader = WaitAndProcess(future); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(reader)); - return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter)); + auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(reader)); + return std::make_unique<TSyncRpcInputStream>(std::move(stream)); } class TRpcWriteFileRequestStream @@ -1245,7 +1264,7 @@ std::unique_ptr<IOutputStream> TRpcRawClient::WriteTable( return std::make_unique<TSyncRpcOutputStream>(std::move(rowStream)); } -std::unique_ptr<IInputStream> TRpcRawClient::ReadTable( +std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadTable( const TTransactionId& transactionId, const TRichYPath& path, const TFormat& format, @@ -1259,11 +1278,15 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadTable( auto formatStream = WaitAndProcess(future); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream))); - return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter)); + auto asyncStream = CreateCopyingAdapter(std::move(formatStream)); + if (TConfig::Get()->UseHaltingResponse) { + asyncStream = NYT::NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit); + } + auto stream = CreateAbortableInputStreamAdapter(std::move(asyncStream)); + return std::make_unique<TSyncRpcInputStream>(std::move(stream)); } -std::unique_ptr<IInputStream> TRpcRawClient::ReadTablePartition( +std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadTablePartition( const TString& cookie, const TFormat& format, const TTablePartitionReaderOptions& options) @@ -1276,11 +1299,11 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadTablePartition( auto formatStream = WaitAndProcess(future); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream))); + auto syncAdapter = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(std::move(formatStream))); return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter)); } -std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable( +std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadBlobTable( const TTransactionId& transactionId, const TRichYPath& path, const TKey& key, @@ -1331,8 +1354,8 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable( options.StartPartIndex_, options.Offset_, options.PartSize_); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(blobReader)); - return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter)); + auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(blobReader)); + return std::make_unique<TSyncRpcInputStream>(std::move(stream)); } void TRpcRawClient::AlterTableReplica( |
