summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/rpc_client/raw_client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'yt/cpp/mapreduce/rpc_client/raw_client.cpp')
-rw-r--r--yt/cpp/mapreduce/rpc_client/raw_client.cpp65
1 files changed, 44 insertions, 21 deletions
diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.cpp b/yt/cpp/mapreduce/rpc_client/raw_client.cpp
index 530fd58cb02..f5643b1f6a5 100644
--- a/yt/cpp/mapreduce/rpc_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/rpc_client/raw_client.cpp
@@ -5,8 +5,12 @@
#include "rpc_parameters_serialization.h"
#include "wrap_rpc_error.h"
+#include <yt/cpp/mapreduce/common/abortable_stream.h>
+#include <yt/cpp/mapreduce/common/halting_stream.h>
#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
+#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/yt/client/api/file_reader.h>
@@ -205,15 +209,25 @@ void WriteInChunks(const void* buf, size_t len, const TWriteFn& writeFn)
////////////////////////////////////////////////////////////////////////////////
class TSyncRpcInputStream
- : public IInputStream
+ : public IAbortableInputStream
{
public:
- explicit TSyncRpcInputStream(std::unique_ptr<IInputStream> stream)
+ explicit TSyncRpcInputStream(std::unique_ptr<IAbortableInputStream> stream)
: Underlying_(std::move(stream))
{ }
+ void Abort() override
+ {
+ Underlying_->Abort();
+ }
+
+ bool IsAborted() const override
+ {
+ return Underlying_->IsAborted();
+ }
+
private:
- const std::unique_ptr<IInputStream> Underlying_;
+ const std::unique_ptr<IAbortableInputStream> Underlying_;
size_t DoRead(void* buf, size_t len) override
{
@@ -861,10 +875,15 @@ class TRpcResponseStream
: public IFileReader
{
public:
- TRpcResponseStream(std::unique_ptr<IInputStream> stream)
+ TRpcResponseStream(std::unique_ptr<IAbortableInputStream> stream)
: Underlying_(std::move(stream))
{ }
+ void Abort() override
+ {
+ Underlying_->Abort();
+ }
+
private:
size_t DoRead(void *buf, size_t len) override
{
@@ -877,11 +896,11 @@ private:
}
private:
- std::unique_ptr<IInputStream> Underlying_;
+ std::unique_ptr<IAbortableInputStream> Underlying_;
};
class TFixedStringStream
- : public IInputStream
+ : public IAbortableInputStream
{
public:
TFixedStringStream(TSharedRef data)
@@ -915,7 +934,7 @@ IFileReaderPtr TRpcRawClient::GetJobInput(
{
auto future = Clients_.Heavy->GetJobInput(NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)));
auto result = WaitAndProcess(future);
- auto stream = std::make_unique<TSyncRpcInputStream>(CreateSyncAdapter(CreateCopyingAdapter(result)));
+ auto stream = std::make_unique<TSyncRpcInputStream>(CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result)));
return MakeIntrusive<TRpcResponseStream>(std::move(stream));
}
@@ -928,7 +947,7 @@ IFileReaderPtr TRpcRawClient::GetJobFailContext(
NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)),
NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)));
auto result = WaitAndProcess(future);
- std::unique_ptr<IInputStream> stream(new TFixedStringStream(std::move(result)));
+ std::unique_ptr<IAbortableInputStream> stream(new TFixedStringStream(std::move(result)));
return MakeIntrusive<TRpcResponseStream>(std::move(stream));
}
@@ -941,7 +960,7 @@ IFileReaderPtr TRpcRawClient::GetJobStderr(
NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)),
NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)));
auto result = WaitAndProcess(future);
- std::unique_ptr<IInputStream> stream(new TFixedStringStream(std::move(result.Data)));
+ std::unique_ptr<IAbortableInputStream> stream(new TFixedStringStream(std::move(result.Data)));
return MakeIntrusive<TRpcResponseStream>(std::move(stream));
}
@@ -955,19 +974,19 @@ IFileReaderPtr TRpcRawClient::GetJobTrace(
NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)),
SerializeOptionsForGetJobTrace(options));
auto result = WaitAndProcess(future);
- auto stream = CreateSyncAdapter(CreateCopyingAdapter(result));
+ auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result));
return MakeIntrusive<TRpcResponseStream>(std::move(stream));
}
-std::unique_ptr<IInputStream> TRpcRawClient::ReadFile(
+std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadFile(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFileReaderOptions& options)
{
auto future = Clients_.Heavy->CreateFileReader(path.Path_, SerializeOptionsForReadFile(transactionId, options));
auto reader = WaitAndProcess(future);
- auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(reader));
- return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter));
+ auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(reader));
+ return std::make_unique<TSyncRpcInputStream>(std::move(stream));
}
class TRpcWriteFileRequestStream
@@ -1245,7 +1264,7 @@ std::unique_ptr<IOutputStream> TRpcRawClient::WriteTable(
return std::make_unique<TSyncRpcOutputStream>(std::move(rowStream));
}
-std::unique_ptr<IInputStream> TRpcRawClient::ReadTable(
+std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TFormat& format,
@@ -1259,11 +1278,15 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadTable(
auto formatStream = WaitAndProcess(future);
- auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream)));
- return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter));
+ auto asyncStream = CreateCopyingAdapter(std::move(formatStream));
+ if (TConfig::Get()->UseHaltingResponse) {
+ asyncStream = NYT::NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit);
+ }
+ auto stream = CreateAbortableInputStreamAdapter(std::move(asyncStream));
+ return std::make_unique<TSyncRpcInputStream>(std::move(stream));
}
-std::unique_ptr<IInputStream> TRpcRawClient::ReadTablePartition(
+std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadTablePartition(
const TString& cookie,
const TFormat& format,
const TTablePartitionReaderOptions& options)
@@ -1276,11 +1299,11 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadTablePartition(
auto formatStream = WaitAndProcess(future);
- auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream)));
+ auto syncAdapter = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(std::move(formatStream)));
return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter));
}
-std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable(
+std::unique_ptr<IAbortableInputStream> TRpcRawClient::ReadBlobTable(
const TTransactionId& transactionId,
const TRichYPath& path,
const TKey& key,
@@ -1331,8 +1354,8 @@ std::unique_ptr<IInputStream> TRpcRawClient::ReadBlobTable(
options.StartPartIndex_,
options.Offset_,
options.PartSize_);
- auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(blobReader));
- return std::make_unique<TSyncRpcInputStream>(std::move(syncAdapter));
+ auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(blobReader));
+ return std::make_unique<TSyncRpcInputStream>(std::move(stream));
}
void TRpcRawClient::AlterTableReplica(